azkaban-developers
Changes
src/java/azkaban/trigger/TriggerManager.java 145(+69 -76)
Details
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 16869b4..94e3841 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -40,7 +40,4 @@ public interface ConditionChecker {
long getNextCheckTime();
-// void setCondition(Condition c);
-//
-// String getDescription();
}
src/java/azkaban/trigger/TriggerManager.java 145(+69 -76)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 6f403e7..594f759 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,6 +53,8 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
+ private Object syncObj = new Object();
+
private String scannerStage = "";
public TriggerManager(Props props, TriggerLoader triggerLoader, ExecutorManager executorManager) throws TriggerManagerException {
@@ -106,51 +108,61 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
return actionTypeLoader;
}
- public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
- try {
- triggerLoader.addTrigger(t);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
+ public void insertTrigger(Trigger t) throws TriggerManagerException {
+ synchronized (syncObj) {
+ try {
+ triggerLoader.addTrigger(t);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
}
- runnerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
}
- public synchronized void removeTrigger(int id) throws TriggerManagerException {
- Trigger t = triggerIdMap.get(id);
- if(t != null) {
- removeTrigger(triggerIdMap.get(id));
+ public void removeTrigger(int id) throws TriggerManagerException {
+ synchronized (syncObj) {
+ Trigger t = triggerIdMap.get(id);
+ if(t != null) {
+ removeTrigger(triggerIdMap.get(id));
+ }
}
}
- public synchronized void updateTrigger(int id) throws TriggerManagerException {
- if(! triggerIdMap.containsKey(id)) {
- throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
- }
-
- Trigger t;
- try {
- t = triggerLoader.loadTrigger(id);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
+ public void updateTrigger(int id) throws TriggerManagerException {
+ synchronized (syncObj) {
+ if(! triggerIdMap.containsKey(id)) {
+ throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
+ }
+
+ Trigger t;
+ try {
+ t = triggerLoader.loadTrigger(id);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ updateTrigger(t);
}
- updateTrigger(t);
}
- public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
- runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
- runnerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
+ public void updateTrigger(Trigger t) throws TriggerManagerException {
+ synchronized (syncObj) {
+ runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
}
- public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
- runnerThread.deleteTrigger(t);
- triggerIdMap.remove(t.getTriggerId());
- try {
- t.stopCheckers();
- triggerLoader.removeTrigger(t);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
+ public void removeTrigger(Trigger t) throws TriggerManagerException {
+ synchronized (syncObj) {
+ runnerThread.deleteTrigger(t);
+ triggerIdMap.remove(t.getTriggerId());
+ try {
+ t.stopCheckers();
+ triggerLoader.removeTrigger(t);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
}
}
@@ -183,23 +195,27 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
this.interrupt();
}
- public synchronized void addJustFinishedFlow(ExecutableFlow flow) {
- justFinishedFlows.put(flow.getExecutionId(), flow);
+ public void addJustFinishedFlow(ExecutableFlow flow) {
+ synchronized (syncObj) {
+ justFinishedFlows.put(flow.getExecutionId(), flow);
+ }
}
- public synchronized void addTrigger(Trigger t) {
- t.updateNextCheckTime();
- triggers.add(t);
+ public void addTrigger(Trigger t) {
+ synchronized (syncObj) {
+ t.updateNextCheckTime();
+ triggers.add(t);
+ }
}
- public synchronized void deleteTrigger(Trigger t) {
+ public void deleteTrigger(Trigger t) {
triggers.remove(t);
}
public void run() {
//while(stillAlive.get()) {
while(!shutdown) {
- synchronized (this) {
+ synchronized (syncObj) {
try{
lastRunnerThreadCheckTime = System.currentTimeMillis();
@@ -343,8 +359,10 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
}
}
- public synchronized Trigger getTrigger(int triggerId) {
- return triggerIdMap.get(triggerId);
+ public Trigger getTrigger(int triggerId) {
+ synchronized (syncObj) {
+ return triggerIdMap.get(triggerId);
+ }
}
public void expireTrigger(int triggerId) {
@@ -386,20 +404,6 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
return triggers;
}
-// public void loadTrigger(int triggerId) throws TriggerManagerException {
-// Trigger t;
-// try {
-// t = triggerLoader.loadTrigger(triggerId);
-// } catch (TriggerLoaderException e) {
-// throw new TriggerManagerException(e);
-// }
-// if(t.getStatus().equals(TriggerStatus.PREPARING)) {
-// triggerIdMap.put(t.getTriggerId(), t);
-// runnerThread.addTrigger(t);
-// t.setStatus(TriggerStatus.READY);
-// }
-// }
-
@Override
public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
insertTrigger(t);
@@ -415,19 +419,6 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
updateTrigger(t);
}
-// @Override
-// public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
-// Trigger t;
-// try {
-// t = triggerLoader.loadTrigger(triggerId);
-// } catch (TriggerLoaderException e) {
-// throw new TriggerManagerException(e);
-// }
-// if(t != null) {
-// insertTrigger(t);
-// }
-// }
-
@Override
public void shutdown() {
runnerThread.shutdown();
@@ -510,12 +501,14 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
}
@Override
- public synchronized void handleEvent(Event event) {
-
- ExecutableFlow flow = (ExecutableFlow) event.getRunner();
- if (event.getType() == Type.FLOW_FINISHED) {
- logger.info("Flow finish event received. " + flow.getExecutionId() );
- runnerThread.addJustFinishedFlow(flow);
+ public void handleEvent(Event event) {
+ // this needs to be fixed for perf
+ synchronized (syncObj) {
+ ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+ if (event.getType() == Type.FLOW_FINISHED) {
+ logger.info("Flow finish event received. " + flow.getExecutionId() );
+ runnerThread.addJustFinishedFlow(flow);
+ }
}
}
}
diff --git a/src/package/soloserver/bin/azkaban-solo-start.sh b/src/package/soloserver/bin/azkaban-solo-start.sh
index 54ac8d9..7d2fb8b 100755
--- a/src/package/soloserver/bin/azkaban-solo-start.sh
+++ b/src/package/soloserver/bin/azkaban-solo-start.sh
@@ -45,7 +45,7 @@ serverpath=`pwd`
if [ -z $AZKABAN_OPTS ]; then
AZKABAN_OPTS=-Xmx3G
fi
-AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanSingleServer -conf $azkaban_dir/conf $@ &