azkaban-aplcache
Changes
.gitignore 2(+2 -0)
Details
.gitignore 2(+2 -0)
diff --git a/.gitignore b/.gitignore
index ec34c0c..97422aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,3 +26,5 @@ temp/
.shelf
# Intellij default build output directory
out/
+
+currentpid
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index f000db8..3013c4b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1734,7 +1734,7 @@ public class ExecutorManager extends EventHandler implements
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
private final int maxDispatchingErrors;
- private final long activeExecutorRefreshWindowInMilisec;
+ private final long activeExecutorRefreshWindowInMillisec;
private final int activeExecutorRefreshWindowInFlows;
private volatile boolean shutdown = false;
@@ -1748,7 +1748,7 @@ public class ExecutorManager extends EventHandler implements
this.maxDispatchingErrors = maxDispatchingErrors;
this.activeExecutorRefreshWindowInFlows =
activeExecutorRefreshWindowInFlows;
- this.activeExecutorRefreshWindowInMilisec =
+ this.activeExecutorRefreshWindowInMillisec =
activeExecutorRefreshWindowInTime;
this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
@@ -1775,7 +1775,7 @@ public class ExecutorManager extends EventHandler implements
try {
// start processing queue if active, other wait for sometime
if (this.isActive) {
- processQueuedFlows(this.activeExecutorRefreshWindowInMilisec,
+ processQueuedFlows(this.activeExecutorRefreshWindowInMillisec,
this.activeExecutorRefreshWindowInFlows);
}
wait(QUEUE_PROCESSOR_WAIT_IN_MS);
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index e9e4d35..6161af1 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -70,7 +70,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
/**
* Update reporting interval
*
- * @param val interval in milli seconds
+ * @param val interval in milliseconds
*/
public synchronized void setReportingInterval(final long val) {
this.timeWindow = val;
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index 55d03d1..937060a 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -26,8 +26,8 @@ import java.util.TimerTask;
*/
public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
- protected long MAX_MILISEC_INTERVAL = 60 * 60 * 1000;
- protected long MIN_MILISEC_INTERVAL = 3 * 1000;
+ protected long MAX_MILLISEC_INTERVAL = 60 * 60 * 1000;
+ protected long MIN_MILLISEC_INTERVAL = 3 * 1000;
private Timer timer;
/**
@@ -84,7 +84,7 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
}
private boolean isValidInterval(final long interval) {
- return interval >= this.MIN_MILISEC_INTERVAL && interval <= this.MAX_MILISEC_INTERVAL;
+ return interval >= this.MIN_MILLISEC_INTERVAL && interval <= this.MAX_MILLISEC_INTERVAL;
}
/**
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 65942e9..58c6bd9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -357,4 +357,12 @@ public class ExecutionFlowDaoTest {
assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
}
+ /**
+ * restores the clock; see {@link #testFetchEmptyRecentlyFinishedFlows()}
+ */
+ @After
+ public void clockReset() {
+ DateTimeUtils.setCurrentMillisOffset(0);
+ }
+
}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
index 6448c63..e8bd96e 100644
--- a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -80,13 +80,18 @@ public class MetricManagerTest {
*/
@Test
public void managerEmitterHandlingTest() throws Exception {
+
+ // metrics use System.currentTimeMillis, so that method should be the millis provider
+ final DateTime aboutNow = new DateTime(System.currentTimeMillis());
+
this.emitter.purgeAllData();
- final Date from = DateTime.now().minusMinutes(1).toDate();
+
+ final Date from = aboutNow.minusMinutes(1).toDate();
this.metric.notifyManager();
this.emitterWrapper.countDownLatch.await(10L, TimeUnit.SECONDS);
- final Date to = DateTime.now().plusMinutes(1).toDate();
+ final Date to = aboutNow.plusMinutes(1).toDate();
final List<InMemoryHistoryNode> nodes = this.emitter.getMetrics("FakeMetric", from, to, false);
assertEquals("Failed to report metric", 1, nodes.size());
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index cf0a72c..cd75f7c 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -55,7 +55,9 @@ public class DirectoryFlowLoaderTest {
parent.mkdirs();
}
- IOUtils.copy(tais, new FileOutputStream(outputFile));
+ try (FileOutputStream os = new FileOutputStream(outputFile)) {
+ IOUtils.copy(tais, os);
+ }
}
return outputDir;