Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index e222c32..e9e4d35 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -21,11 +21,12 @@ import azkaban.metric.IMetricEmitter;
import azkaban.metric.MetricException;
import azkaban.utils.Props;
import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.log4j.Logger;
@@ -46,7 +47,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
/**
* Data structure to keep track of snapshots
*/
- protected Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
+ protected Map<String, LinkedBlockingDeque<InMemoryHistoryNode>> historyListMapping;
/**
* Interval (in millisecond) from today for which we should maintain the in memory snapshots
*/
@@ -60,7 +61,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
* @param azkProps Azkaban Properties
*/
public InMemoryMetricEmitter(final Props azkProps) {
- this.historyListMapping = new HashMap<>();
+ this.historyListMapping = new ConcurrentHashMap<>();
this.timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
this.numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
this.standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
@@ -92,7 +93,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
final String metricName = metric.getName();
if (!this.historyListMapping.containsKey(metricName)) {
logger.info("First time capturing metric: " + metricName);
- this.historyListMapping.put(metricName, new LinkedList<>());
+ this.historyListMapping.put(metricName, new LinkedBlockingDeque<>());
}
synchronized (this.historyListMapping.get(metricName)) {
logger.debug("Ingesting metric: " + metricName);
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index e9e6e9b..8ddecaa 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -115,6 +115,8 @@ public class FileIOUtils {
for (final File targetFile : targetFiles) {
if (targetFile.isFile()) {
final File linkFile = new File(path, targetFile.getName());
+ // NOTE!! If modifying this, you must run this ignored test manually to validate:
+ // FileIOUtilsTest#testHardlinkCopyOfBigDir
Files.createLink(linkFile.toPath(), Paths.get(targetFile.getAbsolutePath()));
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/SleepJavaJob.java b/azkaban-common/src/test/java/azkaban/executor/SleepJavaJob.java
index 4bb2b69..e43c056 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SleepJavaJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SleepJavaJob.java
@@ -88,7 +88,9 @@ public class SleepJavaJob {
System.out.println("Sec " + sec);
synchronized (this) {
try {
- this.wait(sec * 1000);
+ if (sec > 0) {
+ this.wait(sec * 1000);
+ }
} catch (final InterruptedException e) {
System.out.println("Interrupted " + this.fail);
}
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
index 5a3fd43..ccbbf62 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
@@ -126,7 +126,7 @@ public class JavaProcessJobTest {
// initialize the Props
this.props.put(JavaProcessJob.JAVA_CLASS,
"azkaban.executor.SleepJavaJob");
- this.props.put("seconds", 1);
+ this.props.put("seconds", 0);
this.props.put("input", inputFile);
this.props.put("output", outputFile);
this.props.put("classpath", classPaths);
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
index 7aec3c9..6448c63 100644
--- a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -10,6 +10,9 @@ import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.utils.Props;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
@@ -21,6 +24,7 @@ public class MetricManagerTest {
MetricReportManager manager;
FakeMetric metric;
InMemoryMetricEmitter emitter;
+ MetricEmitterWrapper emitterWrapper;
@Before
public void setUp() throws Exception {
@@ -28,7 +32,8 @@ public class MetricManagerTest {
this.metric = new FakeMetric(this.manager);
this.manager.addMetric(this.metric);
this.emitter = new InMemoryMetricEmitter(new Props());
- this.manager.addMetricEmitter(this.emitter);
+ this.emitterWrapper = new MetricEmitterWrapper();
+ this.manager.addMetricEmitter(this.emitterWrapper);
}
/**
@@ -60,13 +65,14 @@ public class MetricManagerTest {
*/
@Test
public void managerEmitterMaintenanceTest() {
- assertTrue("Failed to add Emitter", this.manager.getMetricEmitters().contains(this.emitter));
+ assertTrue("Failed to add Emitter",
+ this.manager.getMetricEmitters().contains(this.emitterWrapper));
final int originalSize = this.manager.getMetricEmitters().size();
- this.manager.removeMetricEmitter(this.emitter);
+ this.manager.removeMetricEmitter(this.emitterWrapper);
assertEquals("Failed to remove emitter", this.manager.getMetricEmitters().size(),
originalSize - 1);
- this.manager.addMetricEmitter(this.emitter);
+ this.manager.addMetricEmitter(this.emitterWrapper);
}
/**
@@ -75,15 +81,31 @@ public class MetricManagerTest {
@Test
public void managerEmitterHandlingTest() throws Exception {
this.emitter.purgeAllData();
- final Date from = new Date();
+ final Date from = DateTime.now().minusMinutes(1).toDate();
this.metric.notifyManager();
- Thread.sleep(2000);
+ this.emitterWrapper.countDownLatch.await(10L, TimeUnit.SECONDS);
- final List<InMemoryHistoryNode> nodes = this.emitter
- .getMetrics("FakeMetric", from, new Date(), false);
+ final Date to = DateTime.now().plusMinutes(1).toDate();
+ final List<InMemoryHistoryNode> nodes = this.emitter.getMetrics("FakeMetric", from, to, false);
assertEquals("Failed to report metric", 1, nodes.size());
assertEquals("Failed to report metric", nodes.get(0).getValue(), 4);
}
+
+ private class MetricEmitterWrapper implements IMetricEmitter {
+
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ @Override
+ public void reportMetric(final IMetric<?> metric) throws MetricException {
+ MetricManagerTest.this.emitter.reportMetric(metric);
+ this.countDownLatch.countDown();
+ }
+
+ @Override
+ public void purgeAllData() throws MetricException {
+ MetricManagerTest.this.emitter.purgeAllData();
+ }
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 679fe33..376b573 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.comparator.NameFileComparator;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -116,6 +117,7 @@ public class FileIOUtilsTest {
assertThat(areDirsEqual(this.baseDir, this.sourceDir, true)).isTrue();
}
+ @Ignore("Slow test (over 30s) - run manually if need to touch createDeepHardlink()")
@Test
public void testHardlinkCopyOfBigDir() throws IOException {
final File bigDir = new File(this.baseDir.getAbsolutePath() + "/bigdir");