diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index af61be4..1598d27 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -52,7 +52,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
* </pre>
* */
static {
- filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
+ filterRepository = new HashMap<>();
filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
@@ -95,23 +95,21 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
*</pre>
* */
private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
- return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
- public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
- if (null == filteringTarget){
- logger.debug(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
- return false;
- }
+ return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, (filteringTarget, referencingObject) -> {
+ if (null == filteringTarget){
+ logger.debug(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+ return false;
+ }
- ExecutorInfo stats = filteringTarget.getExecutorInfo();
- if (null == stats) {
- logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
- STATICREMAININGFLOWSIZE_FILTER_NAME,
- filteringTarget.toString()));
- return false;
- }
- return stats.getRemainingFlowCapacity() > 0 ;
- }
- });
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
+ STATICREMAININGFLOWSIZE_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingFlowCapacity() > 0 ;
+ });
}
/**<pre>
@@ -124,6 +122,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
+
+ @Override
public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
if (null == filteringTarget){
logger.debug(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
@@ -154,6 +154,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
private static final int MAX_CPU_CURRENT_USAGE = 95;
+
+ @Override
public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
if (null == filteringTarget){
logger.debug(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index 04b04b7..3cc0154 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -70,6 +70,6 @@ public final class FactorFilter<T,V>{
* the object being checked need to be filtered or not.
* @return true if the check passed, false if check failed, which means the item need to be filtered.
* */
- public boolean filterTarget(T filteringTarget, V referencingObject);
+ boolean filterTarget(T filteringTarget, V referencingObject);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index f4df5e2..25b735c 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -110,14 +110,11 @@ public class MetricReportManager {
logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
// report to all emitters
for (final IMetricEmitter metricEmitter : metricEmitters) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- metricEmitter.reportMetric(metricSnapshot);
- } catch (Exception ex) {
- logger.error(String.format("Failed to report %s metric due to ", metricSnapshot.getName()), ex);
- }
+ executorService.submit(() -> {
+ try {
+ metricEmitter.reportMetric(metricSnapshot);
+ } catch (Exception ex) {
+ logger.error(String.format("Failed to report %s metric due to ", metricSnapshot.getName()), ex);
}
});
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerStatus.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerStatus.java
index 1a55249..d213ebb 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerStatus.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerStatus.java
@@ -19,7 +19,7 @@ package azkaban.trigger;
public enum TriggerStatus {
READY(10), PAUSED(20), EXPIRED(30);
- private int numVal;
+ private final int numVal;
TriggerStatus(int numVal) {
this.numVal = numVal;
@@ -28,18 +28,4 @@ public enum TriggerStatus {
public int getNumVal() {
return numVal;
}
-
- public static TriggerStatus fromInteger(int x) {
- switch (x) {
- case 10:
- return READY;
- case 20:
- return PAUSED;
- case 30:
- return EXPIRED;
- default:
- return READY;
- }
- }
-
}