azkaban-developers

Changes

src/sql/update.execution_jobs.2.1.sql 4(+0 -4)

src/sql/update.project_events.2.1.sql 1(+0 -1)

src/sql/update.projects.2.1.sql 2(+0 -2)

src/sql/update.schedules.2.1.sql 2(+0 -2)

src/sql/update.schedules.2.2.sql 3(+0 -3)

Details

diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 16869b4..94e3841 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -40,7 +40,4 @@ public interface ConditionChecker {
 	
 	long getNextCheckTime();
 	
-//	void setCondition(Condition c);
-//	
-//	String getDescription();
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 6f403e7..594f759 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,6 +53,8 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 	
 	private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
 	
+	private Object syncObj = new Object();
+	
 	private String scannerStage = "";
 	
 	public TriggerManager(Props props, TriggerLoader triggerLoader, ExecutorManager executorManager) throws TriggerManagerException {
@@ -106,51 +108,61 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 		return actionTypeLoader;
 	}
 
-	public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
-		try {
-			triggerLoader.addTrigger(t);
-		} catch (TriggerLoaderException e) {
-			throw new TriggerManagerException(e);
+	public void insertTrigger(Trigger t) throws TriggerManagerException {
+		synchronized (syncObj) {
+			try {
+				triggerLoader.addTrigger(t);
+			} catch (TriggerLoaderException e) {
+				throw new TriggerManagerException(e);
+			}
+			runnerThread.addTrigger(t);
+			triggerIdMap.put(t.getTriggerId(), t);
 		}
-		runnerThread.addTrigger(t);
-		triggerIdMap.put(t.getTriggerId(), t);
 	}
 	
-	public synchronized void removeTrigger(int id) throws TriggerManagerException {
-		Trigger t = triggerIdMap.get(id);
-		if(t != null) {
-			removeTrigger(triggerIdMap.get(id));
+	public void removeTrigger(int id) throws TriggerManagerException {
+		synchronized (syncObj) {
+			Trigger t = triggerIdMap.get(id);
+			if(t != null) {
+				removeTrigger(triggerIdMap.get(id));
+			}	
 		}
 	}
 	
-	public synchronized void updateTrigger(int id) throws TriggerManagerException {
-		if(! triggerIdMap.containsKey(id)) {
-			throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
-		}
-		
-		Trigger t;
-		try {
-			t = triggerLoader.loadTrigger(id);
-		} catch (TriggerLoaderException e) {
-			throw new TriggerManagerException(e);
+	public void updateTrigger(int id) throws TriggerManagerException {
+		synchronized (syncObj) {
+			if(! triggerIdMap.containsKey(id)) {
+				throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
+			}
+			
+			Trigger t;
+			try {
+				t = triggerLoader.loadTrigger(id);
+			} catch (TriggerLoaderException e) {
+				throw new TriggerManagerException(e);
+			}
+			updateTrigger(t);	
 		}
-		updateTrigger(t);
 	}
 	
-	public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
-		runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
-		runnerThread.addTrigger(t);
-		triggerIdMap.put(t.getTriggerId(), t);
+	public void updateTrigger(Trigger t) throws TriggerManagerException {
+		synchronized (syncObj) {
+			runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+			runnerThread.addTrigger(t);
+			triggerIdMap.put(t.getTriggerId(), t);	
+		}
 	}
 
-	public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
-		runnerThread.deleteTrigger(t);
-		triggerIdMap.remove(t.getTriggerId());
-		try {
-			t.stopCheckers();
-			triggerLoader.removeTrigger(t);
-		} catch (TriggerLoaderException e) {
-			throw new TriggerManagerException(e);
+	public void removeTrigger(Trigger t) throws TriggerManagerException {
+		synchronized (syncObj) {
+			runnerThread.deleteTrigger(t);
+			triggerIdMap.remove(t.getTriggerId());
+			try {
+				t.stopCheckers();
+				triggerLoader.removeTrigger(t);
+			} catch (TriggerLoaderException e) {
+				throw new TriggerManagerException(e);
+			}	
 		}
 	}
 	
@@ -183,23 +195,27 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 			this.interrupt();
 		}
 		
-		public synchronized void addJustFinishedFlow(ExecutableFlow flow) {
-			justFinishedFlows.put(flow.getExecutionId(), flow);
+		public void addJustFinishedFlow(ExecutableFlow flow) {
+			synchronized (syncObj) {
+				justFinishedFlows.put(flow.getExecutionId(), flow);		
+			}
 		}
 		
-		public synchronized void addTrigger(Trigger t) {
-			t.updateNextCheckTime();
-			triggers.add(t);
+		public void addTrigger(Trigger t) {
+			synchronized (syncObj) {
+				t.updateNextCheckTime();
+				triggers.add(t);	
+			}
 		}
 		
-		public synchronized void deleteTrigger(Trigger t) {
+		public void deleteTrigger(Trigger t) {
 			triggers.remove(t);
 		}
 
 		public void run() {
 			//while(stillAlive.get()) {
 			while(!shutdown) {
-				synchronized (this) {
+				synchronized (syncObj) {
 					try{
 						lastRunnerThreadCheckTime = System.currentTimeMillis();
 						
@@ -343,8 +359,10 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 		}
 	}
 	
-	public synchronized Trigger getTrigger(int triggerId) {
-		return triggerIdMap.get(triggerId);
+	public Trigger getTrigger(int triggerId) {
+		synchronized (syncObj) {
+			return triggerIdMap.get(triggerId);	
+		}
 	}
 
 	public void expireTrigger(int triggerId) {
@@ -386,20 +404,6 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 		return triggers;
 	}
 
-//	public void loadTrigger(int triggerId) throws TriggerManagerException {
-//		Trigger t;
-//		try {
-//			t = triggerLoader.loadTrigger(triggerId);
-//		} catch (TriggerLoaderException e) {
-//			throw new TriggerManagerException(e);
-//		}
-//		if(t.getStatus().equals(TriggerStatus.PREPARING)) {
-//			triggerIdMap.put(t.getTriggerId(), t);
-//			runnerThread.addTrigger(t);
-//			t.setStatus(TriggerStatus.READY);
-//		}
-//	}
-
 	@Override
 	public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
 		insertTrigger(t);
@@ -415,19 +419,6 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 		updateTrigger(t);
 	}
 	
-//	@Override
-//	public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
-//		Trigger t;
-//		try {
-//			t = triggerLoader.loadTrigger(triggerId);
-//		} catch (TriggerLoaderException e) {
-//			throw new TriggerManagerException(e);
-//		}
-//		if(t != null) {
-//			insertTrigger(t);
-//		}
-//	}
-	
 	@Override
 	public void shutdown() {
 		runnerThread.shutdown();
@@ -510,12 +501,14 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 		}
 		
 		@Override
-		public synchronized void handleEvent(Event event) {
-			
-			ExecutableFlow flow = (ExecutableFlow) event.getRunner();
-			if (event.getType() == Type.FLOW_FINISHED) {
-				logger.info("Flow finish event received. " + flow.getExecutionId() );
-				runnerThread.addJustFinishedFlow(flow);
+		public void handleEvent(Event event) {
+			// this needs to be fixed for perf
+			synchronized (syncObj) {
+				ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+				if (event.getType() == Type.FLOW_FINISHED) {
+					logger.info("Flow finish event received. " + flow.getExecutionId() );
+					runnerThread.addJustFinishedFlow(flow);
+				}
 			}
 		}
 	}
diff --git a/src/package/soloserver/bin/azkaban-solo-start.sh b/src/package/soloserver/bin/azkaban-solo-start.sh
index 54ac8d9..7d2fb8b 100755
--- a/src/package/soloserver/bin/azkaban-solo-start.sh
+++ b/src/package/soloserver/bin/azkaban-solo-start.sh
@@ -45,7 +45,7 @@ serverpath=`pwd`
 if [ -z $AZKABAN_OPTS ]; then
   AZKABAN_OPTS=-Xmx3G
 fi
-AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
 
 java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanSingleServer -conf $azkaban_dir/conf $@ &