azkaban-uncached

trigger manager in web server

9/5/2013 7:42:12 PM

Changes

src/java/azkaban/executor/RemoteExecutorConnector.java 5(+0 -5)

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 152a546..16dc8d0 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -477,4 +477,15 @@ public class ExecutableFlow {
 	public void setVersion(int version) {
 		this.version = version;
 	}
+	
+	public static boolean isFinished(ExecutableFlow flow) {
+		switch(flow.getStatus()) {
+		case SUCCEEDED:
+		case FAILED:
+		case KILLED:
+			return true;
+		default:
+			return false;
+		}
+	}
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 92a941c..7e1f2fc 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -59,7 +59,7 @@ import azkaban.utils.PropsUtils;
  * Executor manager used to manage the client side job.
  *
  */
-public class ExecutorManager {
+public class ExecutorManager implements ExecutorManagerAdapter {
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
 	private ExecutorLoader executorLoader;
 	private String executorHost;
@@ -77,8 +77,6 @@ public class ExecutorManager {
 	
 	private long lastThreadCheckTime = -1;
 	
-	private final boolean isActive;
-	
 	private Map<String, Alerter> alerters;
 	
 	public interface Alerter {
@@ -88,28 +86,21 @@ public class ExecutorManager {
 		void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
 	}
 	
-	public ExecutorManager(Props props, ExecutorLoader loader, boolean isActive) throws ExecutorManagerException {
+	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
 		this.executorLoader = loader;
 		this.loadRunningFlows();
 		
 		executorHost = props.getString("executor.host", "localhost");
 		executorPort = props.getInt("executor.port");
 		
-		
-		
-		this.isActive = isActive;		
-		
 		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
 		
-		if(isActive) {
-
-			long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
-			cleanerThread = new CleanerThread(executionLogsRetentionMs);
-			cleanerThread.start();
-			
-			alerters = loadAlerters(props);
-		}
+		long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+		cleanerThread = new CleanerThread(executionLogsRetentionMs);
+		cleanerThread.start();
+		
+		alerters = loadAlerters(props);
 	}
 	
 	private Map<String, Alerter> loadAlerters(Props props) {
@@ -249,23 +240,26 @@ public class ExecutorManager {
 		
 	}
 
-	public String getExecutorHost() {
-		return executorHost;
-	}
-	
-	public int getExecutorPort() {
-		return executorPort;
-	}
+//	private String getExecutorHost() {
+//		return executorHost;
+//	}
+//	
+//	private int getExecutorPort() {
+//		return executorPort;
+//	}
 	
-	public State getExecutorThreadState() {
+	@Override
+	public State getExecutorManagerThreadState() {
 		return executingManager.getState();
 	}
 	
-	public boolean isThreadActive() {
+	@Override
+	public boolean isExecutorManagerThreadActive() {
 		return executingManager.isAlive();
 	}
 	
-	public long getLastThreadCheckTime() {
+	@Override
+	public long getLastExecutorManagerThreadCheckTime() {
 		return lastThreadCheckTime;
 	}
 	
@@ -273,6 +267,7 @@ public class ExecutorManager {
 		return this.lastCleanerThreadCheckTime;
 	}
 	
+	@Override
 	public Set<String> getPrimaryServerHosts() {
 		// Only one for now. More probably later.
 		HashSet<String> ports = new HashSet<String>();
@@ -280,6 +275,7 @@ public class ExecutorManager {
 		return ports;
 	}
 	
+	@Override
 	public Set<String> getAllActiveExecutorServerHosts() {
 		// Includes non primary server/hosts
 		HashSet<String> ports = new HashSet<String>();
@@ -292,15 +288,16 @@ public class ExecutorManager {
 		return ports;
 	}
 	
-	public ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
-		ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
-		return exflow;
-	}
+//	private ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
+//		ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
+//		return exflow;
+//	}
 	
 	private void loadRunningFlows() throws ExecutorManagerException {
 		runningFlows.putAll(executorLoader.fetchActiveFlows());
 	}
 	
+	@Override
 	public List<Integer> getRunningFlows(int projectId, String flowId) {
 		ArrayList<Integer> executionIds = new ArrayList<Integer>();
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
@@ -308,31 +305,29 @@ public class ExecutorManager {
 				executionIds.add(ref.getFirst().getExecId());
 			}
 		}
-		
 		return executionIds;
 	}
 	
+	@Override
 	public boolean isFlowRunning(int projectId, String flowId) {
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
-
 			if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
 				return true;
 			}
 		}
-		
 		return false;
 	}
 	
+	@Override
 	public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
 		Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
-		
 		if (active == null) {
 			return executorLoader.fetchExecutableFlow(execId);
 		}
-
 		return active.getSecond();
 	}
 	
+	@Override
 	public List<ExecutableFlow> getRunningFlows() {
 		ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
@@ -341,43 +336,52 @@ public class ExecutorManager {
 		return flows;
 	}
 	
+	@Override
 	public List<ExecutableFlow> getRecentlyFinishedFlows() {
 		return new ArrayList<ExecutableFlow>(recentlyFinished.values());
 	}
 	
+	@Override
 	public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
 		return flows;
 	}
 	
+	@Override
 	public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
 		return flows;
 	}
 	
+	@Override
 	public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
 		return flows;
 	}
 
+	@Override
 	public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
 		return flows;
 	}
 	
+	@Override
 	public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
 		List<ExecutableJobInfo> nodes = executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
 		return nodes;
 	}
 	
+	@Override
 	public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException{
 		return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
 	}
 	
+	@Override
 	public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException{
 		return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
 	}
 	
+	@Override
 	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
 		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
@@ -395,10 +399,10 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
 		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
-
 			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
 			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
 			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
@@ -415,6 +419,7 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
 		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
@@ -434,6 +439,7 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
 		synchronized(exFlow) {
 			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
@@ -444,6 +450,7 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
 		synchronized(exFlow) {
 			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
@@ -454,6 +461,7 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
 		synchronized(exFlow) {
 			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
@@ -464,30 +472,37 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
 	}
 	
+	@Override
 	public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
 	}
 	
+	@Override
 	public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
 	}
 	
+	@Override
 	public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
 	}
 	
+	@Override
 	public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
 	}
 	
+	@Override
 	public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
 	}
 	
+	@Override
 	public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
 	}
@@ -530,9 +545,10 @@ public class ExecutorManager {
 		}
 	}
 	
-	public String submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
+	@Override
+	public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
 		synchronized(exflow) {
-			logger.info("Submitting execution flow " + exflow.getFlowId());
+			logger.info("Submitting execution flow " + exflow.getFlowId() + " by " + userId);
 
 			int projectId = exflow.getProjectId();
 			String flowId = exflow.getFlowId();
@@ -592,7 +608,7 @@ public class ExecutorManager {
 	}
 	
 	
-	public void cleanOldExecutionLogs(long millis) {
+	private void cleanOldExecutionLogs(long millis) {
 		try {
 			int count = executorLoader.removeExecutionLogsByTime(millis);
 			logger.info("Cleaned up " + count + " log entries.");
@@ -688,6 +704,7 @@ public class ExecutorManager {
 		return jsonResponse;
 	}
 	
+	@Override
 	public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
 		URIBuilder builder = new URIBuilder();
 		
@@ -732,6 +749,7 @@ public class ExecutorManager {
 		return jsonResponse;
 	}
 	
+	@Override
 	public void shutdown() {
 		executingManager.shutdown();
 	}
@@ -762,7 +780,7 @@ public class ExecutorManager {
 				try {
 					lastThreadCheckTime = System.currentTimeMillis();
 					
-					loadRunningFlows();
+//					loadRunningFlows();
 					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
@@ -880,27 +898,23 @@ public class ExecutorManager {
 				
 			}
 
-			if(isActive) {
-				// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
-				if (!isFinished(dsFlow)) {
-					failEverything(dsFlow);
-					executorLoader.updateExecutableFlow(dsFlow);
-				}
-			
-				// Delete the executing reference.
-				if (flow.getEndTime() == -1) {
-					flow.setEndTime(System.currentTimeMillis());
-					executorLoader.updateExecutableFlow(dsFlow);
-				}
-				executorLoader.removeActiveExecutableReference(execId);
-				
-				runningFlows.remove(execId);
-				recentlyFinished.put(execId, dsFlow);
-			} else {
-				runningFlows.remove(execId);
-				recentlyFinished.put(execId, dsFlow);
-				return;
+
+			// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
+			if (!isFinished(dsFlow)) {
+				failEverything(dsFlow);
+				executorLoader.updateExecutableFlow(dsFlow);
+			}
+		
+			// Delete the executing reference.
+			if (flow.getEndTime() == -1) {
+				flow.setEndTime(System.currentTimeMillis());
+				executorLoader.updateExecutableFlow(dsFlow);
 			}
+			executorLoader.removeActiveExecutableReference(execId);
+			
+			runningFlows.remove(execId);
+			recentlyFinished.put(execId, dsFlow);
+
 		} catch (ExecutorManagerException e) {
 			logger.error(e);
 		}
@@ -1043,7 +1057,7 @@ public class ExecutorManager {
 		Status newStatus = flow.getStatus();
 		
 		ExecutionOptions options = flow.getExecutionOptions();
-		if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING) && isActive) {
+		if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
 			// We want to see if we should give an email status on first failure.
 			if (options.getNotifyOnFirstFailure()) {
 				Alerter mailAlerter = alerters.get("email");
@@ -1178,12 +1192,14 @@ public class ExecutorManager {
 		}
 	}
 	
+	@Override
 	public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
 
+	@Override
 	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
 		return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
 	}
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
new file mode 100644
index 0000000..3ddab0c
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -0,0 +1,133 @@
+package azkaban.executor;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+
+public interface ExecutorManagerAdapter {
+	
+	public static final String LOCAL_MODE = "local";
+	public static final String REMOTE_MODE = "remote";
+	
+	public static final String REMOTE_EXECUTOR_MANAGER_HOST = "remote.executor.manager.host";
+	public static final String REMOTE_EXECUTOR_MANAGER_PORT = "remote.executor.manager.port";
+	public static final String REMOTE_EXECUTOR_MANAGER_URL = "/executormanager";
+	
+	public static final String ACTION_GET_FLOW_LOG = "getFlowLog";
+	public static final String ACTION_GET_JOB_LOG = "getJobLog";
+	public static final String ACTION_CANCEL_FLOW = "cancelFlow";
+	public static final String ACTION_SUBMIT_FLOW = "submitFlow";
+	public static final String ACTION_RESUME_FLOW = "resumeFlow";
+	public static final String ACTION_PAUSE_FLOW = "pauseFlow";
+	public static final String ACTION_MODIFY_EXECUTION = "modifyExecution"; 
+	public static final String ACTION_UPDATE = "update";
+	public static final String ACTION_GET_JMX = "getJMX";
+	
+	public static final String COMMAND_MODIFY_PAUSE_JOBS = "modifyPauseJobs";
+	public static final String COMMAND_MODIFY_RESUME_JOBS = "modifyResumeJobs";
+	public static final String COMMAND_MODIFY_RETRY_FAILURES = "modifyRetryFailures";
+	public static final String COMMAND_MODIFY_RETRY_JOBS = "modifyRetryJobs";
+	public static final String COMMAND_MODIFY_DISABLE_JOBS = "modifyDisableJobs";
+	public static final String COMMAND_MODIFY_ENABLE_JOBS = "modifyEnableJobs";
+	public static final String COMMAND_MODIFY_CANCEL_JOBS = "modifyCancelJobs";
+	
+	public static final String INFO_JMX_TYPE = "jmxType";
+	public static final String INFO_JMX_DATA = "jmxData";
+	public static final String INFO_ACTION = "action";
+	public static final String INFO_TYPE = "type";
+	public static final String INFO_EXEC_ID = "execId";
+	public static final String INFO_EXEC_FLOW_JSON = "execFlowJson";
+	public static final String INFO_PROJECT_ID = "projectId";
+	public static final String INFO_FLOW_NAME = "flowName";
+	public static final String INFO_JOB_NAME = "jobName";
+	public static final String INFO_OFFSET = "offset";
+	public static final String INFO_LENGTH = "length";
+	public static final String INFO_ATTEMPT = "attempt";
+	public static final String INFO_MODIFY_JOB_IDS = "modifyJobIds";
+	public static final String INFO_MODIFY_COMMAND = "modifyCommand";
+	public static final String INFO_MESSAGE = "message";
+	public static final String INFO_ERROR = "error";
+	public static final String INFO_UPDATE_TIME_LIST = "updateTimeList";
+	public static final String INFO_EXEC_ID_LIST = "execIdList";
+	public static final String INFO_UPDATES = "updates";
+	public static final String INFO_USER_ID = "userId";
+	public static final String INFO_LOG = "logData";
+	
+	public boolean isFlowRunning(int projectId, String flowId);
+	
+	public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException;
+	
+	public List<Integer> getRunningFlows(int projectId, String flowId);
+	
+	public List<ExecutableFlow> getRunningFlows() throws IOException;
+	
+	public List<ExecutableFlow> getRecentlyFinishedFlows();
+	
+	public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException;
+	
+	public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException;
+	
+	public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException;
+	
+	public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException;
+
+	public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException;
+
+	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException;
+
+	public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException;
+	
+	public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException;
+	
+	public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException;
+	
+	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
+	
+	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+	
+	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+	
+	public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+	
+	public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+	
+	public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+	
+	public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+	
+	public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+	
+	public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+	
+	public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+	
+	public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+	
+	public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+	
+	public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+	public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException;
+	
+	public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException;
+
+	public void shutdown();
+
+	public Set<String> getAllActiveExecutorServerHosts();
+
+	public State getExecutorManagerThreadState();
+
+	public boolean isExecutorManagerThreadActive();
+
+	public long getLastExecutorManagerThreadCheckTime();
+
+	public Set<? extends String> getPrimaryServerHosts();
+	
+}
diff --git a/src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java b/src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java
new file mode 100644
index 0000000..2b3515f
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.log4j.Logger;
+
+import azkaban.project.Project;
+import azkaban.scheduler.ScheduleStatisticManager;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
+public class ExecutorManagerRemoteAdapter implements ExecutorManagerAdapter {
+	private static Logger logger = Logger.getLogger(ExecutorManagerRemoteAdapter.class);
+	private ExecutorLoader executorLoader;
+	private String executorHost;
+	private int executorPort;
+	private String executorManagerHost;
+	private int executorManagerPort;
+	
+	private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+	private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
+
+	private UpdaterThread updater;
+	
+	private long lastThreadCheckTime = -1;
+	
+	public ExecutorManagerRemoteAdapter(Props props, ExecutorLoader loader) throws ExecutorManagerException {
+		this.executorLoader = loader;
+		this.loadRunningFlows();
+		
+		executorHost = props.getString("executor.host", "localhost");
+		executorPort = props.getInt("executor.port");
+		
+		executorManagerHost = props.getString(REMOTE_EXECUTOR_MANAGER_HOST);
+		executorManagerPort = props.getInt(REMOTE_EXECUTOR_MANAGER_PORT);
+		
+		updater = new UpdaterThread();
+		updater.start();
+
+	}
+	
+	@Override
+	public State getExecutorManagerThreadState() {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_JMX_TYPE, "getExecutorManagerThreadState"));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
+			return (State) response.get(INFO_JMX_DATA);
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return null;
+		}
+	}
+	
+	@Override
+	public boolean isExecutorManagerThreadActive() {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_JMX_TYPE, "isExecutorManagerThreadActive"));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
+			return (Boolean) response.get(INFO_JMX_DATA);
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return false;
+		}		
+	}
+	
+	@Override
+	public long getLastExecutorManagerThreadCheckTime() {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_JMX_TYPE, "getLastExecutorManagerThreadCheckTime"));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
+			return (Long) response.get(INFO_JMX_DATA);
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return -1;
+		}	
+	}
+	
+	@Override
+	public Set<String> getPrimaryServerHosts() {
+		// Only one for now. More probably later.
+		HashSet<String> ports = new HashSet<String>();
+		ports.add(executorHost + ":" + executorPort);
+		return ports;
+	}
+	
+	@Override
+	public Set<String> getAllActiveExecutorServerHosts() {
+		// Includes non primary server/hosts
+		HashSet<String> ports = new HashSet<String>();
+		ports.add(executorHost + ":" + executorPort);
+		for(Pair<ExecutionReference, ExecutableFlow> running: runningFlows.values()) {
+			ExecutionReference ref = running.getFirst();
+			ports.add(ref.getHost() + ":" + ref.getPort());
+		}
+		
+		return ports;
+	}
+	
+//	private ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
+//		ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
+//		return exflow;
+//	}
+	
+	private void loadRunningFlows() throws ExecutorManagerException {
+		runningFlows.putAll(executorLoader.fetchActiveFlows());
+	}
+	
+	@Override
+	public List<Integer> getRunningFlows(int projectId, String flowId) {
+		ArrayList<Integer> executionIds = new ArrayList<Integer>();
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			if (ref.getSecond().getFlowId().equals(flowId)) {
+				executionIds.add(ref.getFirst().getExecId());
+			}
+		}
+		return executionIds;
+	}
+	
+	@Override
+	public boolean isFlowRunning(int projectId, String flowId) {
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
+				return true;
+			}
+		}
+		return false;
+	}
+	
+	@Override
+	public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
+		if (active == null) {
+			return executorLoader.fetchExecutableFlow(execId);
+		}
+		return active.getSecond();
+	}
+	
+	@Override
+	public List<ExecutableFlow> getRunningFlows() {
+		ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			flows.add(ref.getSecond());
+		}
+		return flows;
+	}
+	
+	@Override
+	public List<ExecutableFlow> getRecentlyFinishedFlows() {
+		return new ArrayList<ExecutableFlow>(recentlyFinished.values());
+	}
+	
+	@Override
+	public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+		return flows;
+	}
+	
+	@Override
+	public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
+		return flows;
+	}
+	
+	@Override
+	public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+		return flows;
+	}
+
+	@Override
+	public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
+		return flows;
+	}
+	
+	@Override
+	public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
+		List<ExecutableJobInfo> nodes = executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+		return nodes;
+	}
+	
+	@Override
+	public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException{
+		return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
+	}
+	
+	@Override
+	public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException{
+		return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
+	}
+	
+	@Override
+	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+		if (pair != null) {
+			Pair<String,String> typeParam = new Pair<String,String>("type", "flow");
+			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+			
+			@SuppressWarnings("unchecked")
+			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, offsetParam, lengthParam);
+			return LogData.createLogDataFromObject(result);
+		}
+		else {
+			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
+			return value;
+		}
+	}
+	
+	@Override
+	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+		if (pair != null) {
+			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+			
+			@SuppressWarnings("unchecked")
+			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+			return LogData.createLogDataFromObject(result);
+		}
+		else {
+			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+			return value;
+		}
+	}
+	
+	@Override
+	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+		if (pair != null) {
+
+			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+			
+			@SuppressWarnings("unchecked")
+			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+			return JobMetaData.createJobMetaDataFromObject(result);
+		}
+		else {
+			return null;
+		}
+	}
+	
+	@Override
+	public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_CANCEL_FLOW, userId, params);
+			if(response.containsKey(INFO_ERROR)) {
+				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+			}
+		} catch (Exception e) {
+			throw new ExecutorManagerException(e);
+		}	
+	}
+	
+	@Override
+	public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_RESUME_FLOW, userId, params);
+			if(response.containsKey(INFO_ERROR)) {
+				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+			}
+		} catch (Exception e) {
+			throw new ExecutorManagerException(e);
+		}	
+	}
+	
+	@Override
+	public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_PAUSE_FLOW, userId, params);
+			if(response.containsKey(INFO_ERROR)) {
+				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+			}
+		} catch (Exception e) {
+			throw new ExecutorManagerException(e);
+		}	
+	}
+	
+	@Override
+	public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
+	}
+	
+	@Override
+	public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
+	}
+	
+	@Override
+	public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
+	}
+	
+	@Override
+	public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
+	}
+	
+	@Override
+	public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
+	}
+	
+	@Override
+	public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
+	}
+	
+	@Override
+	public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
+		synchronized(exFlow) {
+			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+			if (pair == null) {
+				throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
+			}
+			
+			Map<String, Object> response = null;
+			if (jobIds != null && jobIds.length > 0) {
+				for (String jobId: jobIds) {
+					if (!jobId.isEmpty()) {
+						ExecutableNode node = exFlow.getExecutableNode(jobId);
+						if (node == null) {
+							throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+						}
+					}
+				}
+				String ids = StringUtils.join(jobIds, ',');
+				response = callExecutorServer(
+						pair.getFirst(), 
+						ConnectorParams.MODIFY_EXECUTION_ACTION, 
+						userId, 
+						new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command), 
+						new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+			}
+			else {
+				response = callExecutorServer(
+						pair.getFirst(), 
+						ConnectorParams.MODIFY_EXECUTION_ACTION, 
+						userId, 
+						new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+			}
+			
+			return response;
+		}
+	}
+	
+	@Override
+	public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
+		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+		params.add(new Pair<String, String>(INFO_EXEC_FLOW_JSON, JSONUtils.toJSON(exflow.toObject())));
+		Map<String, Object> response;
+		try {
+			response = callRemoteExecutorManager(ACTION_SUBMIT_FLOW, userId, params);
+			if(response.containsKey(INFO_ERROR)) {
+				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+			}
+			String message = (String) response.get(INFO_MESSAGE);
+			return message;
+		} catch (Exception e) {
+			throw new ExecutorManagerException(e);
+		}	
+	}
+	
+	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, Pair<String,String> ... params) throws ExecutorManagerException {
+		try {
+			return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), null, params);
+		} catch (IOException e) {
+			throw new ExecutorManagerException(e);
+		}
+	}
+	
+	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user, Pair<String,String> ... params) throws ExecutorManagerException {
+		try {
+			return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, params);
+		} catch (IOException e) {
+			throw new ExecutorManagerException(e);
+		}
+	}
+	
+	private Map<String, Object> callExecutorServer(String host, int port, String action, Integer executionId, String user, Pair<String,String> ... params) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(host)
+			.setPort(port)
+			.setPath("/executor");
+
+		builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+		
+		if (executionId != null) {
+			builder.setParameter(ConnectorParams.EXECID_PARAM,String.valueOf(executionId));
+		}
+		
+		if (user != null) {
+			builder.setParameter(ConnectorParams.USER_PARAM, user);
+		}
+		
+		if (params != null) {
+			for (Pair<String, String> pair: params) {
+				builder.setParameter(pair.getFirst(), pair.getSecond());
+			}
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		
+		return jsonResponse;
+	}
+	
+	@Override
+	public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		
+		String[] hostPortSplit = hostPort.split(":");
+		builder.setScheme("http")
+			.setHost(hostPortSplit[0])
+			.setPort(Integer.parseInt(hostPortSplit[1]))
+			.setPath("/jmx");
+
+		builder.setParameter(action, "");
+		if (mBean != null) {
+			builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		return jsonResponse;
+	}
+	
+	private Map<String, Object> callRemoteExecutorManager(String action, String user, List<Pair<String,String>> params) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(executorManagerHost)
+			.setPort(executorManagerPort)
+			.setPath(ExecutorManagerServlet.URL);
+
+		builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+		
+		if (user != null) {
+			builder.setParameter(ConnectorParams.USER_PARAM, user);
+		}
+		
+		if (params != null) {
+			for (Pair<String, String> pair: params) {
+				builder.setParameter(pair.getFirst(), pair.getSecond());
+			}
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		
+		return jsonResponse;
+	}
+	
+	@Override
+	public void shutdown() {
+		updater.shutdown();
+	}
+	
+	private class UpdaterThread extends Thread {
+		private boolean shutdown = false;
+
+		public UpdaterThread() {
+			this.setName("ExecutorManagerRemoteUpdaterThread");
+		}
+		
+		// 10 mins recently finished threshold.
+		private long recentlyFinishedLifetimeMs = 600000;
+		private int waitTimeIdleMs = 2000;
+		private int waitTimeMs = 500;
+		
+		private void shutdown() {
+			shutdown = true;
+		}
+		
+		@SuppressWarnings("unchecked")
+		public void run() {
+			while(!shutdown) {
+				try {
+					lastThreadCheckTime = System.currentTimeMillis();
+					
+//					loadRunningFlows();
+					List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
+					
+					List<Long> updateTimesList = new ArrayList<Long>();
+					List<Integer> executionIdsList = new ArrayList<Integer>();
+				
+					// We pack the parameters of the same host together before we query.
+					fillUpdateTimeAndExecId(executionIdsList, updateTimesList);
+					
+					params.add(new Pair<String, String>(INFO_UPDATE_TIME_LIST, JSONUtils.toJSON(updateTimesList)));
+					params.add(new Pair<String, String>(INFO_EXEC_ID_LIST, JSONUtils.toJSON(executionIdsList)));
+					
+					Map<String, Object> results = null;
+					try {
+						results = callRemoteExecutorManager(ACTION_UPDATE, "azkaban", params);
+					} catch (IOException e) {
+						logger.error(e);
+					}
+					
+					// We gets results
+					if (results != null) {
+						List<Map<String,Object>> executionUpdates = (List<Map<String,Object>>)results.get(INFO_UPDATES);
+						for (Map<String,Object> updateMap: executionUpdates) {
+							try {
+								ExecutableFlow flow = updateExecution(updateMap);
+								if (isFinished(flow)) {
+									finishedFlows.add(flow);
+								}
+							} catch (ExecutorManagerException e) {
+								logger.error(e);
+							}
+						}
+
+						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
+						// Add new finished
+						for (ExecutableFlow flow: finishedFlows) {
+							if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
+								ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+							}
+							recentlyFinished.put(flow.getExecutionId(), flow);
+						}
+						
+
+					}
+					
+					synchronized(this) {
+						try {
+							if (runningFlows.size() > 0) {
+								this.wait(waitTimeMs);
+							}
+							else {
+								this.wait(waitTimeIdleMs);
+							}
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+				catch (Exception e) {
+					logger.error(e);
+				}
+			}
+		}
+	}
+	
+	private void evictOldRecentlyFinished(long ageMs) {
+		ArrayList<Integer> recentlyFinishedKeys = new ArrayList<Integer>(recentlyFinished.keySet());
+		long oldAgeThreshold = System.currentTimeMillis() - ageMs;
+		for (Integer key: recentlyFinishedKeys) {
+			ExecutableFlow flow = recentlyFinished.get(key);
+			
+			if (flow.getEndTime() < oldAgeThreshold) {
+				// Evict
+				recentlyFinished.remove(key);
+			}
+		}
+	}
+	
+	private ExecutableFlow updateExecution(Map<String,Object> updateData) throws ExecutorManagerException {
+		
+		Integer execId = (Integer)updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+		if (execId == null) {
+			throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
+		}
+		
+		Pair<ExecutionReference, ExecutableFlow> refPair = this.runningFlows.get(execId);
+		if (refPair == null) {
+			throw new ExecutorManagerException("No running flow found with the execution id.");
+		}
+		
+		ExecutionReference ref = refPair.getFirst();
+		ExecutableFlow flow = refPair.getSecond();
+		if (updateData.containsKey("error")) {
+			// The flow should be finished here.
+			throw new ExecutorManagerException((String)updateData.get("error"), flow);
+		}
+		
+		// Reset errors.
+		ref.setNextCheckTime(0);
+		ref.setNumErrors(0);
+		flow.applyUpdateObject(updateData);
+	
+		return flow;
+	}
+	
+	public boolean isFinished(ExecutableFlow flow) {
+		switch(flow.getStatus()) {
+		case SUCCEEDED:
+		case FAILED:
+		case KILLED:
+			return true;
+		default:
+			return false;
+		}
+	}
+	
+	private void fillUpdateTimeAndExecId(List<Integer> executionIds, List<Long> updateTimes) {
+		for (Pair<ExecutionReference, ExecutableFlow> flow: runningFlows.values()) {
+			executionIds.add(flow.getSecond().getExecutionId());
+			updateTimes.add(flow.getSecond().getUpdateTime());
+		}
+	}
+	
+	@Override
+	public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+		outputList.addAll(flows);
+		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
+	}
+
+	@Override
+	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
+		return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+	}
+
+
+}
diff --git a/src/java/azkaban/executor/ExecutorManagerServlet.java b/src/java/azkaban/executor/ExecutorManagerServlet.java
new file mode 100644
index 0000000..ab8d5b2
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerServlet.java
@@ -0,0 +1,225 @@
+package azkaban.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.servlet.AbstractServiceServlet;
+
+
+public class ExecutorManagerServlet extends AbstractServiceServlet {
+	private final ExecutorManagerAdapter executorManager;
+	
+	public static final String URL = "executorManager";
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = Logger.getLogger(ExecutorManagerServlet.class);
+	
+	public ExecutorManagerServlet(ExecutorManagerAdapter executorManager) {
+		this.executorManager = executorManager;
+	}
+	
+	@Override
+	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		HashMap<String,Object> respMap= new HashMap<String,Object>();
+		//logger.info("ExecutorServer called by " + req.getRemoteAddr());
+		try {
+			if (!hasParam(req, ExecutorManagerAdapter.INFO_ACTION)) {
+				logger.error("Parameter action not set");
+				respMap.put("error", "Parameter action not set");
+			}
+			else {
+				String action = getParam(req, ExecutorManagerAdapter.INFO_ACTION);
+				if (action.equals(ExecutorManagerAdapter.ACTION_UPDATE)) {
+					//logger.info("Updated called");
+					handleAjaxUpdateRequest(req, respMap);
+				}
+				else {
+					int execid = Integer.parseInt(getParam(req, ExecutorManagerAdapter.INFO_EXEC_ID));
+					String user = getParam(req, ExecutorManagerAdapter.INFO_USER_ID, null);
+					
+					logger.info("User " + user + " has called action " + action + " on " + execid);
+					if (action.equals(ExecutorManagerAdapter.ACTION_GET_FLOW_LOG)) { 
+						handleFetchFlowLogEvent(execid, req, resp, respMap);
+					} else if (action.equals(ExecutorManagerAdapter.ACTION_GET_JOB_LOG)) {
+						handleFetchJobLogEvent(execid, req, resp, respMap);
+					}
+					else if (action.equals(ExecutorManagerAdapter.ACTION_SUBMIT_FLOW)) {
+						handleAjaxSubmitFlow(req, respMap, execid);
+					}
+					else if (action.equals(ExecutorManagerAdapter.ACTION_CANCEL_FLOW)) {
+						logger.info("Cancel called.");
+						handleAjaxCancelFlow(respMap, execid, user);
+					}
+					else if (action.equals(ExecutorManagerAdapter.ACTION_PAUSE_FLOW)) {
+						logger.info("Paused called.");
+						handleAjaxPauseFlow(respMap, execid, user);
+					}
+					else if (action.equals(ExecutorManagerAdapter.ACTION_RESUME_FLOW)) {
+						logger.info("Resume called.");
+						handleAjaxResumeFlow(respMap, execid, user);
+					}
+					else if (action.equals(ExecutorManagerAdapter.ACTION_MODIFY_EXECUTION)) {
+						logger.info("Modify Execution Action");
+						handleModifyExecution(respMap, execid, user, req);
+					}
+					else {
+						logger.error("action: '" + action + "' not supported.");
+						respMap.put("error", "action: '" + action + "' not supported.");
+					}
+				}
+			}
+		} catch (Exception e) {
+			logger.error(e);
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e.getMessage());
+		}
+		writeJSON(resp, respMap);
+		resp.flushBuffer();
+	}
+
+	private void handleModifyExecution(HashMap<String, Object> respMap,
+			int execid, String user, HttpServletRequest req) {
+		if (!hasParam(req, ExecutorManagerAdapter.INFO_MODIFY_COMMAND)) {
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, "Modification command not set.");
+			return;
+		}
+
+		try {
+			String modificationType = getParam(req, ExecutorManagerAdapter.INFO_MODIFY_COMMAND);
+			ExecutableFlow exflow = executorManager.getExecutableFlow(execid);
+			if (ExecutorManagerAdapter.COMMAND_MODIFY_RETRY_FAILURES.equals(modificationType)) {
+				executorManager.retryFailures(exflow, user);
+			}
+			else {
+//				String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
+//				String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+//				
+//				if (MODIFY_RETRY_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
+//				}
+			}
+		} catch (Exception e) {
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+	}
+
+	private void handleAjaxResumeFlow(HashMap<String, Object> respMap, int execid, String user) {
+		try {
+			ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+			executorManager.resumeFlow(exFlow, user);
+		} catch (Exception e) {
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+		
+	}
+
+	private void handleAjaxPauseFlow(HashMap<String, Object> respMap, int execid, String user) {
+		try {
+			ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+			executorManager.pauseFlow(exFlow, user);
+		} catch (Exception e) {
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+	}
+
+	private void handleAjaxCancelFlow(HashMap<String, Object> respMap, int execid, String user) {
+		try {
+			ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+			executorManager.cancelFlow(exFlow, user);
+		} catch (Exception e) {
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+	}
+
+	private void handleAjaxSubmitFlow(HttpServletRequest req, HashMap<String, Object> respMap, int execid) {
+		try{
+			String execFlowJson = getParam(req, ExecutorManagerAdapter.INFO_EXEC_FLOW_JSON);
+			ExecutableFlow exflow = ExecutableFlow.createExecutableFlowFromObject(JSONUtils.parseJSONFromString(execFlowJson));
+			String user = getParam(req, ExecutorManagerAdapter.INFO_USER_ID);
+			executorManager.submitExecutableFlow(exflow, user);
+			respMap.put(ExecutorManagerAdapter.INFO_EXEC_ID, exflow.getExecutionId());
+		} catch (Exception e) {
+			e.printStackTrace();
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+	}
+
+	private void handleFetchJobLogEvent(int execid, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+		try{
+			ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+			String jobId = getParam(req, ExecutorManagerAdapter.INFO_JOB_NAME);
+			int offset = getIntParam(req, ExecutorManagerAdapter.INFO_OFFSET);
+			int length = getIntParam(req, ExecutorManagerAdapter.INFO_LENGTH);
+			int attempt = getIntParam(req, ExecutorManagerAdapter.INFO_ATTEMPT);
+			LogData log = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+			respMap.put(ExecutorManagerAdapter.INFO_LOG, JSONUtils.toJSON(log.toObject()));
+		}  catch (Exception e) {
+			e.printStackTrace();
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+	}
+
+	private void handleFetchFlowLogEvent(int execid, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+		try{
+			ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+			int offset = getIntParam(req, ExecutorManagerAdapter.INFO_OFFSET);
+			int length = getIntParam(req, ExecutorManagerAdapter.INFO_LENGTH);
+			LogData log = executorManager.getExecutableFlowLog(exFlow, offset, length);
+			respMap.put(ExecutorManagerAdapter.INFO_LOG, JSONUtils.toJSON(log.toObject()));
+		}  catch (Exception e) {
+			e.printStackTrace();
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}
+		
+	}
+
+	@SuppressWarnings("unchecked")
+	private void handleAjaxUpdateRequest(HttpServletRequest req, HashMap<String, Object> respMap) {
+		try {
+			ArrayList<Object> updateTimesList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, ExecutorManagerAdapter.INFO_UPDATE_TIME_LIST));
+			ArrayList<Object> execIDList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, ExecutorManagerAdapter.INFO_EXEC_ID_LIST));
+			
+			ArrayList<Object> updateList = new ArrayList<Object>();
+			for (int i = 0; i < execIDList.size(); ++i) {
+				long updateTime = JSONUtils.getLongFromObject(updateTimesList.get(i));
+				int execId = (Integer)execIDList.get(i);
+				
+				ExecutableFlow flow = executorManager.getExecutableFlow(execId);
+				if (flow == null) {
+					Map<String, Object> errorResponse = new HashMap<String,Object>();
+					errorResponse.put(ExecutorManagerAdapter.INFO_ERROR, "Flow does not exist");
+					errorResponse.put(ExecutorManagerAdapter.INFO_EXEC_ID, execId);
+					updateList.add(errorResponse);
+					continue;
+				}
+				
+				if (flow.getUpdateTime() > updateTime) {
+					updateList.add(flow.toUpdateObject(updateTime));
+				}
+			}
+			
+			respMap.put(ExecutorManagerAdapter.INFO_UPDATES, updateList);
+		}  catch (Exception e) {
+			e.printStackTrace();
+			respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+		}		
+	}
+	
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
new file mode 100644
index 0000000..67edbc9
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
@@ -0,0 +1,47 @@
+package azkaban.jmx;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
+
+public class JmxExecutorManagerAdapter implements JmxExecutorManagerAdapterMBean {
+	private ExecutorManagerAdapter manager;
+
+	public JmxExecutorManagerAdapter(ExecutorManagerAdapter manager) {
+		this.manager = manager;
+	}
+
+	@Override
+	public int getNumRunningFlows() {
+		try {
+			return this.manager.getRunningFlows().size();
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return 0;
+		}
+	}
+
+	@Override
+	public String getExecutorManagerThreadState() {
+		return manager.getExecutorManagerThreadState().toString();
+	}
+
+	@Override
+	public boolean isExecutorManagerThreadActive() {
+		return manager.isExecutorManagerThreadActive();
+	}
+
+	@Override
+	public Long getLastExecutorManagerThreadCheckTime() {
+		return manager.getLastExecutorManagerThreadCheckTime();
+	}
+	
+	@Override 
+	public List<String> getPrimaryExecutorHostPorts() {
+		return new ArrayList<String>(manager.getPrimaryServerHosts());
+	}
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerAdapterMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerAdapterMBean.java
new file mode 100644
index 0000000..b9742cb
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManagerAdapterMBean.java
@@ -0,0 +1,20 @@
+package azkaban.jmx;
+
+import java.util.List;
+
+public interface JmxExecutorManagerAdapterMBean {
+	@DisplayName("OPERATION: getNumRunningFlows")
+	public int getNumRunningFlows();
+	
+	@DisplayName("OPERATION: getExecutorThreadState")
+	public String getExecutorManagerThreadState();
+
+	@DisplayName("OPERATION: isThreadActive")
+	public boolean isExecutorManagerThreadActive();
+
+	@DisplayName("OPERATION: getLastThreadCheckTime")
+	public Long getLastExecutorManagerThreadCheckTime();
+
+	@DisplayName("OPERATION: getPrimaryExecutorHostPorts")
+	public List<String> getPrimaryExecutorHostPorts();
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
index 0bacf7b..dbee790 100644
--- a/src/java/azkaban/jmx/JmxTriggerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -6,47 +6,56 @@ import java.util.List;
 import org.joda.time.DateTime;
 
 import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerAdapter;
+import azkaban.trigger.TriggerManagerAdapter.TriggerJMX;
 
 public class JmxTriggerManager implements JmxTriggerManagerMBean {
-	private TriggerManager manager;
+	private TriggerJMX jmxStats;
 
-	public JmxTriggerManager(TriggerManager manager) {
-		this.manager = manager;
+	public JmxTriggerManager(TriggerManagerAdapter manager) {
+		this.jmxStats = manager.getJMX();
 	}
 
 	@Override
-	public String getLastThreadCheckTime() {
-		return new DateTime(manager.getLastThreadCheckTime()).toString();
+	public String getLastRunnerThreadCheckTime() {
+		return new DateTime(jmxStats.getLastRunnerThreadCheckTime()).toString();
 	}
 
 	@Override
-	public boolean isThreadActive() {
-		return manager.isThreadActive();
+	public boolean isRunnerThreadActive() {
+		return jmxStats.isRunnerThreadActive();
 	}
 
 	@Override
-	public List<String> getPrimaryTriggerHostPorts() {
-		return new ArrayList<String>(manager.getPrimaryServerHosts());
+	public String getPrimaryTriggerHostPort() {
+		return jmxStats.getPrimaryServerHost();
 	}
 
-	@Override
-	public List<String> getAllTriggerHostPorts() {
-		return new ArrayList<String>(manager.getAllActiveTriggerServerHosts());
-	}
+//	@Override
+//	public List<String> getAllTriggerHostPorts() {
+//		return new ArrayList<String>(manager.getAllActiveTriggerServerHosts());
+//	}
 
 	@Override
 	public int getNumTriggers() {
-		return manager.getNumTriggers();
+		return jmxStats.getNumTriggers();
 	}
 
 	@Override
 	public String getTriggerSources() {
-		return manager.getTriggerSources();
+		return jmxStats.getTriggerSources();
 	}
 
 	@Override
 	public String getTriggerIds() {
-		return manager.getTriggerIds();
+		return jmxStats.getTriggerIds();
 	}
+
+	@Override
+	public long getScannerIdleTime() {
+		return jmxStats.getScannerIdleTime();
+	}
+	
+	
 	
 }
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
index af3cc9a..6302885 100644
--- a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -5,16 +5,16 @@ import java.util.List;
 public interface JmxTriggerManagerMBean {	
 	
 	@DisplayName("OPERATION: getLastThreadCheckTime")
-	public String getLastThreadCheckTime();
+	public String getLastRunnerThreadCheckTime();
 
 	@DisplayName("OPERATION: isThreadActive")
-	public boolean isThreadActive();
+	public boolean isRunnerThreadActive();
 
-	@DisplayName("OPERATION: getPrimaryTriggerHostPorts")
-	public List<String> getPrimaryTriggerHostPorts();
+	@DisplayName("OPERATION: getPrimaryTriggerHostPort")
+	public String getPrimaryTriggerHostPort();
 	
-	@DisplayName("OPERATION: getAllTriggerHostPorts")
-	public List<String> getAllTriggerHostPorts();
+//	@DisplayName("OPERATION: getAllTriggerHostPorts")
+//	public List<String> getAllTriggerHostPorts();
 	
 	@DisplayName("OPERATION: getNumTriggers")
 	public int getNumTriggers();
@@ -24,4 +24,7 @@ public interface JmxTriggerManagerMBean {
 	
 	@DisplayName("OPERATION: getTriggerIds")
 	public String getTriggerIds();
+	
+	@DisplayName("OPERATION: getScannerIdleTime")
+	public long getScannerIdleTime();
 }
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index eea9c46..e4a4ab1 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -37,7 +37,6 @@ public class ProjectManager {
 	private final int projectVersionRetention;
 	private final boolean creatorDefaultPermissions;
 	
-	private TriggerManager triggerManager;
 	private boolean loadTriggerFromFile = false;
 	
 	public ProjectManager(ProjectLoader loader, Props props) {
@@ -58,10 +57,6 @@ public class ProjectManager {
 		loadAllProjects();
 	}
 
-	public void setTriggerManager(TriggerManager triggerManager) {
-		this.triggerManager = triggerManager;
-	}
-	
 	public void setLoadTriggerFromFile(boolean enable) {
 		this.loadTriggerFromFile = enable;
 	}
@@ -354,21 +349,22 @@ public class ProjectManager {
 			logger.info("Uploading Props properties");
 			projectLoader.uploadProjectProperties(project, propProps);
 		}
-		
-		if(loadTriggerFromFile) {
-			logger.info("Loading triggers.");
-			Props triggerProps = new Props();
-			triggerProps.put("projectId", project.getId());
-			triggerProps.put("projectName", project.getName());
-			triggerProps.put("submitUser", uploader.getUserId());
-			try {
-				triggerManager.loadTriggerFromDir(file, triggerProps);
-			} catch (Exception e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-				logger.error("Failed to load triggers.", e);
-			}
-		}
+	
+		//TODO: find something else to load triggers
+//		if(loadTriggerFromFile) {
+//			logger.info("Loading triggers.");
+//			Props triggerProps = new Props();
+//			triggerProps.put("projectId", project.getId());
+//			triggerProps.put("projectName", project.getName());
+//			triggerProps.put("submitUser", uploader.getUserId());
+//			try {
+//				triggerManager.loadTriggerFromDir(file, triggerProps);
+//			} catch (Exception e) {
+//				// TODO Auto-generated catch block
+//				e.printStackTrace();
+//				logger.error("Failed to load triggers.", e);
+//			}
+//		}
 		
 		logger.info("Uploaded project files. Cleaning up temp files.");
 		projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(), "Uploaded project files zip " + archive.getName());
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ad4104a..fcf079d 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -37,6 +37,7 @@ import org.joda.time.format.DateTimeFormatter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
@@ -64,7 +65,7 @@ public class ScheduleManager implements TriggerAgent {
 	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
 	private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
 	
-	private final ExecutorManager executorManager;
+	private final ExecutorManagerAdapter executorManager;
 	
 	private ProjectManager projectManager = null;
 	
@@ -80,7 +81,7 @@ public class ScheduleManager implements TriggerAgent {
 	 * 
 	 * @param loader
 	 */
-	public ScheduleManager (ExecutorManager executorManager,
+	public ScheduleManager (ExecutorManagerAdapter executorManager,
 							ScheduleLoader loader,
 							boolean useExternalRunner) 
 	{
@@ -539,7 +540,7 @@ public class ScheduleManager implements TriggerAgent {
 									}
 									
 									try {
-										executorManager.submitExecutableFlow(exflow);
+										executorManager.submitExecutableFlow(exflow, s.getSubmitUser());
 										logger.info("Scheduler has invoked " + exflow.getExecutionId());
 									} 
 									catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 4c430de..3aaf9ff 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -9,6 +9,7 @@ import java.util.Map;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.utils.JSONUtils;
@@ -35,7 +36,7 @@ public class ScheduleStatisticManager {
 
 	private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
 		Map<String, Object> data = new HashMap<String, Object>();
-		ExecutorManager executorManager = server.getExecutorManager();
+		ExecutorManagerAdapter executorManager = server.getExecutorManager();
 		ScheduleManager scheduleManager = server.getScheduleManager();
 		Schedule schedule = scheduleManager.getSchedule(scheduleId);
 
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index ba6bdd0..1f638d7 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -8,9 +8,8 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.project.ProjectManager;
-import azkaban.sla.SlaOption;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
@@ -18,9 +17,7 @@ import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.builtin.BasicTimeChecker;
-import azkaban.trigger.builtin.CreateTriggerAction;
 import azkaban.trigger.builtin.ExecuteFlowAction;
-import azkaban.trigger.builtin.SlaChecker;
 
 public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
@@ -33,7 +30,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 //	private Map<Integer, Trigger> triggersLocalCopy;
 	private long lastUpdateTime = -1;
 	
-	public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager, String triggerSource) {
+	public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManagerAdapter executorManager, ProjectManager projectManager, String triggerSource) {
 		this.triggerManager = triggerManager;
 		this.triggerSource = triggerSource;
 		// need to init the action types and condition checker types 
@@ -46,7 +43,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		Condition triggerCondition = createTimeTriggerCondition(s);
 		Condition expireCondition = createTimeExpireCondition(s);
 		List<TriggerAction> actions = createActions(s);
-		Trigger t = new Trigger(s.getScheduleId(), new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+		Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
 		if(s.isRecurring()) {
 			t.setResetOnTrigger(true);
 		}
@@ -72,7 +69,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
 	private Condition createTimeTriggerCondition (Schedule s) {
 		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
-		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
 		checkers.put(checker.getId(), checker);
 		String expr = checker.getId() + ".eval()";
 		Condition cond = new Condition(checkers, expr);
@@ -82,7 +79,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	// if failed to trigger, auto expire?
 	private Condition createTimeExpireCondition (Schedule s) {
 		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
-		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
 		checkers.put(checker.getId(), checker);
 		String expr = checker.getId() + ".eval()";
 		Condition cond = new Condition(checkers, expr);
@@ -122,7 +119,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		List<Schedule> schedules = new ArrayList<Schedule>();
 //		triggersLocalCopy = new HashMap<Integer, Trigger>();
 		for(Trigger t : triggers) {
-			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
 			Schedule s = triggerToSchedule(t);
 			schedules.add(s);
 			System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
@@ -156,12 +153,12 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 					act.getProjectName(), 
 					act.getFlowName(), 
 					t.getStatus().toString(), 
-					ck.getFirstCheckTime().getMillis(), 
-					ck.getFirstCheckTime().getZone(), 
+					ck.getFirstCheckTime(), 
+					ck.getTimeZone(), 
 					ck.getPeriod(),
-					t.getLastModifyTime().getMillis(),
-					ck.getNextCheckTime().getMillis(),
-					t.getSubmitTime().getMillis(),
+					t.getLastModifyTime(),
+					ck.getNextCheckTime(),
+					t.getSubmitTime(),
 					t.getSubmitUser(),
 					act.getExecutionOptions(),
 					act.getSlaOptions());
@@ -196,7 +193,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	public synchronized List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException {
 		List<Trigger> triggers;
 		try {
-			triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+			triggers = triggerManager.getTriggerUpdates(triggerSource, lastUpdateTime);
 		} catch (TriggerManagerException e) {
 			// TODO Auto-generated catch block
 			e.printStackTrace();
@@ -204,7 +201,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		}
 		List<Schedule> schedules = new ArrayList<Schedule>();
 		for(Trigger t : triggers) {
-			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
 			Schedule s = triggerToSchedule(t);
 			schedules.add(s);
 			System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 28b0328..29512f7 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -29,6 +29,7 @@ import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.project.ProjectManager;
 import azkaban.sla.SlaOption;
 import azkaban.trigger.TriggerManager;
@@ -55,7 +56,7 @@ public class TriggerBasedScheduler {
 	 * 
 	 * @param loader
 	 */
-	public TriggerBasedScheduler(ExecutorManager executorManager,
+	public TriggerBasedScheduler(ExecutorManagerAdapter executorManager,
 							ProjectManager projectManager, 
 							TriggerManager triggerManager,
 							ScheduleLoader loader) 
diff --git a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 6071019..13eaf27 100644
--- a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -14,8 +14,8 @@ public class BasicTimeChecker implements ConditionChecker {
 
 	public static final String type = "BasicTimeChecker";
 	
-	private DateTime firstCheckTime;
-	private DateTime nextCheckTime;
+	private long firstCheckTime;
+	private long nextCheckTime;
 	private DateTimeZone timezone;
 	private boolean isRecurring = true;
 	private boolean skipPastChecks = true;
@@ -25,7 +25,7 @@ public class BasicTimeChecker implements ConditionChecker {
 	
 	public BasicTimeChecker(
 			String id,
-			DateTime firstCheckTime,
+			long firstCheckTime,
 			DateTimeZone timezone,
 			boolean isRecurring, 
 			boolean skipPastChecks,
@@ -36,13 +36,17 @@ public class BasicTimeChecker implements ConditionChecker {
 		this.isRecurring = isRecurring;
 		this.skipPastChecks = skipPastChecks;
 		this.period = period;
-		this.nextCheckTime = new DateTime(firstCheckTime);
+		this.nextCheckTime = firstCheckTime;
 		this.nextCheckTime = calculateNextCheckTime();
 	}
 	
-	public DateTime getFirstCheckTime() {
+	public long getFirstCheckTime() {
 		return firstCheckTime;
 	}
+	
+	public DateTimeZone getTimeZone() {
+		return timezone;
+	}
 
 	public boolean isRecurring() {
 		return isRecurring;
@@ -56,7 +60,7 @@ public class BasicTimeChecker implements ConditionChecker {
 		return period;
 	}
 
-	public DateTime getNextCheckTime() {
+	public long getNextCheckTime() {
 		return nextCheckTime;
 	}
 	
@@ -75,9 +79,9 @@ public class BasicTimeChecker implements ConditionChecker {
 	
 	public BasicTimeChecker(
 			String id,
-			DateTime firstCheckTime,
+			long firstCheckTime,
 			DateTimeZone timezone,
-			DateTime nextCheckTime,
+			long nextCheckTime,
 			boolean isRecurring, 
 			boolean skipPastChecks,
 			ReadablePeriod period) {
@@ -92,7 +96,7 @@ public class BasicTimeChecker implements ConditionChecker {
 	
 	@Override
 	public Boolean eval() {
-		return nextCheckTime.isBeforeNow();
+		return nextCheckTime < System.currentTimeMillis();
 	}
 
 	@Override
@@ -125,21 +129,22 @@ public class BasicTimeChecker implements ConditionChecker {
 
 	@SuppressWarnings("unchecked")
 	public static BasicTimeChecker createFromJson(Object obj) throws Exception {
-		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-		if(!jsonObj.get("type").equals(type)) {
-			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
-		}
-		long firstTimeMillis = Long.valueOf((String)jsonObj.get("firstCheckTime"));
-		String timezoneId = (String) jsonObj.get("timezone");
-		long nextTimeMillis = Long.valueOf((String)jsonObj.get("nextCheckTime"));
-		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
-		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
-		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
-		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
-		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
-		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
-		String id = (String) jsonObj.get("id");
-		return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+//		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+//		if(!jsonObj.get("type").equals(type)) {
+//			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+//		}
+//		long firstCheckTime = Long.valueOf((String)jsonObj.get("firstCheckTime"));
+//		String timezoneId = (String) jsonObj.get("timezone");
+//		long nextCheckTime = Long.valueOf((String)jsonObj.get("nextCheckTime"));
+//		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+////		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+////		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+//		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
+//		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
+//		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
+//		String id = (String) jsonObj.get("id");
+//		return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+		return createFromJson((HashMap<String, Object>)obj);
 	}
 	
 	public static BasicTimeChecker createFromJson(HashMap<String, Object> obj) throws Exception {
@@ -147,12 +152,12 @@ public class BasicTimeChecker implements ConditionChecker {
 		if(!jsonObj.get("type").equals(type)) {
 			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
 		}
-		Long firstTimeMillis = Long.valueOf((String) jsonObj.get("firstCheckTime"));
+		Long firstCheckTime = Long.valueOf((String) jsonObj.get("firstCheckTime"));
 		String timezoneId = (String) jsonObj.get("timezone");
-		long nextTimeMillis = Long.valueOf((String) jsonObj.get("nextCheckTime"));
+		long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
 		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
-		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
-		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+//		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+//		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
 		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
 		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
 		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
@@ -212,10 +217,10 @@ public class BasicTimeChecker implements ConditionChecker {
 		nextCheckTime = calculateNextCheckTime();
 	}
 	
-	private DateTime calculateNextCheckTime(){
-		DateTime date = new DateTime(nextCheckTime);
+	private long calculateNextCheckTime(){
+		DateTime date = new DateTime(nextCheckTime).withZone(timezone);
 		int count = 0;
-		while(!DateTime.now().isBefore(date)) {
+		while(!date.isAfterNow()) {
 			if(count > 100000) {
 				throw new IllegalStateException("100000 increments of period did not get to present time.");
 			}
@@ -230,7 +235,7 @@ public class BasicTimeChecker implements ConditionChecker {
 				continue;
 			}
 		}
-		return date;
+		return date.getMillis();
 	}
 	
 	@Override
@@ -243,9 +248,9 @@ public class BasicTimeChecker implements ConditionChecker {
 	public Object toJson() {
 		Map<String, Object> jsonObj = new HashMap<String, Object>();
 		jsonObj.put("type", type);
-		jsonObj.put("firstCheckTime", String.valueOf(firstCheckTime.getMillis()));
+		jsonObj.put("firstCheckTime", String.valueOf(firstCheckTime));
 		jsonObj.put("timezone", timezone.getID());
-		jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime.getMillis()));
+		jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime));
 		jsonObj.put("isRecurrint", String.valueOf(isRecurring));
 		jsonObj.put("skipPastChecks", String.valueOf(skipPastChecks));
 		jsonObj.put("period", Utils.createPeriodString(period));
diff --git a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
index 4f8baac..b4c5f84 100644
--- a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
+++ b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
@@ -5,12 +5,12 @@ import java.util.Map;
 
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
-import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.trigger.TriggerManager;
 
 public class CreateTriggerAction implements TriggerAction {
 	
 	public static final String type = "CreateTriggerAction";
-	private static TriggerRunnerManager triggerRunnerManager;
+	private static TriggerManager triggerManager;
 	private Trigger trigger;
 	private Map<String, Object> context;
 	private String actionId;
@@ -25,8 +25,8 @@ public class CreateTriggerAction implements TriggerAction {
 		return type;
 	}
 	
-	public static void setTriggerRunnerManager(TriggerRunnerManager trm) {
-		triggerRunnerManager = trm;
+	public static void setTriggerManager(TriggerManager trm) {
+		triggerManager = trm;
 	}
 
 	@SuppressWarnings("unchecked")
@@ -58,7 +58,7 @@ public class CreateTriggerAction implements TriggerAction {
 
 	@Override
 	public void doAction() throws Exception {
-		triggerRunnerManager.insertTrigger(trigger);
+		triggerManager.insertTrigger(trigger);
 	}
 
 	@Override
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 1ff52cb..6124b22 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -10,6 +10,7 @@ import org.apache.log4j.Logger;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
@@ -19,7 +20,7 @@ import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
-import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.trigger.TriggerManager;
 
 public class ExecuteFlowAction implements TriggerAction {
 
@@ -27,8 +28,8 @@ public class ExecuteFlowAction implements TriggerAction {
 
 	public static final String EXEC_ID = "ExecuteFlowAction.execid";
 	
-	private static ExecutorManager executorManager;
-	private static TriggerRunnerManager triggerRunnerManager;
+	private static ExecutorManagerAdapter executorManager;
+	private static TriggerManager triggerManager;
 	private String actionId;
 	private int projectId;
 	private String projectName;
@@ -99,20 +100,20 @@ public class ExecuteFlowAction implements TriggerAction {
 		this.slaOptions = slaOptions;
 	}
 
-	public static ExecutorManager getExecutorManager() {
+	public static ExecutorManagerAdapter getExecutorManager() {
 		return executorManager;
 	}
  	
-	public static void setExecutorManager(ExecutorManager executorManager) {
+	public static void setExecutorManager(ExecutorManagerAdapter executorManager) {
 		ExecuteFlowAction.executorManager = executorManager;
 	}
 	
-	public static TriggerRunnerManager getTriggerRunnerManager() {
-		return triggerRunnerManager;
+	public static TriggerManager getTriggerManager() {
+		return triggerManager;
 	}
  	
-	public static void setTriggerRunnerManager(TriggerRunnerManager triggerRunnerManager) {
-		ExecuteFlowAction.triggerRunnerManager = triggerRunnerManager;
+	public static void setTriggerManager(TriggerManager triggerManager) {
+		ExecuteFlowAction.triggerManager = triggerManager;
 	}
 
 	public static ProjectManager getProjectManager() {
@@ -214,7 +215,7 @@ public class ExecuteFlowAction implements TriggerAction {
 		exflow.setExecutionOptions(executionOptions);
 		
 		try{
-			executorManager.submitExecutableFlow(exflow);
+			executorManager.submitExecutableFlow(exflow, submitUser);
 			Map<String, Object> outputProps = new HashMap<String, Object>();
 			outputProps.put(EXEC_ID, exflow.getExecutionId());
 			context.put(actionId, outputProps);
@@ -250,7 +251,7 @@ public class ExecuteFlowAction implements TriggerAction {
 				Trigger slaTrigger = new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond, actions);
 				slaTrigger.setResetOnTrigger(false);
 				slaTrigger.setResetOnExpire(false);
-				triggerRunnerManager.insertTrigger(slaTrigger);
+				triggerManager.insertTrigger(slaTrigger);
 			}
 		}
 		
diff --git a/src/java/azkaban/trigger/builtin/KillExecutionAction.java b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
index 1da405d..0dabe73 100644
--- a/src/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -7,6 +7,7 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.trigger.TriggerAction;
 
 public class KillExecutionAction implements TriggerAction{
@@ -17,14 +18,14 @@ public class KillExecutionAction implements TriggerAction{
 	
 	private String actionId;
 	private int execId;
-	private static ExecutorManager executorManager;
+	private static ExecutorManagerAdapter executorManager;
 	
 	public KillExecutionAction(String actionId, int execId) {
 		this.execId = execId;
 		this.actionId = actionId;
 	}
 	
-	public static void setExecutorManager(ExecutorManager em) {
+	public static void setExecutorManager(ExecutorManagerAdapter em) {
 		executorManager = em;
 	}
 	
@@ -73,7 +74,7 @@ public class KillExecutionAction implements TriggerAction{
 	public void doAction() throws Exception {
 		ExecutableFlow exFlow = executorManager.getExecutableFlow(execId);
 		logger.info("ready to kill execution " + execId);
-		if(!executorManager.isFinished(exFlow)) {
+		if(!ExecutableFlow.isFinished(exFlow)) {
 			logger.info("Killing execution " + execId);
 			executorManager.cancelFlow(exFlow, "azkaban_sla");
 		}
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index ead2124..11c6bd5 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -16,6 +16,7 @@ import org.apache.log4j.Logger;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManager.Alerter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.sla.SlaOption;
@@ -36,7 +37,7 @@ public class SlaAlertAction implements TriggerAction{
 //	private List<Map<String, Object>> alerts;
 	private static Map<String, Alerter> alerters;
 	private Map<String, Object> context;
-	private static ExecutorManager executorManager;
+	private static ExecutorManagerAdapter executorManager;
 
 	public SlaAlertAction(String id, SlaOption slaOption, int execId) {
 		this.actionId = id;
@@ -49,7 +50,7 @@ public class SlaAlertAction implements TriggerAction{
 		alerters = alts;
 	}
 	
-	public static void setExecutorManager(ExecutorManager em) {
+	public static void setExecutorManager(ExecutorManagerAdapter em) {
 		executorManager = em;
 	}
 	
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index e9826eb..05a00ac 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -11,6 +11,7 @@ import org.joda.time.ReadablePeriod;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.sla.SlaOption;
@@ -28,7 +29,7 @@ public class SlaChecker implements ConditionChecker{
 	private Map<String, Object> context;
 	private boolean passChecker = true;
 	
-	private static ExecutorManager executorManager;
+	private static ExecutorManagerAdapter executorManager;
 	
 	public SlaChecker(String id, SlaOption slaOption, int execId, boolean passChecker) {
 		this.id = id;
@@ -46,7 +47,7 @@ public class SlaChecker implements ConditionChecker{
 		this.passChecker = passChecker;
 	}
 
-	public static void setExecutorManager(ExecutorManager em) {
+	public static void setExecutorManager(ExecutorManagerAdapter em) {
 		executorManager = em;
 	}
 	
@@ -226,4 +227,10 @@ public class SlaChecker implements ConditionChecker{
 		this.context = context;
 	}
 
+	@Override
+	public long getNextCheckTime() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
 }
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 451285b..0311b54 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -9,6 +9,7 @@ import org.apache.commons.jexl2.Expression;
 import org.apache.commons.jexl2.JexlEngine;
 import org.apache.commons.jexl2.MapContext;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 
 public class Condition {
@@ -20,12 +21,19 @@ public class Condition {
 	private Expression expression;
 	private Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
 	private MapContext context = new MapContext();
+	private long nextCheckTime = -1;	
 	
 	public Condition(Map<String, ConditionChecker> checkers, String expr) {
 		setCheckers(checkers);
 		this.expression = jexl.createExpression(expr);
 	}
 	
+	public Condition(Map<String, ConditionChecker> checkers, String expr, long nextCheckTime) {
+		this.nextCheckTime = nextCheckTime;
+		setCheckers(checkers);
+		this.expression = jexl.createExpression(expr);
+	}
+	
 	public synchronized static void setJexlEngine(JexlEngine jexl) {
 		Condition.jexl = jexl;
 	}
@@ -43,6 +51,10 @@ public class Condition {
 		context.set(checker.getId(), checker);
 	}
 	
+	public long getNextCheckTime() {
+		return nextCheckTime;
+	}
+	
 	public Map<String, ConditionChecker> getCheckers() {
 		return this.checkers;
 	}
@@ -55,9 +67,13 @@ public class Condition {
 	}
 	
 	public void resetCheckers() {
+		long time = Long.MAX_VALUE;
 		for(ConditionChecker checker : checkers.values()) {
 			checker.reset();
+			time = Math.min(time, checker.getNextCheckTime());
 		}
+		logger.error("Done resetting checkers. The next check time will be " + new DateTime(time));
+		this.nextCheckTime = time;
 	}
 	
 	public String getExpression() {
@@ -84,6 +100,7 @@ public class Condition {
 			checkersJson.add(oneChecker);
 		}
 		jsonObj.put("checkers", checkersJson);
+		jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime));
 		
 		return jsonObj;
 	}
@@ -107,8 +124,9 @@ public class Condition {
 				checkers.put(ck.getId(), ck);
 			}
 			String expr = (String) jsonObj.get("expression");
+			long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
 				
-			cond = new Condition(checkers, expr);
+			cond = new Condition(checkers, expr, nextCheckTime);
 			
 		} catch(Exception e) {
 			e.printStackTrace();
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 85b4003..342312b 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -23,4 +23,5 @@ public interface ConditionChecker {
 	
 	void setContext(Map<String, Object> context);
 	
+	long getNextCheckTime();
 }
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index 658594b..2d70c65 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -73,7 +73,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	}
 
 	@Override
-	public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException {
+	public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerLoaderException {
 		logger.info("Loading triggers changed since " + new DateTime(lastUpdateTime).toString());
 		Connection connection = getConnection();
 
@@ -87,7 +87,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 		} catch (SQLException e) {
 			logger.error(GET_ALL_TRIGGERS + " failed.");
 
-			throw new TriggerManagerException("Loading triggers from db failed. ", e);
+			throw new TriggerLoaderException("Loading triggers from db failed. ", e);
 		} finally {
 			DbUtils.closeQuietly(connection);
 		}
@@ -98,7 +98,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	}
 	
 	@Override
-	public List<Trigger> loadTriggers() throws TriggerManagerException {
+	public List<Trigger> loadTriggers() throws TriggerLoaderException {
 		logger.info("Loading all triggers from db.");
 		Connection connection = getConnection();
 
@@ -112,7 +112,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 		} catch (SQLException e) {
 			logger.error(GET_ALL_TRIGGERS + " failed.");
 
-			throw new TriggerManagerException("Loading triggers from db failed. ", e);
+			throw new TriggerLoaderException("Loading triggers from db failed. ", e);
 		} finally {
 			DbUtils.closeQuietly(connection);
 		}
@@ -123,38 +123,38 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	}
 
 	@Override
-	public void removeTrigger(Trigger t) throws TriggerManagerException {		
+	public void removeTrigger(Trigger t) throws TriggerLoaderException {		
 		logger.info("Removing trigger " + t.toString() + " from db.");
 
 		QueryRunner runner = createQueryRunner();
 		try {
 			int removes =  runner.update(REMOVE_TRIGGER, t.getTriggerId());
 			if (removes == 0) {
-				throw new TriggerManagerException("No trigger has been removed.");
+				throw new TriggerLoaderException("No trigger has been removed.");
 			}
 		} catch (SQLException e) {
 			logger.error(REMOVE_TRIGGER + " failed.");
-			throw new TriggerManagerException("Remove trigger " + t.toString() + " from db failed. ", e);
+			throw new TriggerLoaderException("Remove trigger " + t.toString() + " from db failed. ", e);
 		}
 	}
 	
 	@Override
-	public void addTrigger(Trigger t) throws TriggerManagerException {
+	public void addTrigger(Trigger t) throws TriggerLoaderException {
 		logger.info("Inserting trigger " + t.toString() + " into db.");
-		t.setLastModifyTime(DateTime.now());
+		t.setLastModifyTime(System.currentTimeMillis());
 		Connection connection = getConnection();
 		try {
 			addTrigger(connection, t, defaultEncodingType);
 		}
 		catch (Exception e) {
-			throw new TriggerManagerException("Error uploading trigger", e);
+			throw new TriggerLoaderException("Error uploading trigger", e);
 		}
 		finally {
 			DbUtils.closeQuietly(connection);
 		}
 	}
 
-	private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+	private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerLoaderException {
 		
 		QueryRunner runner = new QueryRunner();
 		
@@ -166,37 +166,37 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
 
 			if (id == -1l) {
-				throw new TriggerManagerException("trigger id is not properly created.");
+				logger.error("trigger id is not properly created.");
+				throw new TriggerLoaderException("trigger id is not properly created.");
 			}
 			
 			t.setTriggerId((int)id);
 			updateTrigger(t);
-			logger.info("uploaded trigger " + t.toString());
+			logger.info("uploaded trigger " + t.getDescription());
 		} catch (SQLException e) {
-			throw new TriggerManagerException("Error creating trigger.", e);
+			throw new TriggerLoaderException("Error creating trigger.", e);
 		}
 		
 	}
 	
 	@Override
-	public void updateTrigger(Trigger t) throws TriggerManagerException {
-		logger.info("Updating trigger " + t.toString() + " into db.");
-		t.setLastModifyTime(DateTime.now());
+	public void updateTrigger(Trigger t) throws TriggerLoaderException {
+		logger.info("Updating trigger " + t.getTriggerId() + " into db.");
+		t.setLastModifyTime(System.currentTimeMillis());
 		Connection connection = getConnection();
 		try{
-			t.setLastModifyTime(DateTime.now());
 			updateTrigger(connection, t, defaultEncodingType);
 		}
 		catch(Exception e) {
 			e.printStackTrace();
-			throw new TriggerManagerException("Failed to update trigger " + t.toString() + " into db!");
+			throw new TriggerLoaderException("Failed to update trigger " + t.toString() + " into db!");
 		}
 		finally {
 			DbUtils.closeQuietly(connection);
 		}
 	}
 		
-	private void updateTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+	private void updateTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerLoaderException {
 
 		String json = JSONUtils.toJSON(t.toJson());
 		byte[] data = null;
@@ -210,7 +210,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
 		}
 		catch (IOException e) {
-			throw new TriggerManagerException("Error encoding the trigger " + t.toString());
+			throw new TriggerLoaderException("Error encoding the trigger " + t.toString());
 		}
 		
 		QueryRunner runner = new QueryRunner();
@@ -219,20 +219,22 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			int updates =  runner.update( connection, 
 					UPDATE_TRIGGER, 
 					t.getSource(),
-					t.getLastModifyTime().getMillis(),
+					t.getLastModifyTime(),
 					encType.getNumVal(),
 					data,
 					t.getTriggerId());
+			connection.commit();
 			if (updates == 0) {
-				throw new TriggerManagerException("No trigger has been updated.");
+				throw new TriggerLoaderException("No trigger has been updated.");
 				//logger.error("No trigger is updated!");
+			} else {
+				logger.info("Updated " + updates + " records.");
 			}
 		} catch (SQLException e) {
 			logger.error(UPDATE_TRIGGER + " failed.");
-			throw new TriggerManagerException("Update trigger " + t.toString() + " into db failed. ", e);
+			throw new TriggerLoaderException("Update trigger " + t.toString() + " into db failed. ", e);
 		}
 	}
-
 	
 	private static class LastInsertID implements ResultSetHandler<Long> {
 		private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@@ -301,20 +303,20 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 		
 	}
 	
-	private Connection getConnection() throws TriggerManagerException {
+	private Connection getConnection() throws TriggerLoaderException {
 		Connection connection = null;
 		try {
 			connection = super.getDBConnection(false);
 		} catch (Exception e) {
 			DbUtils.closeQuietly(connection);
-			throw new TriggerManagerException("Error getting DB connection.", e);
+			throw new TriggerLoaderException("Error getting DB connection.", e);
 		}
 		
 		return connection;
 	}
 
 	@Override
-	public Trigger loadTrigger(int triggerId) throws TriggerManagerException {
+	public Trigger loadTrigger(int triggerId) throws TriggerLoaderException {
 		logger.info("Loading trigger " + triggerId + " from db.");
 		Connection connection = getConnection();
 
@@ -327,14 +329,14 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			triggers = runner.query(connection, GET_TRIGGER, handler, triggerId);
 		} catch (SQLException e) {
 			logger.error(GET_TRIGGER + " failed.");
-			throw new TriggerManagerException("Loading trigger from db failed. ", e);
+			throw new TriggerLoaderException("Loading trigger from db failed. ", e);
 		} finally {
 			DbUtils.closeQuietly(connection);
 		}
 		
 		if(triggers.size() == 0) {
 			logger.error("Loaded 0 triggers. Failed to load trigger " + triggerId);
-			throw new TriggerManagerException("Loaded 0 triggers. Failed to load trigger " + triggerId);
+			throw new TriggerLoaderException("Loaded 0 triggers. Failed to load trigger " + triggerId);
 		}
 		
 		return triggers.get(0);
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index d668b5e..013a548 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -8,13 +8,15 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
+import azkaban.utils.JSONUtils;
+
 public class Trigger {
 	
 	private static Logger logger = Logger.getLogger(Trigger.class);
 	
 	private int triggerId = -1;
-	private DateTime lastModifyTime;
-	private DateTime submitTime;
+	private long lastModifyTime;
+	private long submitTime;
 	private String submitUser;
 	private String source;
 	private TriggerStatus status = TriggerStatus.READY;
@@ -32,12 +34,26 @@ public class Trigger {
 	private boolean resetOnTrigger = true;
 	private boolean resetOnExpire = true;
 	
+	private long nextCheckTime = -1;
+	
 	@SuppressWarnings("unused")
 	private Trigger() throws TriggerManagerException {	
 		throw new TriggerManagerException("Triggers should always be specified");
 	}
 	
-	public DateTime getSubmitTime() {
+	public void updateNextCheckTime() {
+		this.nextCheckTime = Math.min(triggerCondition.getNextCheckTime(), expireCondition.getNextCheckTime());
+	}
+	
+	public long getNextCheckTime() {
+		return nextCheckTime;
+	}
+
+	public void setNextCheckTime(long nct) {
+		this.nextCheckTime = nct;
+	}
+	
+	public long getSubmitTime() {
 		return submitTime;
 	}
 
@@ -86,8 +102,8 @@ public class Trigger {
 	}
 	
 	public Trigger(
-			DateTime lastModifyTime, 
-			DateTime submitTime, 
+			long lastModifyTime, 
+			long submitTime, 
 			String submitUser, 
 			String source,
 			Condition triggerCondition,
@@ -109,8 +125,8 @@ public class Trigger {
 	}
 	
 	public Trigger(
-			DateTime lastModifyTime, 
-			DateTime submitTime, 
+			long lastModifyTime, 
+			long submitTime, 
 			String submitUser, 
 			String source,
 			Condition triggerCondition,
@@ -134,8 +150,8 @@ public class Trigger {
 			Condition expireCondition,
 			List<TriggerAction> actions, 
 			List<TriggerAction> expireActions) {
-		this.lastModifyTime = DateTime.now();
-		this.submitTime = DateTime.now();
+		this.lastModifyTime = DateTime.now().getMillis();
+		this.submitTime = DateTime.now().getMillis();
 		this.submitUser = submitUser;
 		this.source = source;
 		this.triggerCondition = triggerCondition;
@@ -150,8 +166,8 @@ public class Trigger {
 			Condition triggerCondition,
 			Condition expireCondition,
 			List<TriggerAction> actions) {
-		this.lastModifyTime = DateTime.now();
-		this.submitTime = DateTime.now();
+		this.lastModifyTime = DateTime.now().getMillis();
+		this.submitTime = DateTime.now().getMillis();
 		this.submitUser = submitUser;
 		this.source = source;
 		this.triggerCondition = triggerCondition;
@@ -161,8 +177,8 @@ public class Trigger {
 	}
 	
 	public Trigger(
-			DateTime lastModifyTime, 
-			DateTime submitTime, 
+			long lastModifyTime, 
+			long submitTime, 
 			String submitUser, 
 			String source,
 			Condition triggerCondition,
@@ -180,8 +196,8 @@ public class Trigger {
 	
 	public Trigger(
 			int triggerId,
-			DateTime lastModifyTime, 
-			DateTime submitTime, 
+			long lastModifyTime, 
+			long submitTime,
 			String submitUser, 
 			String source,
 			Condition triggerCondition,
@@ -205,8 +221,8 @@ public class Trigger {
 	
 	public Trigger(
 			int triggerId,
-			DateTime lastModifyTime, 
-			DateTime submitTime, 
+			long lastModifyTime, 
+			long submitTime, 
 			String submitUser, 
 			String source,
 			Condition triggerCondition,
@@ -226,8 +242,8 @@ public class Trigger {
 	
 	public Trigger(
 			int triggerId,
-			DateTime lastModifyTime, 
-			DateTime submitTime, 
+			long lastModifyTime, 
+			long submitTime,
 			String submitUser, 
 			String source,
 			Condition triggerCondition,
@@ -268,11 +284,11 @@ public class Trigger {
 		this.resetOnExpire = resetOnExpire;
 	}
 
-	public DateTime getLastModifyTime() {
+	public long getLastModifyTime() {
 		return lastModifyTime;
 	}
 	
-	public void setLastModifyTime(DateTime lastModifyTime) {
+	public void setLastModifyTime(long lastModifyTime) {
 		this.lastModifyTime = lastModifyTime;
 	}
 
@@ -329,8 +345,8 @@ public class Trigger {
 		jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
 		jsonObj.put("submitUser", submitUser);
 		jsonObj.put("source", source);
-		jsonObj.put("submitTime", String.valueOf(submitTime.getMillis()));
-		jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime.getMillis()));
+		jsonObj.put("submitTime", String.valueOf(submitTime));
+		jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
 		jsonObj.put("triggerId", String.valueOf(triggerId));
 		jsonObj.put("status", status.toString());
 		jsonObj.put("info", info);
@@ -354,6 +370,7 @@ public class Trigger {
 		
 		Trigger trigger = null;
 		try{
+			logger.info("Decoding for " + JSONUtils.toJSON(obj));
 			Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
 			Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
 			List<TriggerAction> actions = new ArrayList<TriggerAction>();
@@ -376,14 +393,15 @@ public class Trigger {
 			boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
 			String submitUser = (String) jsonObj.get("submitUser");
 			String source = (String) jsonObj.get("source");
-			long submitTimeMillis = Long.valueOf((String) jsonObj.get("submitTime"));
-			long lastModifyTimeMillis = Long.valueOf((String) jsonObj.get("lastModifyTime"));
-			DateTime submitTime = new DateTime(submitTimeMillis);
-			DateTime lastModifyTime = new DateTime(lastModifyTimeMillis);
+			long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+			long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
 			int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
 			TriggerStatus status = TriggerStatus.valueOf((String)jsonObj.get("status"));
 			Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
 			Map<String, Object> context = (Map<String, Object>) jsonObj.get("context");
+			if(context == null) {
+				context = new HashMap<String, Object>();
+			}
 			for(ConditionChecker checker : triggerCond.getCheckers().values()) {
 				checker.setContext(context);
 			}
diff --git a/src/java/azkaban/trigger/TriggerAgent.java b/src/java/azkaban/trigger/TriggerAgent.java
index 453f49d..cdabe94 100644
--- a/src/java/azkaban/trigger/TriggerAgent.java
+++ b/src/java/azkaban/trigger/TriggerAgent.java
@@ -3,10 +3,12 @@ package azkaban.trigger;
 import azkaban.utils.Props;
 
 public interface TriggerAgent {
-	void loadTriggerFromProps(Props props) throws Exception;
+	public void loadTriggerFromProps(Props props) throws Exception;
 
-	String getTriggerSource();
+	public String getTriggerSource();
 	
-	void start() throws Exception;
+	public void start() throws Exception;
+	
+	public void shutdown();
 
 }
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
index 7adf742..bddb9cc 100644
--- a/src/java/azkaban/trigger/TriggerLoader.java
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -4,16 +4,16 @@ import java.util.List;
 
 public interface TriggerLoader {
 
-	public void addTrigger(Trigger t) throws TriggerManagerException;	
+	public void addTrigger(Trigger t) throws TriggerLoaderException;	
 
-	public void removeTrigger(Trigger s) throws TriggerManagerException;
+	public void removeTrigger(Trigger s) throws TriggerLoaderException;
 	
-	public void updateTrigger(Trigger t) throws TriggerManagerException;
+	public void updateTrigger(Trigger t) throws TriggerLoaderException;
 	
-	public List<Trigger> loadTriggers() throws TriggerManagerException;
+	public List<Trigger> loadTriggers() throws TriggerLoaderException;
 
-	public Trigger loadTrigger(int triggerId) throws TriggerManagerException;
+	public Trigger loadTrigger(int triggerId) throws TriggerLoaderException;
 
-	public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException;
+	public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerLoaderException;
 	
 }
diff --git a/src/java/azkaban/trigger/TriggerLoaderException.java b/src/java/azkaban/trigger/TriggerLoaderException.java
new file mode 100644
index 0000000..f6b9b41
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerLoaderException.java
@@ -0,0 +1,34 @@
+package azkaban.trigger;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+public class TriggerLoaderException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public TriggerLoaderException(String message) {
+		super(message);
+	}
+	
+	public TriggerLoaderException(String message, Throwable cause) {
+		super(message, cause);
+	}
+	
+	public TriggerLoaderException(Throwable e) {
+		super(e);
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 0b21166..bdcd30f 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,533 +1,450 @@
-/*
- * Copyright 2012 LinkedIn, Inc
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
 package azkaban.trigger;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.lang.Thread.State;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.log4j.Logger;
 
-import azkaban.triggerapp.TriggerConnectorParams;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
-/**
- * Executor manager used to manage the client side job.
- *
- */
-public class TriggerManager {
+public class TriggerManager implements TriggerManagerAdapter{
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
+	private static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
 
-	public static final String TRIGGER_SUFFIX = ".trigger";
+	private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
 	
-	private TriggerLoader triggerLoader;
 	private CheckerTypeLoader checkerTypeLoader;
 	private ActionTypeLoader actionTypeLoader;
-	
-	private String triggerServerHost;
-	private int triggerServerPort;
-	
-	private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
-	
-	private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
-	
-	private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
+	private TriggerLoader triggerLoader;
 
-	private TriggerManagerUpdaterThread triggerManagingThread;
+	private final TriggerScannerThread runnerThread;
+	private long lastRunnerThreadCheckTime = -1;
+	private long runnerThreadIdleTime = -1;
+	private LocalTriggerJMX jmxStats = new LocalTriggerJMX();
 	
-	private long lastThreadCheckTime = -1;
-	
-	private long lastUpdateTime = -1;
-	
-	public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
-		this.triggerLoader = loader;
-		this.checkerTypeLoader = new CheckerTypeLoader();
-		this.actionTypeLoader = new ActionTypeLoader();
+	public TriggerManager(Props props, TriggerLoader triggerLoader) throws TriggerManagerException {
 
-		triggerServerHost = props.getString("trigger.server.host", "localhost");
-		triggerServerPort = props.getInt("trigger.server.port");
-
-		triggerManagingThread = new TriggerManagerUpdaterThread();
+		this.triggerLoader = triggerLoader;
 		
-		try{
+		long scannerInterval = props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
+		runnerThread = new TriggerScannerThread(scannerInterval);
+
+		checkerTypeLoader = new CheckerTypeLoader();
+		actionTypeLoader = new ActionTypeLoader();
+
+		try {
 			checkerTypeLoader.init(props);
 			actionTypeLoader.init(props);
-		} catch(Exception e) {
-			e.printStackTrace();
-			logger.error(e.getMessage());
+		} catch (Exception e) {
+			throw new TriggerManagerException(e);
 		}
-		
 		Condition.setCheckerLoader(checkerTypeLoader);
 		Trigger.setActionTypeLoader(actionTypeLoader);
-		
-		triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
-
-	}
-	
-	public void start() throws Exception {
-		loadTriggers();
-		for(TriggerAgent agent : triggerAgents.values()) {
-			agent.start();
-		}
-		triggerManagingThread.start();
 	}
-	
-	private static class SuffixFilter implements FileFilter {
-		private String suffix;
-		public SuffixFilter(String suffix) {
-			this.suffix = suffix;
-		}
 
-		@Override
-		public boolean accept(File pathname) {
-			String name = pathname.getName();
-			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
-		}
-	}
-	
-	public String getTriggerServerHost() {
-		return triggerServerHost;
-	}
-	
-	public int getTriggerServerPort() {
-		return triggerServerPort;
-	}
-	
-	public State getUpdaterThreadState() {
-		return triggerManagingThread.getState();
-	}
-	
-	public boolean isThreadActive() {
-		return triggerManagingThread.isAlive();
-	}
-	
-	public long getLastThreadCheckTime() {
-		return lastThreadCheckTime;
-	}
-	
-	public Set<String> getPrimaryServerHosts() {
-		// Only one for now. More probably later.
-		HashSet<String> ports = new HashSet<String>();
-		ports.add(triggerServerHost + ":" + triggerServerPort);
-		return ports;
-	}
-	
-	private void loadTriggers() throws TriggerManagerException {
-		List<Trigger> triggerList = triggerLoader.loadTriggers();
-		for(Trigger t : triggerList) {
-			if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
-				removeTrigger(t, "azkaban");
-			} else {
+	@Override
+	public void start() throws TriggerManagerException{
+		
+		try{
+			// expect loader to return valid triggers
+			List<Trigger> triggers = triggerLoader.loadTriggers();
+			for(Trigger t : triggers) {
+				runnerThread.addTrigger(t);
 				triggerIdMap.put(t.getTriggerId(), t);
 			}
+		}catch(Exception e) {
+			e.printStackTrace();
+			throw new TriggerManagerException(e);
 		}
+		
+		runnerThread.start();
 	}
 	
-	public Trigger getTrigger(int triggerId) {
-		return triggerIdMap.get(triggerId);
-	}
-	
-	public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
-		synchronized(t) {
-			logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
-			callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
-			triggerIdMap.remove(t.getTriggerId());
-		}
+	protected CheckerTypeLoader getCheckerLoader() {
+		return checkerTypeLoader;
 	}
 
-	public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
-		synchronized(t) {
-			try {
-				triggerLoader.updateTrigger(t);
-				callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
-			} catch(TriggerManagerException e) {
-				throw new TriggerManagerException(e);
-			}
-		}
+	protected ActionTypeLoader getActionLoader() {
+		return actionTypeLoader;
 	}
-	
-//	public void getUpdatedTriggers() throws TriggerManagerException {
-//		try {
-//			callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
-//		} catch(IOException e) {
-//			throw new TriggerManagerException(e);
-//		}
-//	}
-	
-	public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
-		synchronized(t) {
-			String message = null;
-			logger.info("Inserting trigger into system. " );
-			// The trigger id is set by the loader. So it's unavailable until after this call.
-			t.setStatus(TriggerStatus.PREPARING);
+
+	public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
+		try {
 			triggerLoader.addTrigger(t);
-			try {
-				callTriggerServer(t,  TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
-				triggerIdMap.put(t.getTriggerId(), t);
-				
-				message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
-			}
-			catch (TriggerManagerException e) {
-				throw e;
-			}
-			return message;
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
 		}
+		runnerThread.addTrigger(t);
+		triggerIdMap.put(t.getTriggerId(), t);
 	}
 	
-	private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
-		try {
-			return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
-		} catch (IOException e) {
-			throw new TriggerManagerException(e);
+	public synchronized void removeTrigger(int id) throws TriggerManagerException {
+		Trigger t = triggerIdMap.get(id);
+		if(t != null) {
+			removeTrigger(triggerIdMap.get(id));
 		}
 	}
 	
-	private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
-		URIBuilder builder = new URIBuilder();
-		builder.setScheme("http")
-			.setHost(host)
-			.setPort(port)
-			.setPath("/trigger");
-
-		builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
-		
-		if (triggerId != null) {
-			builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+	public synchronized void updateTrigger(int id) throws TriggerManagerException {
+		if(! triggerIdMap.containsKey(id)) {
+			throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
 		}
 		
-		if (user != null) {
-			builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
-		}
-		
-		if (params != null) {
-			for (Pair<String, String> pair: params) {
-				builder.setParameter(pair.getFirst(), pair.getSecond());
-			}
-		}
-
-		URI uri = null;
-		try {
-			uri = builder.build();
-		} catch (URISyntaxException e) {
-			throw new IOException(e);
-		}
-		
-		ResponseHandler<String> responseHandler = new BasicResponseHandler();
-		
-		HttpClient httpclient = new DefaultHttpClient();
-		HttpGet httpget = new HttpGet(uri);
-		String response = null;
+		Trigger t;
 		try {
-			response = httpclient.execute(httpget, responseHandler);
-		} catch (IOException e) {
-			throw e;
-		}
-		finally {
-			httpclient.getConnectionManager().shutdown();
-		}
-		
-		@SuppressWarnings("unchecked")
-		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
-		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
-		if (error != null) {
-			throw new IOException(error);
+			t = triggerLoader.loadTrigger(id);
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
 		}
-		
-		return jsonResponse;
+		updateTrigger(t);
 	}
 	
-	public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
-		URIBuilder builder = new URIBuilder();
-		
-		String[] hostPortSplit = hostPort.split(":");
-		builder.setScheme("http")
-			.setHost(hostPortSplit[0])
-			.setPort(Integer.parseInt(hostPortSplit[1]))
-			.setPath("/jmx");
-
-		builder.setParameter(action, "");
-		if (mBean != null) {
-			builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
-		}
+	public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
+		runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+		runnerThread.addTrigger(t);
+		triggerIdMap.put(t.getTriggerId(), t);
+	}
 
-		URI uri = null;
-		try {
-			uri = builder.build();
-		} catch (URISyntaxException e) {
-			throw new IOException(e);
-		}
-		
-		ResponseHandler<String> responseHandler = new BasicResponseHandler();
-		
-		HttpClient httpclient = new DefaultHttpClient();
-		HttpGet httpget = new HttpGet(uri);
-		String response = null;
+	public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
+		runnerThread.deleteTrigger(t);
+		triggerIdMap.remove(t.getTriggerId());
 		try {
-			response = httpclient.execute(httpget, responseHandler);
-		} catch (IOException e) {
-			throw e;
-		}
-		finally {
-			httpclient.getConnectionManager().shutdown();
-		}
-		
-		@SuppressWarnings("unchecked")
-		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
-		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
-		if (error != null) {
-			throw new IOException(error);
+			t.stopCheckers();
+			triggerLoader.removeTrigger(t);
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
 		}
-		return jsonResponse;
 	}
 	
-	public void shutdown() {
-		triggerManagingThread.shutdown();
+	public List<Trigger> getTriggers() {
+		return new ArrayList<Trigger>(triggerIdMap.values());
 	}
 	
-	private class TriggerManagerUpdaterThread extends Thread {
+	public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
+		return checkerTypeLoader.getSupportedCheckers();
+	}
+	
+	private class TriggerScannerThread extends Thread {
+		private BlockingQueue<Trigger> triggers;
 		private boolean shutdown = false;
-
-		public TriggerManagerUpdaterThread() {
-			this.setName("TriggerManagingThread");
+		//private AtomicBoolean stillAlive = new AtomicBoolean(true);
+		private final long scannerInterval;
+		
+		public TriggerScannerThread(long scannerInterval) {
+			triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
+			this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
+			this.scannerInterval = scannerInterval;;
 		}
 
-		private int waitTimeIdleMs = 2000;
-		private int waitTimeMs = 500;
-		
-		private void shutdown() {
+		public void shutdown() {
+			logger.error("Shutting down trigger manager thread " + this.getName());
 			shutdown = true;
+			//stillAlive.set(false);
+			this.interrupt();
+		}
+		
+		public synchronized void addTrigger(Trigger t) {
+			t.updateNextCheckTime();
+			triggers.add(t);
 		}
 		
-		@SuppressWarnings("unchecked")
+		public synchronized void deleteTrigger(Trigger t) {
+			triggers.remove(t);
+		}
+
 		public void run() {
+			//while(stillAlive.get()) {
 			while(!shutdown) {
-				try {
-					lastThreadCheckTime = System.currentTimeMillis();
-					
-					Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
-					
-					Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
-					Map<String, Object> results = null;
+				synchronized (this) {
 					try{
-						results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
-//						lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
-
-						List<Integer> updates = (List<Integer>) results.get("updates");
-						for(Integer update : updates) {
-							Trigger t = triggerLoader.loadTrigger(update);
-							lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
-							
-							if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
-								removeTrigger(t, "azkaban");
-								//triggerIdMap.remove(update);
-							} else {
-								triggerIdMap.put(update, t);
-							}
+						lastRunnerThreadCheckTime = System.currentTimeMillis();
+						
+						try{
+							checkAllTriggers();
+						} catch(Exception e) {
+							e.printStackTrace();
+							logger.error(e.getMessage());
+						} catch(Throwable t) {
+							t.printStackTrace();
+							logger.error(t.getMessage());
 						}
-					} catch (Exception e) {
-						e.printStackTrace();
-						logger.error(e);	
-					}
-					
-					synchronized(this) {
-						try {
-							if (triggerIdMap.size() > 0) {
-								this.wait(waitTimeMs);
-							}
-							else {
-								this.wait(waitTimeIdleMs);
-							}
-						} catch (InterruptedException e) {
+						
+						runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
+						if(runnerThreadIdleTime < 0) {
+							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
+						} else {
+							wait(runnerThreadIdleTime);
 						}
+					} catch(InterruptedException e) {
+						logger.info("Interrupted. Probably to shut down.");
 					}
-				}
-				catch (Exception e) {
-					logger.error(e);
+					
 				}
 			}
 		}
-	}
-	
-	private static class ConnectionInfo {
-		private String host;
-		private int port;
-
-		public ConnectionInfo(String host, int port) {
-			this.host = host;
-			this.port = port;
-		}
-
-		@SuppressWarnings("unused")
-		private ConnectionInfo getOuterType() {
-			return ConnectionInfo.this;
-		}
 		
-		public boolean isEqual(String host, int port) {
-			return this.port == port && this.host.equals(host);
+		private void checkAllTriggers() throws TriggerManagerException {
+			long now = System.currentTimeMillis();
+			for(Trigger t : triggers) {
+				if(t.getNextCheckTime() > now) {
+					logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
+					continue;
+				}
+				logger.info("Checking trigger " + t.getTriggerId());
+				if(t.getStatus().equals(TriggerStatus.READY)) {
+					if(t.triggerConditionMet()) {
+						onTriggerTrigger(t);
+					} else if (t.expireConditionMet()) {
+						onTriggerExpire(t);
+					}
+				}
+				if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+					removeTrigger(t);
+				} else {
+					t.updateNextCheckTime();
+				}
+			}
 		}
 		
-		public String getHost() {
-			return host;
+		private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
+			List<TriggerAction> actions = t.getTriggerActions();
+			for(TriggerAction action : actions) {
+				try {
+					logger.info("Doing trigger actions");
+					action.doAction();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					//throw new TriggerManagerException("action failed to execute", e);
+					logger.error("Failed to do action " + action.getDescription(), e);
+				} catch (Throwable th) {
+					logger.error("Failed to do action " + action.getDescription(), th);
+				}
+			}
+			if(t.isResetOnTrigger()) {
+				t.resetTriggerConditions();
+				t.resetExpireCondition();
+			} else {
+				t.setStatus(TriggerStatus.EXPIRED);
+			}
+			try {
+				triggerLoader.updateTrigger(t);
+			}
+			catch (TriggerLoaderException e) {
+				throw new TriggerManagerException(e);
+			}
+//			updateAgent(t);
 		}
 		
-		public int getPort() {
-			return port;
+		private void onTriggerExpire(Trigger t) throws TriggerManagerException {
+			List<TriggerAction> expireActions = t.getExpireActions();
+			for(TriggerAction action : expireActions) {
+				try {
+					logger.info("Doing expire actions");
+					action.doAction();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					//throw new TriggerManagerException("action failed to execute", e);
+					logger.error("Failed to do expire action " + action.getDescription(), e);
+				} catch (Throwable th) {
+					logger.error("Failed to do expire action " + action.getDescription(), th);
+				}
+			}
+			if(t.isResetOnExpire()) {
+				t.resetTriggerConditions();
+				t.resetExpireCondition();
+//				updateTrigger(t);
+			} else {
+				t.setStatus(TriggerStatus.EXPIRED);
+			}
+			try {
+				triggerLoader.updateTrigger(t);
+			} catch (TriggerLoaderException e) {
+				throw new TriggerManagerException(e);
+			}
 		}
 		
-		@Override
-		public int hashCode() {
-			final int prime = 31;
-			int result = 1;
-			result = prime * result + ((host == null) ? 0 : host.hashCode());
-			result = prime * result + port;
-			return result;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (this == obj)
-				return true;
-			if (obj == null)
-				return false;
-			if (getClass() != obj.getClass())
-				return false;
-			ConnectionInfo other = (ConnectionInfo) obj;
-			if (host == null) {
-				if (other.host != null)
-					return false;
-			} else if (!host.equals(other.host))
-				return false;
-			if (port != other.port)
-				return false;
-			return true;
-		}
-	}
-
-	public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
-		File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
-
-		for(File triggerFile : triggerFiles) {
-			Props triggerProps = new Props(props, triggerFile);
-			String triggerType = triggerProps.getString("trigger.type");
-			TriggerAgent agent = triggerAgents.get(triggerType);
-			if(agent != null) {
-				agent.loadTriggerFromProps(triggerProps);
-			} else {
-				throw new Exception("Trigger " + triggerType + " is not supported.");
+		private class TriggerComparator implements Comparator<Trigger> {
+			@Override
+			public int compare(Trigger arg0, Trigger arg1) {
+				long first = arg1.getNextCheckTime();
+				long second = arg0.getNextCheckTime();
+				
+				if(first == second) {
+					return 0;
+				} else if (first < second) {
+					return 1;
+				}
+				return -1;
 			}
 		}
 	}
-
-	public List<Trigger> getTriggers() {
-		return new ArrayList<Trigger>(triggerIdMap.values());
+	
+	public synchronized Trigger getTrigger(int triggerId) {
+		return triggerIdMap.get(triggerId);
 	}
 
 	public void expireTrigger(int triggerId) {
-		// TODO Auto-generated method stub
-		
-	}
-
-	public CheckerTypeLoader getCheckerLoader() {
-		return checkerTypeLoader;
-	}
-
-	public ActionTypeLoader getActionLoader() {
-		return actionTypeLoader;
-	}
-
-	public void addTriggerAgent(String triggerSource,
-			TriggerAgent agent) {
-		triggerAgents.put(triggerSource, agent);
+		Trigger t = getTrigger(triggerId);
+		t.setStatus(TriggerStatus.EXPIRED);
+//		updateAgent(t);
 	}
 
 	public List<Trigger> getTriggers(String triggerSource) {
-		List<Trigger> results = new ArrayList<Trigger>();
+		List<Trigger> triggers = new ArrayList<Trigger>();
 		for(Trigger t : triggerIdMap.values()) {
 			if(t.getSource().equals(triggerSource)) {
-				results.add(t);
+				triggers.add(t);
 			}
 		}
-		return results;
+		return triggers;
 	}
 
-	public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
-		getUpdatedTriggers();
+	@Override
+	public List<Trigger> getTriggerUpdates(String triggerSource, long lastUpdateTime) throws TriggerManagerException{
 		List<Trigger> triggers = new ArrayList<Trigger>();
 		for(Trigger t : triggerIdMap.values()) {
-			if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
+			if(t.getSource().equals(triggerSource) && t.getLastModifyTime() > lastUpdateTime) {
 				triggers.add(t);
 			}
 		}
 		return triggers;
 	}
+	
+	@Override
+	public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
+		List<Integer> triggers = new ArrayList<Integer>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getLastModifyTime() > lastUpdateTime) {
+				triggers.add(t.getTriggerId());
+			}
+		}
+		return triggers;
+	}
 
-	private void getUpdatedTriggers() throws TriggerManagerException {
-		List<Trigger> triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
-		for(Trigger t : triggers) {
-			this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
+	public void loadTrigger(int triggerId) throws TriggerManagerException {
+		Trigger t;
+		try {
+			t = triggerLoader.loadTrigger(triggerId);
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
+		}
+		if(t.getStatus().equals(TriggerStatus.PREPARING)) {
 			triggerIdMap.put(t.getTriggerId(), t);
+			runnerThread.addTrigger(t);
+			t.setStatus(TriggerStatus.READY);
 		}
 	}
 
-	public void removeTrigger(int scheduleId, String submitUser) throws TriggerManagerException {
-		removeTrigger(triggerIdMap.get(scheduleId), submitUser);
+	@Override
+	public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
+		insertTrigger(t);
 	}
 
-	public Set<String> getAllActiveTriggerServerHosts() {
-		Set<String> hostport = new HashSet<String>();
-		hostport.add(triggerServerHost+":"+triggerServerPort);
-		return hostport;
+	@Override
+	public void removeTrigger(int id, String user) throws TriggerManagerException {
+		removeTrigger(id);
 	}
 
-	public int getNumTriggers() {
-		return triggerIdMap.size();
+	@Override
+	public void updateTrigger(int triggerId, String user) throws TriggerManagerException {
+		updateTrigger(triggerId);
 	}
 
-	public String getTriggerSources() {
-		Set<String> sources = new HashSet<String>();
-		for(Trigger t : triggerIdMap.values()) {
-			sources.add(t.getSource());
+	@Override
+	public void updateTrigger(Trigger t, String user) throws TriggerManagerException {
+		updateTrigger(t);
+	}
+	
+	@Override
+	public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
+		Trigger t;
+		try {
+			t = triggerLoader.loadTrigger(triggerId);
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
+		}
+		if(t != null) {
+			insertTrigger(t);
 		}
-		return sources.toString();
+	}
+	
+	@Override
+	public void shutdown() {
+		runnerThread.shutdown();
 	}
 
-	public String getTriggerIds() {
-		return triggerIdMap.keySet().toString();
+	@Override
+	public TriggerJMX getJMX() {
+		return this.jmxStats;
 	}
+	
+	private class LocalTriggerJMX implements TriggerJMX {
 
+		@Override
+		public long getLastRunnerThreadCheckTime() {
+			// TODO Auto-generated method stub
+			return lastRunnerThreadCheckTime;
+		}
+
+		@Override
+		public boolean isRunnerThreadActive() {
+			// TODO Auto-generated method stub
+			return runnerThread.isAlive();
+		}
+
+		@Override
+		public String getPrimaryServerHost() {
+			return "local";
+		}
+
+		@Override
+		public int getNumTriggers() {
+			// TODO Auto-generated method stub
+			return triggerIdMap.size();
+		}
+
+		@Override
+		public String getTriggerSources() {
+			Set<String> sources = new HashSet<String>();
+			for(Trigger t : triggerIdMap.values()) {
+				sources.add(t.getSource());
+			}
+			return sources.toString();
+		}
+
+		@Override
+		public String getTriggerIds() {
+			return triggerIdMap.keySet().toString();
+		}
+
+		@Override
+		public long getScannerIdleTime() {
+			// TODO Auto-generated method stub
+			return runnerThreadIdleTime;
+		}
+
+		@Override
+		public Map<String, Object> getAllJMXMbeans() {
+			return new HashMap<String, Object>();
+		}
+		
+	}
+
+	@Override
+	public void registerCheckerType(String name, Class<? extends ConditionChecker> checker) {
+		checkerTypeLoader.registerCheckerType(name, checker);
+	}
+
+	@Override
+	public void registerActionType(String name, Class<? extends TriggerAction> action) {
+		actionTypeLoader.registerActionType(name, action);
+	}
 	
 	
 }
-
diff --git a/src/java/azkaban/trigger/TriggerManager.java.old b/src/java/azkaban/trigger/TriggerManager.java.old
new file mode 100644
index 0000000..3caf113
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManager.java.old
@@ -0,0 +1,573 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.trigger;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.log4j.Logger;
+
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.triggerapp.TriggerRunnerManagerAdapter;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
+public class TriggerManager {
+	private static Logger logger = Logger.getLogger(TriggerManager.class);
+
+	public static final String TRIGGER_SUFFIX = ".trigger";
+	
+	private TriggerLoader triggerLoader;
+	private CheckerTypeLoader checkerTypeLoader;
+	private ActionTypeLoader actionTypeLoader;
+	
+	private String triggerServerHost;
+	private int triggerServerPort;
+	
+	private TriggerRunnerManagerAdapter trmAdapter;
+	
+	private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
+	
+	private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
+	
+	private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
+
+	private TriggerManagerUpdaterThread triggerManagingThread;
+	
+	private long lastThreadCheckTime = -1;
+	
+	private long lastUpdateTime = -1;
+	
+	public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
+		this.triggerLoader = loader;
+		this.checkerTypeLoader = new CheckerTypeLoader();
+		this.actionTypeLoader = new ActionTypeLoader();
+
+		String trmMode = props.getString("trigger.runner.manager.mode", "local");
+		
+		try {
+			if(trmMode.equals("local")) {
+				trmAdapter = loadTRMLocalAdapter(props, loader);
+			} else if(trmMode.equals("remote")) {
+				trmAdapter = loadTRMRemoteAdapter(props);
+			} else {
+				throw new TriggerManagerException("Unknown trigger runner manager mode " + trmMode);
+			}
+		} catch(Exception e) {
+			throw new TriggerManagerException("Failed to load Trigger Runner Manager: " + e.getMessage());
+		}
+		
+		triggerServerHost = props.getString("trigger.server.host", "localhost");
+		triggerServerPort = props.getInt("trigger.server.port");
+
+		triggerManagingThread = new TriggerManagerUpdaterThread();
+		
+		try{
+			checkerTypeLoader.init(props);
+			actionTypeLoader.init(props);
+		} catch(Exception e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
+		}
+		
+		Condition.setCheckerLoader(checkerTypeLoader);
+		Trigger.setActionTypeLoader(actionTypeLoader);
+		
+		triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
+
+	}
+	
+	private TriggerRunnerManagerAdapter loadTRMLocalAdapter(Props props, TriggerLoader loader) throws IOException {
+		return new TriggerRunnerManager(props, loader);
+	}
+	
+	private TriggerRunnerManagerAdapter loadTRMRemoteAdapter(Props props) {
+		return null;
+	}
+	
+	public void start() throws Exception {
+		loadTriggers();
+		for(TriggerAgent agent : triggerAgents.values()) {
+			agent.start();
+		}
+		triggerManagingThread.start();
+	}
+	
+	private static class SuffixFilter implements FileFilter {
+		private String suffix;
+		public SuffixFilter(String suffix) {
+			this.suffix = suffix;
+		}
+
+		@Override
+		public boolean accept(File pathname) {
+			String name = pathname.getName();
+			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
+		}
+	}
+	
+	public String getTriggerServerHost() {
+		return triggerServerHost;
+	}
+	
+	public int getTriggerServerPort() {
+		return triggerServerPort;
+	}
+	
+	public State getUpdaterThreadState() {
+		return triggerManagingThread.getState();
+	}
+	
+	public boolean isThreadActive() {
+		return triggerManagingThread.isAlive();
+	}
+	
+	public long getLastThreadCheckTime() {
+		return lastThreadCheckTime;
+	}
+	
+	public Set<String> getPrimaryServerHosts() {
+		// Only one for now. More probably later.
+		HashSet<String> ports = new HashSet<String>();
+		ports.add(triggerServerHost + ":" + triggerServerPort);
+		return ports;
+	}
+	
+	private void loadTriggers() throws TriggerManagerException {
+		List<Trigger> triggerList;
+		try {
+			triggerList = triggerLoader.loadTriggers();
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
+		}
+		for(Trigger t : triggerList) {
+			if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+				removeTrigger(t, "azkaban");
+			} else {
+				triggerIdMap.put(t.getTriggerId(), t);
+			}
+		}
+	}
+	
+	public Trigger getTrigger(int triggerId) {
+		return triggerIdMap.get(triggerId);
+	}
+	
+	public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
+		synchronized(t) {
+			logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
+			callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
+			triggerIdMap.remove(t.getTriggerId());
+		}
+	}
+
+	public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
+		synchronized(t) {
+			try {
+				triggerLoader.updateTrigger(t);
+				callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
+			} catch(Exception e) {
+				throw new TriggerManagerException(e);
+			}
+		}
+	}
+	
+//	public void getUpdatedTriggers() throws TriggerManagerException {
+//		try {
+//			callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
+//		} catch(IOException e) {
+//			throw new TriggerManagerException(e);
+//		}
+//	}
+	
+	public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
+		synchronized(t) {
+			String message = null;
+			logger.info("Inserting trigger into system. " );
+			// The trigger id is set by the loader. So it's unavailable until after this call.
+			t.setStatus(TriggerStatus.PREPARING);
+			try {
+				triggerLoader.addTrigger(t);
+				callTriggerServer(t,  TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
+				triggerIdMap.put(t.getTriggerId(), t);
+				
+				message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
+			}
+			catch (Exception e) {
+				throw new TriggerManagerException(e);
+			}
+			return message;
+		}
+	}
+	
+	private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
+		try {
+			return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
+		} catch (IOException e) {
+			throw new TriggerManagerException(e);
+		}
+	}
+	
+	private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(host)
+			.setPort(port)
+			.setPath("/trigger");
+
+		builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
+		
+		if (triggerId != null) {
+			builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+		}
+		
+		if (user != null) {
+			builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
+		}
+		
+		if (params != null) {
+			for (Pair<String, String> pair: params) {
+				builder.setParameter(pair.getFirst(), pair.getSecond());
+			}
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		
+		return jsonResponse;
+	}
+	
+	public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		
+		String[] hostPortSplit = hostPort.split(":");
+		builder.setScheme("http")
+			.setHost(hostPortSplit[0])
+			.setPort(Integer.parseInt(hostPortSplit[1]))
+			.setPath("/jmx");
+
+		builder.setParameter(action, "");
+		if (mBean != null) {
+			builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		return jsonResponse;
+	}
+	
+	public void shutdown() {
+		triggerManagingThread.shutdown();
+	}
+	
+	private class TriggerManagerUpdaterThread extends Thread {
+		private boolean shutdown = false;
+
+		public TriggerManagerUpdaterThread() {
+			this.setName("TriggerManagingThread");
+		}
+
+		private int waitTimeIdleMs = 2000;
+		private int waitTimeMs = 500;
+		
+		private void shutdown() {
+			shutdown = true;
+		}
+		
+		@SuppressWarnings("unchecked")
+		public void run() {
+			while(!shutdown) {
+				try {
+					lastThreadCheckTime = System.currentTimeMillis();
+					
+					Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
+					
+					Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
+					Map<String, Object> results = null;
+					try{
+						results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
+//						lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
+
+						List<Integer> updates = (List<Integer>) results.get("updates");
+						for(Integer update : updates) {
+							Trigger t = triggerLoader.loadTrigger(update);
+							lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+							
+							if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+								removeTrigger(t, "azkaban");
+								//triggerIdMap.remove(update);
+							} else {
+								triggerIdMap.put(update, t);
+							}
+						}
+					} catch (Exception e) {
+						e.printStackTrace();
+						logger.error(e);	
+					}
+					
+					synchronized(this) {
+						try {
+							if (triggerIdMap.size() > 0) {
+								this.wait(waitTimeMs);
+							}
+							else {
+								this.wait(waitTimeIdleMs);
+							}
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+				catch (Exception e) {
+					logger.error(e);
+				}
+			}
+		}
+	}
+	
+	private static class ConnectionInfo {
+		private String host;
+		private int port;
+
+		public ConnectionInfo(String host, int port) {
+			this.host = host;
+			this.port = port;
+		}
+
+		@SuppressWarnings("unused")
+		private ConnectionInfo getOuterType() {
+			return ConnectionInfo.this;
+		}
+		
+		public boolean isEqual(String host, int port) {
+			return this.port == port && this.host.equals(host);
+		}
+		
+		public String getHost() {
+			return host;
+		}
+		
+		public int getPort() {
+			return port;
+		}
+		
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + ((host == null) ? 0 : host.hashCode());
+			result = prime * result + port;
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			ConnectionInfo other = (ConnectionInfo) obj;
+			if (host == null) {
+				if (other.host != null)
+					return false;
+			} else if (!host.equals(other.host))
+				return false;
+			if (port != other.port)
+				return false;
+			return true;
+		}
+	}
+
+	public void loadTriggerFromDir(File baseDir, Props props) throws TriggerManagerException {
+		File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+
+		for(File triggerFile : triggerFiles) {
+			try{
+				Props triggerProps = new Props(props, triggerFile);
+				String triggerType = triggerProps.getString("trigger.type");
+				TriggerAgent agent = triggerAgents.get(triggerType);
+				if(agent != null) {
+					agent.loadTriggerFromProps(triggerProps);
+				} else {
+					throw new TriggerManagerException("Trigger " + triggerType + " is not supported.");
+				}
+			} catch (Exception e) {
+				throw new TriggerManagerException(e);
+			}
+		}
+	}
+
+	public List<Trigger> getTriggers() {
+		return new ArrayList<Trigger>(triggerIdMap.values());
+	}
+
+	public void expireTrigger(int triggerId) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	public CheckerTypeLoader getCheckerLoader() {
+		return checkerTypeLoader;
+	}
+
+	public ActionTypeLoader getActionLoader() {
+		return actionTypeLoader;
+	}
+
+	public void addTriggerAgent(String triggerSource,
+			TriggerAgent agent) {
+		triggerAgents.put(triggerSource, agent);
+	}
+
+	public List<Trigger> getTriggers(String triggerSource) {
+		List<Trigger> results = new ArrayList<Trigger>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getSource().equals(triggerSource)) {
+				results.add(t);
+			}
+		}
+		return results;
+	}
+
+	public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
+		getUpdatedTriggers();
+		List<Trigger> triggers = new ArrayList<Trigger>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
+				triggers.add(t);
+			}
+		}
+		return triggers;
+	}
+
+	private void getUpdatedTriggers() throws TriggerManagerException {
+		List<Trigger> triggers;
+		try {
+			triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
+		} catch (TriggerLoaderException e) {
+			throw new TriggerManagerException(e);
+		}
+		for(Trigger t : triggers) {
+			this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
+			triggerIdMap.put(t.getTriggerId(), t);
+		}
+	}
+
+	public void removeTrigger(int triggerId, String submitUser) throws TriggerManagerException {
+		removeTrigger(triggerIdMap.get(triggerId), submitUser);
+	}
+
+	public Set<String> getAllActiveTriggerServerHosts() {
+		Set<String> hostport = new HashSet<String>();
+		hostport.add(triggerServerHost+":"+triggerServerPort);
+		return hostport;
+	}
+
+	public int getNumTriggers() {
+		return triggerIdMap.size();
+	}
+
+	public String getTriggerSources() {
+		Set<String> sources = new HashSet<String>();
+		for(Trigger t : triggerIdMap.values()) {
+			sources.add(t.getSource());
+		}
+		return sources.toString();
+	}
+
+	public String getTriggerIds() {
+		return triggerIdMap.keySet().toString();
+	}
+
+	
+	
+}
+
diff --git a/src/java/azkaban/trigger/TriggerManagerAdapter.java b/src/java/azkaban/trigger/TriggerManagerAdapter.java
new file mode 100644
index 0000000..2ce5df5
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerAdapter.java
@@ -0,0 +1,47 @@
+package azkaban.trigger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTimeZone;
+
+import azkaban.triggerapp.TriggerRunnerManagerException;
+
+public interface TriggerManagerAdapter {
+	public void insertTrigger(Trigger t, String user) throws TriggerManagerException;
+	
+	public void removeTrigger(int id, String user) throws TriggerManagerException;
+	
+	public void updateTrigger(int triggerId, String user) throws TriggerManagerException;
+	
+	void updateTrigger(Trigger t, String user) throws TriggerManagerException;
+
+	public void insertTrigger(int triggerId, String user) throws TriggerManagerException;
+
+	public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException;
+	
+	public List<Trigger> getTriggerUpdates(String triggerSource, long lastUpdateTime) throws TriggerManagerException;
+
+	public void start() throws TriggerManagerException;
+	
+	public void shutdown();
+
+	public void registerCheckerType(String name, Class<? extends ConditionChecker> checker);
+	
+	public void registerActionType(String name, Class<? extends TriggerAction> action);
+	
+	public TriggerJMX getJMX();
+	
+	public interface TriggerJMX {
+		public long getLastRunnerThreadCheckTime();
+		public boolean isRunnerThreadActive();
+		public String getPrimaryServerHost();
+		public int getNumTriggers();
+		public String getTriggerSources();
+		public String getTriggerIds();
+		public long getScannerIdleTime();
+		public Map<String, Object> getAllJMXMbeans();
+	}
+	
+}
diff --git a/src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java b/src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java
new file mode 100644
index 0000000..4f9cab9
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java
@@ -0,0 +1,181 @@
+package azkaban.trigger;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.triggerapp.TriggerRunnerManagerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+public class TriggerManagerRemoteAdapter implements TriggerManagerAdapter{
+
+	private final String host;
+	private final int port;
+	
+	public TriggerManagerRemoteAdapter(Props props) {
+		host = props.getString("trigger.server.host", "localhost");
+		port = props.getInt("trigger.server.port");
+	}
+	
+	@Override
+	public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
+		try {
+			callRemoteTriggerRunnerManager(TriggerConnectorParams.INSERT_TRIGGER_ACTION, t.getTriggerId(), user, (Pair<String,String>[])null);
+		} catch(IOException e) {
+			throw new TriggerManagerException(e);
+		}
+	}
+
+	@Override
+	public void removeTrigger(int id, String user) throws TriggerManagerException {
+		try {
+			callRemoteTriggerRunnerManager(TriggerConnectorParams.REMOVE_TRIGGER_ACTION, id, user, (Pair<String,String>[])null);
+		} catch(IOException e) {
+			throw new TriggerManagerException(e);
+		}
+	}
+
+	@Override
+	public void updateTrigger(int triggerId, String user) throws TriggerManagerException {
+		try {
+			callRemoteTriggerRunnerManager(TriggerConnectorParams.UPDATE_TRIGGER_ACTION, triggerId, user, (Pair<String,String>[])null);
+		} catch(IOException e) {
+			throw new TriggerManagerException(e);
+		}
+	}
+	
+	private Map<String, Object> callRemoteTriggerRunnerManager(String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(host)
+			.setPort(port)
+			.setPath(TriggerManagerServlet.WEB_PATH);
+
+		builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
+		
+		if (triggerId != null) {
+			builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+		}
+		
+		if (user != null) {
+			builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
+		}
+		
+		if (params != null) {
+			for (Pair<String, String> pair: params) {
+				builder.setParameter(pair.getFirst(), pair.getSecond());
+			}
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		
+		return jsonResponse;
+	}
+
+	@Override
+	public void start() {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
+		try {
+			callRemoteTriggerRunnerManager(TriggerConnectorParams.INSERT_TRIGGER_ACTION, triggerId, user, (Pair<String,String>[])null);
+		} catch(IOException e) {
+			throw new TriggerManagerException(e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
+		List<Integer> updated;
+		try {
+			Map<String, Object> response = callRemoteTriggerRunnerManager(TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
+			updated = (List<Integer>) response.get(TriggerConnectorParams.RESPONSE_UPDATED_TRIGGERS);
+			return updated;
+		} catch(IOException e) {
+			throw new TriggerManagerException(e);
+		}
+		
+	}
+
+	@Override
+	public void updateTrigger(Trigger t, String user)
+			throws TriggerManagerException {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public List<Trigger> getTriggerUpdates(String triggerSource,
+			long lastUpdateTime) throws TriggerManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void shutdown() {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void registerCheckerType(String name, Class<? extends ConditionChecker> checker) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void registerActionType(String name,
+			Class<? extends TriggerAction> action) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public TriggerJMX getJMX() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}
diff --git a/src/java/azkaban/trigger/TriggerManagerServlet.java b/src/java/azkaban/trigger/TriggerManagerServlet.java
new file mode 100644
index 0000000..3595e82
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerServlet.java
@@ -0,0 +1,158 @@
+package azkaban.trigger;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+
+import azkaban.execapp.AzkabanExecutorServer;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.executor.ConnectorParams;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.triggerapp.AzkabanTriggerServer;
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.triggerapp.TriggerRunnerManagerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.webapp.servlet.AbstractServiceServlet;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+
+public class TriggerManagerServlet extends AbstractServiceServlet implements TriggerConnectorParams {
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = Logger.getLogger(TriggerManagerServlet.class.getName());
+	public static final String JSON_MIME_TYPE = "application/json";
+
+	private AzkabanTriggerServer application;
+	private TriggerManager triggerManager;
+	
+	public static final String WEB_PATH = "/triggermanager";
+	
+	public TriggerManagerServlet() {
+		super();
+	}
+	
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		application = (AzkabanTriggerServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+
+		if (application == null) {
+			throw new IllegalStateException(
+					"No batch application is defined in the servlet context!");
+		}
+
+		triggerManager = application.getTriggerManager();
+	}
+	
+	@Override
+	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		HashMap<String,Object> respMap= new HashMap<String,Object>();
+		//logger.info("ExecutorServer called by " + req.getRemoteAddr());
+		try {
+			if (!hasParam(req, ACTION_PARAM)) {
+				logger.error("Parameter action not set");
+				respMap.put("error", "Parameter action not set");
+			}
+			else {
+				String action = getParam(req, ACTION_PARAM);
+				if (action.equals(GET_UPDATE_ACTION)) {
+					//logger.info("Updated called");
+					handleAjaxGetUpdateRequest(req, respMap);
+				}
+				else if (action.equals(PING_ACTION)) {
+					respMap.put("status", "alive");
+				}
+				else {
+					int triggerId = Integer.parseInt(getParam(req, TRIGGER_ID_PARAM));
+					String user = getParam(req, USER_PARAM, null);
+					
+					logger.info("User " + user + " has called action " + action + " on " + triggerId);
+					if (action.equals(INSERT_TRIGGER_ACTION)) {
+						logger.info("Insert Trigger Action");
+						handleInsertTrigger(triggerId, user, req, resp, respMap);
+					} else if (action.equals(REMOVE_TRIGGER_ACTION)) {
+						logger.info("Remove Trigger Action");
+						handleRemoveTrigger(triggerId, user, req, resp, respMap);
+					} 
+					else if (action.equals(UPDATE_TRIGGER_ACTION)) {
+						logger.info("Update Trigger Action");
+						handleUpdateTrigger(triggerId, user, req, respMap);
+					}
+					else {
+						logger.error("action: '" + action + "' not supported.");
+						respMap.put("error", "action: '" + action + "' not supported.");
+					}
+				}
+			}
+		} catch (Exception e) {
+			logger.error(e);
+			respMap.put(RESPONSE_ERROR, e.getMessage());
+		}
+		writeJSON(resp, respMap);
+		resp.flushBuffer();
+	}
+	
+	
+
+	private void handleAjaxGetUpdateRequest(HttpServletRequest req, HashMap<String, Object> respMap) {
+		List<Integer> updates = null;
+		try{
+			long lastUpdateTime = getLongParam(req, "lastUpdateTime");
+//			respMap.put(TriggerConnectorParams.RESPONSE_UPDATETIME, DateTime.now().getMillis());
+			updates = triggerManager.getTriggerUpdates(lastUpdateTime);
+			if(updates.size() > 0) {
+				System.out.println("got " + updates.size() + " updates" );
+			}
+			respMap.put("updates", updates);
+		} catch (Exception e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+
+	private void handleInsertTrigger(int triggerId, String user, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+		try {
+			triggerManager.insertTrigger(triggerId, user);
+		} catch (TriggerManagerException e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+	
+	private void handleUpdateTrigger(int triggerId, String user, HttpServletRequest req, HashMap<String, Object> respMap) {
+		try {
+			triggerManager.updateTrigger(triggerId, user);
+		} catch (TriggerManagerException e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+
+	private void handleRemoveTrigger(int triggerId, String user, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+		try {
+			triggerManager.removeTrigger(triggerId, user);
+		} catch (TriggerManagerException e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+
+	@Override
+	public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		
+	}
+
+}
diff --git a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
index 5ed1c41..ffef121 100644
--- a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
+++ b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
@@ -28,20 +28,23 @@ import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerRemoteAdapter;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.executor.ExecutorManager.Alerter;
-import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxExecutorManagerAdapter;
 import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxTriggerRunnerManager;
+import azkaban.jmx.JmxTriggerManager;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
-import azkaban.trigger.ActionTypeLoader;
-import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerAdapter;
+import azkaban.trigger.TriggerManagerServlet;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.CreateTriggerAction;
-import azkaban.trigger.builtin.ExecutionChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.trigger.builtin.KillExecutionAction;
 import azkaban.trigger.builtin.SlaAlertAction;
@@ -70,7 +73,7 @@ public class AzkabanTriggerServer {
 	private static AzkabanTriggerServer app;
 	
 	private TriggerLoader triggerLoader;
-	private TriggerRunnerManager triggerRunnerManager;
+	private TriggerManager triggerManager;
 	private ExecutorManager executorManager;
 	private ProjectManager projectManager;
 	private Props props;
@@ -102,23 +105,23 @@ public class AzkabanTriggerServer {
 		Context root = new Context(server, "/", Context.SESSIONS);
 		root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
 		
-		root.addServlet(new ServletHolder(new TriggerServerServlet()), "/trigger");
+		root.addServlet(new ServletHolder(new TriggerManagerServlet()), TriggerManagerServlet.WEB_PATH);
 		root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
 		
 		triggerLoader = createTriggerLoader(props);
 		projectManager = loadProjectManager(props);
 		executorManager = loadExecutorManager(props);
-		triggerRunnerManager = loadTriggerRunnerManager(props, triggerLoader);
+		triggerManager = loadTriggerManager(props, triggerLoader);
 		
 		String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
-		loadBuiltinCheckersAndActions(this);
-		loadPluginCheckersAndActions(triggerPluginDir, this);
+		loadBuiltinCheckersAndActions();
+		loadPluginCheckersAndActions(triggerPluginDir);
 		
 		configureMBeanServer();
 		
 		try {
-			triggerRunnerManager.start();
+			triggerManager.start();
 			server.start();
 		} 
 		catch (Exception e) {
@@ -132,20 +135,58 @@ public class AzkabanTriggerServer {
 	
 	
 	
-	private TriggerRunnerManager loadTriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
-		logger.info("Loading trigger runner manager");
-		TriggerRunnerManager trm = new TriggerRunnerManager(props, triggerLoader);
-		trm.init();
+	private TriggerManager loadTriggerManager(Props props, TriggerLoader triggerLoader) throws Exception {
+		logger.info("Loading trigger manager");
+		TriggerManager trm;
+		try {
+			trm = new TriggerManager(props, triggerLoader);
+		} catch (TriggerManagerException e) {
+			throw new Exception(e);
+		}
 		return trm;
 	}
 	
+	private TriggerManagerAdapter loadTriggerRunnerManagerAdapter(Props props, TriggerLoader triggerLoader) throws Exception {
+		TriggerManagerAdapter trmAdapter;
+		String trmMode = props.getString("trigger.runner.manager.mode", "local");
+		try {
+			if(trmMode.equals("local")) {
+				trmAdapter = new TriggerManager(props, triggerLoader);
+			} else if(trmMode.equals("remote")) {
+				trmAdapter = null;
+			} else {
+				throw new TriggerManagerException("Unknown trigger runner manager mode " + trmMode);
+			}
+		} catch(Exception e) {
+			throw new Exception("Failed to load Trigger Runner Manager: " + e.getMessage());
+		}
+		return trmAdapter;
+	}
+	
 	private ExecutorManager loadExecutorManager(Props props) throws Exception {
 		logger.info("Loading executor manager");
 		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-		ExecutorManager execManager = new ExecutorManager(props, loader, false);
+		ExecutorManager execManager = new ExecutorManager(props, loader);
 		return execManager;
 	}
 	
+	private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
+//		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+//		ExecutorManager execManager = new ExecutorManager(props, loader, true);
+//		return execManager;
+		String executorMode = props.getString("executor.manager.mode", "local");
+		ExecutorManagerAdapter adapter;
+		if(executorMode.equals("local")) {
+			adapter = loadExecutorManager(props);
+		} else if(executorMode.equals("remote")) {
+			JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+			adapter = new ExecutorManagerRemoteAdapter(props, loader);
+		} else {
+			throw new Exception("Unknown ExecutorManager mode " + executorMode);
+		}
+		return adapter;
+	}
+	
 	private ProjectManager loadProjectManager(Props props) {
 		logger.info("Loading project manager");
 		JdbcProjectLoader loader = new JdbcProjectLoader(props);
@@ -153,42 +194,31 @@ public class AzkabanTriggerServer {
 		return manager;
 	}
 	
-	private void loadBuiltinCheckersAndActions(AzkabanTriggerServer app) {
+	private void loadBuiltinCheckersAndActions() {
 		logger.info("Loading built-in checker and action types");
-//		ExecutorManager executorManager = app.getExecutorManager();
-//		TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
-		CheckerTypeLoader checkerLoader = triggerRunnerManager.getCheckerLoader();
-		ActionTypeLoader actionLoader = triggerRunnerManager.getActionLoader();
-		// time:
-		checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
-//		// execution checker
-//		ExecutionChecker.setExecutorManager(executorManager);
-//		checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
-		// Sla checker
-		SlaChecker.setExecutorManager(executorManager);
-		checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
 		
-		// execut flow action
-		ExecuteFlowAction.setExecutorManager(executorManager);
-		ExecuteFlowAction.setProjectManager(projectManager);
-		ExecuteFlowAction.setTriggerRunnerManager(triggerRunnerManager);
-		actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
-		// kill flow action
-		KillExecutionAction.setExecutorManager(executorManager);
-		actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
-		// sla alert
-		SlaAlertAction.setExecutorManager(executorManager);
-		Map<String, Alerter> alerters = loadAlerters(props);
-		SlaAlertAction.setAlerters(alerters);
-		SlaAlertAction.setExecutorManager(executorManager);
-		actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
-		// create trigger action
-		CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
-		actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+		if(triggerManager instanceof TriggerManager) {
+			SlaChecker.setExecutorManager(executorManager);
+			ExecuteFlowAction.setExecutorManager(executorManager);
+			ExecuteFlowAction.setProjectManager(projectManager);
+			ExecuteFlowAction.setTriggerManager(triggerManager);
+			KillExecutionAction.setExecutorManager(executorManager);
+			SlaAlertAction.setExecutorManager(executorManager);
+			Map<String, Alerter> alerters = loadAlerters(props);
+			SlaAlertAction.setAlerters(alerters);
+			SlaAlertAction.setExecutorManager(executorManager);
+			CreateTriggerAction.setTriggerManager(triggerManager);
+		}
 
+		triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+		triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
+		triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+		triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+		triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+		triggerManager.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
 	}
 	
-	private void loadPluginCheckersAndActions(String pluginPath, AzkabanTriggerServer app) {
+	private void loadPluginCheckersAndActions(String pluginPath) {
 		logger.info("Loading plug-in checker and action types");
 		File triggerPluginPath = new File(pluginPath);
 		if (!triggerPluginPath.exists()) {
@@ -196,7 +226,7 @@ public class AzkabanTriggerServer {
 			return;
 		}
 			
-		ClassLoader parentLoader = AzkabanTriggerServer.class.getClassLoader();
+		ClassLoader parentLoader = this.getClass().getClassLoader();
 		File[] pluginDirs = triggerPluginPath.listFiles();
 		ArrayList<String> jarPaths = new ArrayList<String>();
 		for (File pluginDir: pluginDirs) {
@@ -577,8 +607,14 @@ public class AzkabanTriggerServer {
 		mbeanServer = ManagementFactory.getPlatformMBeanServer();
 
 		registerMbean("triggerServerJetty", new JmxJettyServer(server));
-		registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(triggerRunnerManager));
-		registerMbean("executorManager", new JmxExecutorManager(executorManager));
+//		if(triggerRunnerManager instanceof TriggerRunnerManagerLocalAdapter) {
+//			registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(((TriggerRunnerManagerLocalAdapter)triggerRunnerManager).getTriggerRunnerManager()));
+//		}
+		registerMbean("triggerManager", new JmxTriggerManager(triggerManager));
+//		if(executorManager instanceof ExecutorManagerLocalAdapter) {
+//			registerMbean("executorManager", new JmxExecutorManager(((ExecutorManagerLocalAdapter)executorManager).getExecutorManager()));
+//		}
+		registerMbean("executorManager", new JmxExecutorManagerAdapter(executorManager));
 	}
 	
 	public void close() {
@@ -603,7 +639,14 @@ public class AzkabanTriggerServer {
 		} catch (Exception e) {
 			logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
 		}
-
+//		if(executorManager instanceof ExecutorManagerLocalAdapter) {
+//			((ExecutorManagerLocalAdapter)executorManager).getExecutorManager().shutdown();
+//		}
+//		if(triggerRunnerManager instanceof TriggerRunnerManagerLocalAdapter) {
+//			
+//		}
+		executorManager.shutdown();
+		triggerManager.shutdown();
 	}
 	
 	public List<ObjectName> getMbeanNames() {
@@ -628,8 +671,8 @@ public class AzkabanTriggerServer {
 		}
 	}
 
-	public TriggerRunnerManager getTriggerRunnerManager() {
-		return triggerRunnerManager;
+	public TriggerManager getTriggerManager() {
+		return triggerManager;
 	}
 	
 	public ExecutorManager getExecutorManager() {
diff --git a/src/java/azkaban/triggerapp/TriggerRunnerManagerException.java b/src/java/azkaban/triggerapp/TriggerRunnerManagerException.java
new file mode 100644
index 0000000..4254c7a
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerRunnerManagerException.java
@@ -0,0 +1,33 @@
+package azkaban.triggerapp;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class TriggerRunnerManagerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public TriggerRunnerManagerException(String message) {
+		super(message);
+	}
+	
+	public TriggerRunnerManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+	
+	public TriggerRunnerManagerException(Throwable e) {
+		super(e);
+	}
+}
diff --git a/src/java/azkaban/webapp/AzkabanServer.java b/src/java/azkaban/webapp/AzkabanServer.java
index 5da23ee..db9e14a 100644
--- a/src/java/azkaban/webapp/AzkabanServer.java
+++ b/src/java/azkaban/webapp/AzkabanServer.java
@@ -119,4 +119,5 @@ public abstract class AzkabanServer {
 	public abstract VelocityEngine getVelocityEngine();
 	
 	public abstract UserManager getUserManager();
+	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 39c105c..d0731ed 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -53,10 +53,13 @@ import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerRemoteAdapter;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.executor.ExecutorManager.Alerter;
-import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxExecutorManagerAdapter;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxScheduler;
 import azkaban.jmx.JmxTriggerManager;
@@ -146,7 +149,7 @@ public class AzkabanWebServer extends AzkabanServer {
 	private final Server server;
 	private UserManager userManager;
 	private ProjectManager projectManager;
-	private ExecutorManager executorManager;
+	private ExecutorManagerAdapter executorManager;
 	private ScheduleManager scheduleManager;
 //	private TriggerBasedScheduler scheduler;
 	private TriggerManager triggerManager;
@@ -183,9 +186,13 @@ public class AzkabanWebServer extends AzkabanServer {
 		triggerManager = loadTriggerManager(props);
 		executorManager = loadExecutorManager(props);
 		projectManager = loadProjectManager(props, triggerManager);
+		
+		// load all triggger agents here
 		scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
 		
 		loadBuiltinCheckersAndActions();
+		String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
+		loadPluginCheckersAndActions(triggerPluginDir);
 		
 		baseClassLoader = getBaseClassloader();
 		
@@ -239,34 +246,54 @@ public class AzkabanWebServer extends AzkabanServer {
 
 		JdbcProjectLoader loader = new JdbcProjectLoader(props);
 		ProjectManager manager = new ProjectManager(loader, props);
-		manager.setTriggerManager(tm);
-		
 		return manager;
 	}
 
 	private ExecutorManager loadExecutorManager(Props props) throws Exception {
 		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-		ExecutorManager execManager = new ExecutorManager(props, loader, true);
+		ExecutorManager execManager = new ExecutorManager(props, loader);
 		return execManager;
 	}
-
-	private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager tm, Props props ) throws Exception {
-		ScheduleManager schedManager = null;
-		String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
-		if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
-			ScheduleLoader loader = new JdbcScheduleLoader(props);
-			schedManager = new ScheduleManager(executorManager, loader, false);
-			schedManager.setProjectManager(projectManager);
-			schedManager.start();
-		} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
-			logger.info("Loading trigger based scheduler");
-			ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
-			schedManager = new ScheduleManager(executorManager, loader, true);
+	
+	private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
+//		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+//		ExecutorManager execManager = new ExecutorManager(props, loader, true);
+//		return execManager;
+		String executorMode = props.getString("executor.manager.mode", "local");
+		ExecutorManagerAdapter adapter;
+		if(executorMode.equals("local")) {
+			adapter = loadExecutorManager(props);
+		} else if(executorMode.equals("remote")) {
+			JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+			adapter = new ExecutorManagerRemoteAdapter(props, loader);
+		} else {
+			throw new Exception("Unknown ExecutorManager mode " + executorMode);
 		}
-
-		return schedManager;
+		return adapter;
 	}
+
+//	private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
+//		ScheduleManager schedManager = null;
+//		String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
+//		if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
+//			ScheduleLoader loader = new JdbcScheduleLoader(props);
+//			schedManager = new ScheduleManager(executorManager, loader, false);
+//			schedManager.setProjectManager(projectManager);
+//			schedManager.start();
+//		} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
+//			logger.info("Loading trigger based scheduler");
+//			ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
+//			schedManager = new ScheduleManager(executorManager, loader, true);
+//		}
+//
+//		return schedManager;
+//	}
 	
+	private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
+		logger.info("Loading trigger based scheduler");
+		ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
+		return new ScheduleManager(executorManager, loader, true);
+	}
 //	private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
 //		TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
 //		return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
@@ -279,35 +306,283 @@ public class AzkabanWebServer extends AzkabanServer {
 	
 	private void loadBuiltinCheckersAndActions() {
 		logger.info("Loading built-in checker and action types");
-//		ExecutorManager executorManager = app.getExecutorManager();
-//		TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
-		CheckerTypeLoader checkerLoader = triggerManager.getCheckerLoader();
-		ActionTypeLoader actionLoader = triggerManager.getActionLoader();
-		// time:
-		checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
-//		// execution checker
-//		ExecutionChecker.setExecutorManager(executorManager);
-//		checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
-		// Sla checker
-//		SlaChecker.setExecutorManager(executorManager);
-		checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
 		
-		// execut flow action
-//		ExecuteFlowAction.setExecutorManager(executorManager);
-//		ExecuteFlowAction.setProjectManager(projectManager);
-		actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
-		// kill flow action
-//		KillExecutionAction.setExecutorManager(executorManager);
-		actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
-		// sla alert
-//		SlaAlertAction.setExecutorManager(executorManager);
-//		Map<String, Alerter> alerters = loadAlerters(props);
-//		SlaAlertAction.setAlerters(alerters);
-		actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
-		// create trigger action
-//		CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
-		actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+		if(triggerManager instanceof TriggerManager) {
+			SlaChecker.setExecutorManager(executorManager);
+			ExecuteFlowAction.setExecutorManager(executorManager);
+			ExecuteFlowAction.setProjectManager(projectManager);
+			ExecuteFlowAction.setTriggerManager(triggerManager);
+			KillExecutionAction.setExecutorManager(executorManager);
+			SlaAlertAction.setExecutorManager(executorManager);
+			Map<String, Alerter> alerters = loadAlerters(props);
+			SlaAlertAction.setAlerters(alerters);
+			SlaAlertAction.setExecutorManager(executorManager);
+			CreateTriggerAction.setTriggerManager(triggerManager);
+		}
+
+		triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+		triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
+		triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+		triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+		triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+		triggerManager.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+	}
+	
+	private Map<String, Alerter> loadAlerters(Props props) {
+		Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+		// load built-in alerters
+		ExecutorMailer mailAlerter = new ExecutorMailer(props);
+		allAlerters.put("email", mailAlerter);
+		// load all plugin alerters
+		String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+		allAlerters.putAll(loadPluginAlerters(pluginDir));
+		return allAlerters;
+	}
+	
+	private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+		File alerterPluginPath = new File(pluginPath);
+		if (!alerterPluginPath.exists()) {
+			return Collections.<String, Alerter>emptyMap();
+		}
+			
+		Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+		ClassLoader parentLoader = SlaAlertAction.class.getClass().getClassLoader();
+		File[] pluginDirs = alerterPluginPath.listFiles();
+		ArrayList<String> jarPaths = new ArrayList<String>();
+		for (File pluginDir: pluginDirs) {
+			if (!pluginDir.isDirectory()) {
+				logger.error("The plugin path " + pluginDir + " is not a directory.");
+				continue;
+			}
+			
+			// Load the conf directory
+			File propertiesDir = new File(pluginDir, "conf");
+			Props pluginProps = null;
+			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+				File propertiesFile = new File(propertiesDir, "plugin.properties");
+				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+				
+				if (propertiesFile.exists()) {
+					if (propertiesOverrideFile.exists()) {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+					}
+					else {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile);
+					}
+				}
+				else {
+					logger.error("Plugin conf file " + propertiesFile + " not found.");
+					continue;
+				}
+			}
+			else {
+				logger.error("Plugin conf path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			String pluginName = pluginProps.getString("alerter.name");
+			List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+			
+			String pluginClass = pluginProps.getString("alerter.class");
+			if (pluginClass == null) {
+				logger.error("Alerter class is not set.");
+			}
+			else {
+				logger.info("Plugin class " + pluginClass);
+			}
+			
+			URLClassLoader urlClassLoader = null;
+			File libDir = new File(pluginDir, "lib");
+			if (libDir.exists() && libDir.isDirectory()) {
+				File[] files = libDir.listFiles();
+				
+				ArrayList<URL> urls = new ArrayList<URL>();
+				for (int i=0; i < files.length; ++i) {
+					try {
+						URL url = files[i].toURI().toURL();
+						urls.add(url);
+					} catch (MalformedURLException e) {
+						logger.error(e);
+					}
+				}
+				if (extLibClasspath != null) {
+					for (String extLib : extLibClasspath) {
+						try {
+							File file = new File(pluginDir, extLib);
+							URL url = file.toURI().toURL();
+							urls.add(url);
+						} catch (MalformedURLException e) {
+							logger.error(e);
+						}
+					}
+				}
+				
+				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+			}
+			else {
+				logger.error("Library path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			Class<?> alerterClass = null;
+			try {
+				alerterClass = urlClassLoader.loadClass(pluginClass);
+			}
+			catch (ClassNotFoundException e) {
+				logger.error("Class " + pluginClass + " not found.");
+				continue;
+			}
+
+			String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+			logger.info("Source jar " + source);
+			jarPaths.add("jar:file:" + source);
+			
+			Constructor<?> constructor = null;
+			try {
+				constructor = alerterClass.getConstructor(Props.class);
+			} catch (NoSuchMethodException e) {
+				logger.error("Constructor not found in " + pluginClass);
+				continue;
+			}
+			
+			Object obj = null;
+			try {
+				obj = constructor.newInstance(pluginProps);
+			} catch (Exception e) {
+				logger.error(e);
+			} 
+			
+			if (!(obj instanceof Alerter)) {
+				logger.error("The object is not an Alerter");
+				continue;
+			}
+			
+			Alerter plugin = (Alerter) obj;
+			installedAlerterPlugins.put(pluginName, plugin);
+		}
+		
+		return installedAlerterPlugins;
+		
+	}
+	
+	private void loadPluginCheckersAndActions(String pluginPath) {
+		logger.info("Loading plug-in checker and action types");
+		File triggerPluginPath = new File(pluginPath);
+		if (!triggerPluginPath.exists()) {
+			logger.error("plugin path " + pluginPath + " doesn't exist!");
+			return;
+		}
+			
+		ClassLoader parentLoader = this.getClassLoader();
+		File[] pluginDirs = triggerPluginPath.listFiles();
+		ArrayList<String> jarPaths = new ArrayList<String>();
+		for (File pluginDir: pluginDirs) {
+			if (!pluginDir.exists()) {
+				logger.error("Error! Trigger plugin path " + pluginDir.getPath() + " doesn't exist.");
+				continue;
+			}
+			
+			if (!pluginDir.isDirectory()) {
+				logger.error("The plugin path " + pluginDir + " is not a directory.");
+				continue;
+			}
+			
+			// Load the conf directory
+			File propertiesDir = new File(pluginDir, "conf");
+			Props pluginProps = null;
+			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+				File propertiesFile = new File(propertiesDir, "plugin.properties");
+				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+				
+				if (propertiesFile.exists()) {
+					if (propertiesOverrideFile.exists()) {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+					}
+					else {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile);
+					}
+				}
+				else {
+					logger.error("Plugin conf file " + propertiesFile + " not found.");
+					continue;
+				}
+			}
+			else {
+				logger.error("Plugin conf path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
+			
+			String pluginClass = pluginProps.getString("trigger.class");
+			if (pluginClass == null) {
+				logger.error("Trigger class is not set.");
+			}
+			else {
+				logger.error("Plugin class " + pluginClass);
+			}
+			
+			URLClassLoader urlClassLoader = null;
+			File libDir = new File(pluginDir, "lib");
+			if (libDir.exists() && libDir.isDirectory()) {
+				File[] files = libDir.listFiles();
+				
+				ArrayList<URL> urls = new ArrayList<URL>();
+				for (int i=0; i < files.length; ++i) {
+					try {
+						URL url = files[i].toURI().toURL();
+						urls.add(url);
+					} catch (MalformedURLException e) {
+						logger.error(e);
+					}
+				}
+				if (extLibClasspath != null) {
+					for (String extLib : extLibClasspath) {
+						try {
+							File file = new File(pluginDir, extLib);
+							URL url = file.toURI().toURL();
+							urls.add(url);
+						} catch (MalformedURLException e) {
+							logger.error(e);
+						}
+					}
+				}
+				
+				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+			}
+			else {
+				logger.error("Library path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			Class<?> triggerClass = null;
+			try {
+				triggerClass = urlClassLoader.loadClass(pluginClass);
+			}
+			catch (ClassNotFoundException e) {
+				logger.error("Class " + pluginClass + " not found.");
+				continue;
+			}
 
+			String source = FileIOUtils.getSourcePathFromClass(triggerClass);
+			logger.info("Source jar " + source);
+			jarPaths.add("jar:file:" + source);
+			
+			try {
+				Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateCheckerTypes", pluginProps, app);
+			} catch (Exception e) {
+				logger.error("Unable to initiate checker types for " + pluginClass);
+				continue;
+			}
+			
+			try {
+				Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateActionTypes", pluginProps, app);
+			} catch (Exception e) {
+				logger.error("Unable to initiate action types for " + pluginClass);
+				continue;
+			}
+			
+		}
 	}
 	
 	/**
@@ -347,7 +622,7 @@ public class AzkabanWebServer extends AzkabanServer {
 	/**
      * 
      */
-	public ExecutorManager getExecutorManager() {
+	public ExecutorManagerAdapter getExecutorManager() {
 		return executorManager;
 	}
 	
@@ -530,12 +805,13 @@ public class AzkabanWebServer extends AzkabanServer {
 		Map<String, TriggerPlugin> triggerPlugins = loadTriggerPlugins(root, triggerPluginDir, app);
 		app.setTriggerPlugins(triggerPlugins);
 		// always have basic time trigger
-		app.getTriggerManager().addTriggerAgent(app.getScheduleManager().getTriggerSource(), app.getScheduleManager());
+		//TODO: find something else to do the job
+//		app.getTriggerManager().addTriggerAgent(app.getScheduleManager().getTriggerSource(), app.getScheduleManager());
 		// add additional triggers
-		for(TriggerPlugin plugin : triggerPlugins.values()) {
-			TriggerAgent agent = plugin.getAgent();
-			app.getTriggerManager().addTriggerAgent(agent.getTriggerSource(), agent);
-		}
+//		for(TriggerPlugin plugin : triggerPlugins.values()) {
+//			TriggerAgent agent = plugin.getAgent();
+//			app.getTriggerManager().addTriggerAgent(agent.getTriggerSource(), agent);
+//		}
 		// fire up
 		app.getTriggerManager().start();
 
@@ -939,7 +1215,11 @@ public class AzkabanWebServer extends AzkabanServer {
 
 		registerMbean("jetty", new JmxJettyServer(server));
 		registerMbean("triggerManager", new JmxTriggerManager(triggerManager));
-		registerMbean("executorManager", new JmxExecutorManager(executorManager));
+//		if(executorManager instanceof ExecutorManagerLocalAdapter) {
+//			registerMbean("executorManager", new JmxExecutorManager(((ExecutorManagerLocalAdapter)executorManager).getExecutorManager()));
+//		}
+//		registerMbean("executorManager", new JmxExecutorManager(executorManager));
+		registerMbean("executorManager", new JmxExecutorManagerAdapter(executorManager));
 	}
 	
 	public void close() {
@@ -952,6 +1232,9 @@ public class AzkabanWebServer extends AzkabanServer {
 			logger.error("Failed to cleanup MBeanServer", e);
 		}
 		scheduleManager.shutdown();
+//		if(executorManager instanceof ExecutorManagerLocalAdapter) {
+//			((ExecutorManagerLocalAdapter)executorManager).getExecutorManager().shutdown();
+//		}
 		executorManager.shutdown();
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/AbstractServiceServlet.java b/src/java/azkaban/webapp/servlet/AbstractServiceServlet.java
new file mode 100644
index 0000000..eb1013e
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/AbstractServiceServlet.java
@@ -0,0 +1,91 @@
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.codehaus.jackson.map.ObjectMapper;
+import azkaban.webapp.AzkabanServer;
+
+public class AbstractServiceServlet extends HttpServlet{
+	
+	private static final long serialVersionUID = 1L;
+	public static final String JSON_MIME_TYPE = "application/json";
+	
+	private AzkabanServer application;
+
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		application = (AzkabanServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+
+		if (application == null) {
+			throw new IllegalStateException(
+					"No batch application is defined in the servlet context!");
+		}
+	}
+
+	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
+		resp.setContentType(JSON_MIME_TYPE);
+		ObjectMapper mapper = new ObjectMapper();
+		OutputStream stream = resp.getOutputStream();
+		mapper.writeValue(stream, obj);
+	}
+
+	public boolean hasParam(HttpServletRequest request, String param) {
+		return request.getParameter(param) != null;
+	}
+
+	public String getParam(HttpServletRequest request, String name)
+			throws ServletException {
+		String p = request.getParameter(name);
+		if (p == null)
+			throw new ServletException("Missing required parameter '" + name + "'.");
+		else
+			return p;
+	}
+	
+	public String getParam(HttpServletRequest request, String name, String defaultVal ) {
+		String p = request.getParameter(name);
+		if (p == null) {
+			return defaultVal;
+		}
+
+		return p;
+	}
+
+	public int getIntParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Integer.parseInt(p);
+	}
+	
+	public int getIntParam(HttpServletRequest request, String name, int defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getIntParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		return defaultVal;
+	}
+	
+	public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Long.parseLong(p);
+	}
+	
+	public long getLongParam(HttpServletRequest request, String name, long defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getLongParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		return defaultVal;
+	}
+	
+}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index edae8a6..0bbd585 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
@@ -51,7 +52,7 @@ import azkaban.webapp.session.Session;
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
 	private ProjectManager projectManager;
-	private ExecutorManager executorManager;
+	private ExecutorManagerAdapter executorManager;
 	private ScheduleManager scheduleManager;
 	private ExecutorVelocityHelper velocityHelper;
 
@@ -768,7 +769,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 		
 		try {
-			String message = executorManager.submitExecutableFlow(exflow);
+			String message = executorManager.submitExecutableFlow(exflow, user.getUserId());
 			ret.put("message", message);
 		}
 		catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/webapp/servlet/HistoryServlet.java b/src/java/azkaban/webapp/servlet/HistoryServlet.java
index aa6d2ad..0be4206 100644
--- a/src/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/src/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -33,6 +33,7 @@ import org.joda.time.format.DateTimeFormat;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
@@ -42,7 +43,7 @@ import azkaban.webapp.session.Session;
 public class HistoryServlet extends LoginAbstractAzkabanServlet {
 
 	private static final long serialVersionUID = 1L;
-	private ExecutorManager executorManager;
+	private ExecutorManagerAdapter executorManager;
 	private ProjectManager projectManager;
 	private ExecutorVMHelper vmHelper;
 	
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 2d4dff5..601abda 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -17,6 +17,7 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.triggerapp.TriggerConnectorParams;
 import azkaban.trigger.TriggerManager;
 import azkaban.user.Permission;
@@ -39,7 +40,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 
 	private UserManager userManager;
 	private AzkabanWebServer server;
-	private ExecutorManager executorManager;
+	private ExecutorManagerAdapter executorManager;
 	private TriggerManager triggerManager;
 	
 	@Override
@@ -49,6 +50,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 		server = (AzkabanWebServer)getApplication();
 		userManager = server.getUserManager();
 		executorManager = server.getExecutorManager();
+
 		triggerManager = server.getTriggerManager();
 	}
 	
@@ -79,12 +81,11 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 				if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
 					ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
 					this.writeJSON(resp, ret, true);
-                    return;
+					return;
 				}
-				String hostPort = getParam(req, JMX_HOSTPORT);
-                String mbean = getParam(req, JMX_MBEAN);
-                Map<String, Object> result = triggerManager.callTriggerServerJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
-                ret = result;
+//				String hostPort = getParam(req, JMX_HOSTPORT);
+//				String mbean = getParam(req, JMX_MBEAN);
+				ret = triggerManager.getJMX().getAllJMXMbeans();
 			}
 			else if (JMX_GET_MBEANS.equals(ajax)) {
 				ret.put("mbeans", server.getMbeanNames());
@@ -175,17 +176,18 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 		page.add("mbeans", server.getMbeanNames());
 		
 		Map<String, Object> executorMBeans = new HashMap<String,Object>();
-		Set<String> primaryServerHosts = executorManager.getPrimaryServerHosts();
+//		Set<String> primaryServerHosts = executorManager.getPrimaryServerHosts();
 		for (String hostPort: executorManager.getAllActiveExecutorServerHosts()) {
 			try {
 				Map<String, Object> mbeans = executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
 	
-				if (primaryServerHosts.contains(hostPort)) {
-					executorMBeans.put(hostPort, mbeans.get("mbeans"));
-				}
-				else {
-					executorMBeans.put(hostPort, mbeans.get("mbeans"));
-				}
+				executorMBeans.put(hostPort, mbeans.get("mbeans"));
+//				if (primaryServerHosts.contains(hostPort)) {
+//					executorMBeans.put(hostPort, mbeans.get("mbeans"));
+//				}
+//				else {
+//					executorMBeans.put(hostPort, mbeans.get("mbeans"));
+//				}
 			}
 			catch (IOException e) {
 				logger.error("Cannot contact executor " + hostPort, e);
@@ -195,22 +197,23 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 		page.add("executorRemoteMBeans", executorMBeans);
 		
 		Map<String, Object> triggerserverMBeans = new HashMap<String,Object>();
-		Set<String> primaryTriggerServerHosts = triggerManager.getPrimaryServerHosts();
-		for (String hostPort: triggerManager.getAllActiveTriggerServerHosts()) {
-			try {
-				Map<String, Object> mbeans = triggerManager.callTriggerServerJMX(hostPort, TriggerConnectorParams.JMX_GET_MBEANS, null);
-				
-				if (primaryTriggerServerHosts.contains(hostPort)) {
-					triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
-				}
-				else {
-					triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
-				}
-			}
-			catch (IOException e) {
-				logger.error("Cannot contact executor " + hostPort, e);
-			}
-		}
+//		Set<String> primaryTriggerServerHosts = triggerManager.getPrimaryServerHosts();
+//		for (String hostPort: triggerManager.getAllActiveTriggerServerHosts()) {
+//			try {
+//				Map<String, Object> mbeans = triggerManager.callTriggerServerJMX(hostPort, TriggerConnectorParams.JMX_GET_MBEANS, null);
+//				
+//				if (primaryTriggerServerHosts.contains(hostPort)) {
+//					triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+//				}
+//				else {
+//					triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+//				}
+//			}
+//			catch (IOException e) {
+//				logger.error("Cannot contact executor " + hostPort, e);
+//			}
+//		}
+		triggerserverMBeans.put(triggerManager.getJMX().getPrimaryServerHost(), triggerManager.getJMX().getAllJMXMbeans());
 		
 		page.add("triggerserverRemoteMBeans", triggerserverMBeans);
 		
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 989cca9..5be504b 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableJobInfo;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
@@ -76,7 +77,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	private static final String LOCKDOWN_CREATE_PROJECTS_KEY = "lockdown.create.projects";
 	
 	private ProjectManager projectManager;
-	private ExecutorManager executorManager;
+	private ExecutorManagerAdapter executorManager;
 	private ScheduleManager scheduleManager;
 	private UserManager userManager;
 
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 1ef2cef..aab3bde 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -44,6 +44,7 @@ import org.joda.time.format.DateTimeFormat;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
@@ -252,7 +253,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ExecutionOptions flowOptions = sched.getExecutionOptions();
 			
 			if(slaOptions != null && slaOptions.size() > 0) {
-				ret.put("slaEmails", slaOptions.get(0).getInfo().get("SlaEmails"));
+				ret.put("slaEmails", slaOptions.get(0).getInfo().get(SlaOption.INFO_EMAIL_LIST));
 				
 				List<Object> setObj = new ArrayList<Object>();
 				for(SlaOption sla: slaOptions) {
@@ -473,7 +474,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		List<ExecutableFlow> history = null;
 		try {
 			AzkabanWebServer server = (AzkabanWebServer) getApplication();
-			ExecutorManager executorManager = server.getExecutorManager();
+			ExecutorManagerAdapter executorManager = server.getExecutorManager();
 			history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
 		} catch (ExecutorManagerException e) {
 			// Return empty should suffice
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 9326d81..0ecef8e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -25,7 +25,9 @@
 			<ul id="nav" class="nav">
 				<li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/')"><a href="$!context/">Projects</a></li>
 				<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
-				<li id="triggers-tab" #if($current_page == 'triggers')class="selected"#end onClick="navMenuClick('$!context/triggers')"><a href="$!context/triggers">Triggers</a></li>
+
+				<!--li id="triggers-tab" #if($current_page == 'triggers')class="selected"#end onClick="navMenuClick('$!context/triggers')"><a href="$!context/triggers">Triggers</a></li-->
+
 				<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
 				<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
 				
diff --git a/src/package/triggerserver/bin/azkaban-trigger-shutdown.sh b/src/package/triggerserver/bin/azkaban-trigger-shutdown.sh
new file mode 100755
index 0000000..3dda364
--- /dev/null
+++ b/src/package/triggerserver/bin/azkaban-trigger-shutdown.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+azkaban_dir=$(dirname $0)/..
+
+triggerport=`cat $azkaban_dir/conf/azkaban.properties | grep trigger.port | cut -d = -f 2`
+echo "Shutting down current running AzkabanTriggerServer at port $triggerport"
+
+proc=`cat $azkaban_dir/currentpid`
+
+kill $proc
+
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/triggerserver/bin/azkaban-trigger-start.sh b/src/package/triggerserver/bin/azkaban-trigger-start.sh
new file mode 100755
index 0000000..00e1077
--- /dev/null
+++ b/src/package/triggerserver/bin/azkaban-trigger-start.sh
@@ -0,0 +1,37 @@
+azkaban_dir=$(dirname $0)/..
+
+if [[ -z "$tmpdir" ]]; then
+tmpdir=temp
+fi
+
+for file in $azkaban_dir/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $azkaban_dir/extlib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $azkaban_dir/plugins/*/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+echo $azkaban_dir;
+echo $CLASSPATH;
+
+triggerport=`cat $azkaban_dir/conf/azkaban.properties | grep trigger.port | cut -d = -f 2`
+echo "Starting AzkabanTriggerServer on port $triggerport ..."
+serverpath=`pwd`
+
+if [ -z $AZKABAN_OPTS ]; then
+  AZKABAN_OPTS=-Xmx3G
+fi
+AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dtriggerport=$triggerport -Dserverpath=$serverpath
+
+java $AZKABAN_OPTS -cp $CLASSPATH azkaban.triggerapp.AzkabanTriggerServer -conf $azkaban_dir/conf $@ &
+
+echo $! > currentpid
+
diff --git a/src/package/triggerserver/conf/azkaban.private.properties b/src/package/triggerserver/conf/azkaban.private.properties
new file mode 100644
index 0000000..cce1792
--- /dev/null
+++ b/src/package/triggerserver/conf/azkaban.private.properties
@@ -0,0 +1 @@
+# Optional Properties that are hidden to the executions
\ No newline at end of file
diff --git a/src/package/triggerserver/conf/azkaban.properties b/src/package/triggerserver/conf/azkaban.properties
new file mode 100644
index 0000000..3504854
--- /dev/null
+++ b/src/package/triggerserver/conf/azkaban.properties
@@ -0,0 +1,18 @@
+#Azkaban
+default.timezone.id=America/Los_Angeles
+
+#Loader for projects
+azkaban.project.dir=projects
+
+database.type=mysql
+mysql.port=3306
+mysql.host=localhost
+mysql.database=azkaban2
+mysql.user=azkaban
+mysql.password=azkaban
+mysql.numconnections=100
+
+# Azkaban Executor settings
+trigger.server.maxThreads=50
+trigger.server.port=22321
+jetty.hostname=eat1-spadesaz01.grid.linkedin.com
diff --git a/src/package/triggerserver/conf/global.properties b/src/package/triggerserver/conf/global.properties
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/package/triggerserver/conf/global.properties
diff --git a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
index d65ce7b..dc9b970 100644
--- a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
+++ b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
@@ -26,7 +26,7 @@ public class BasicTimeCheckerTest {
 		DateTime now = DateTime.now();
 		ReadablePeriod period = Utils.parsePeriodString("10s");
 		
-		BasicTimeChecker timeChecker = new BasicTimeChecker("BasicTimeChecket_1", now, now.getZone(), true, true, period);
+		BasicTimeChecker timeChecker = new BasicTimeChecker("BasicTimeChecket_1", now.getMillis(), now.getZone(), true, true, period);
 		checkers.put(timeChecker.getId(), timeChecker);
 		String expr = timeChecker.getId() + ".eval()";
 		
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index 09510cf..c56edab 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -60,7 +60,7 @@ public class ConditionTest {
 		String period = "6s";
 		
 		//BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
-		ConditionChecker timeChecker = new BasicTimeChecker("BasicTimeChecker_1", now, now.getZone(), true, true, Utils.parsePeriodString(period));
+		ConditionChecker timeChecker = new BasicTimeChecker("BasicTimeChecker_1", now.getMillis(), now.getZone(), true, true, Utils.parsePeriodString(period));
 		System.out.println("checker id is " + timeChecker.getId());
 		
 		checkers.put(timeChecker.getId(), timeChecker);
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index 9a06f56..82a9796 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -32,6 +32,7 @@ import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerException;
 import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
@@ -137,7 +138,7 @@ public class JdbcTriggerLoaderTest {
 	}
 	
 	@Test
-	public void addTriggerTest() throws TriggerManagerException {
+	public void addTriggerTest() throws TriggerLoaderException {
 		Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
 		Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
 		loader.addTrigger(t1);
@@ -159,7 +160,7 @@ public class JdbcTriggerLoaderTest {
 	}
 	
 	@Test
-	public void removeTriggerTest() throws TriggerManagerException {
+	public void removeTriggerTest() throws TriggerLoaderException {
 		Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
 		Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
 		loader.addTrigger(t1);
@@ -173,7 +174,7 @@ public class JdbcTriggerLoaderTest {
 	}
 	
 	@Test
-	public void updateTriggerTest() throws TriggerManagerException {
+	public void updateTriggerTest() throws TriggerLoaderException {
 		Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
 		t1.setResetOnExpire(true);
 		loader.addTrigger(t1);
@@ -187,7 +188,7 @@ public class JdbcTriggerLoaderTest {
 	
 	private Trigger createTrigger(String projName, String flowName, String source) {
 		DateTime now = DateTime.now();
-		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now.getMillis(), now.getZone(), true, true, Utils.parsePeriodString("1h"));
 		Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
 		checkers1.put(checker1.getId(), checker1);
 		String expr1 = checker1.getId() + ".eval()";
@@ -196,7 +197,7 @@ public class JdbcTriggerLoaderTest {
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
 		TriggerAction action = new ExecuteFlowAction("executeAction", 1, projName, flowName, "azkaban", new ExecutionOptions(), null);
 		actions.add(action);
-		Trigger t = new Trigger(now, now, "azkaban", source, triggerCond, expireCond, actions);
+		Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", source, triggerCond, expireCond, actions);
 		return t;
 	}
 	
diff --git a/unit/java/azkaban/test/trigger/ThresholdChecker.java b/unit/java/azkaban/test/trigger/ThresholdChecker.java
index c25f566..4e6e85c 100644
--- a/unit/java/azkaban/test/trigger/ThresholdChecker.java
+++ b/unit/java/azkaban/test/trigger/ThresholdChecker.java
@@ -95,5 +95,11 @@ public class ThresholdChecker implements ConditionChecker{
 		
 	}
 
+	@Override
+	public long getNextCheckTime() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
 
 }
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index 508436b..fbd5da7 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -20,6 +20,7 @@ import azkaban.trigger.TriggerAction;
 import azkaban.trigger.ActionTypeLoader;
 import azkaban.trigger.TriggerException;
 import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.utils.Props;
@@ -48,8 +49,8 @@ public class TriggerManagerTest {
 		props.put("trigger.scan.interval", 4000);
 		TriggerManager triggerManager = new TriggerManager(props, triggerLoader);
 		
-		triggerManager.getCheckerLoader().registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
-		triggerManager.getActionLoader().registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);
+		triggerManager.registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
+		triggerManager.registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);
 		
 		ThresholdChecker.setVal(1);
 		
@@ -114,19 +115,19 @@ public class TriggerManagerTest {
 		private int idIndex = 0;
 		
 		@Override
-		public void addTrigger(Trigger t) throws TriggerManagerException {
+		public void addTrigger(Trigger t) throws TriggerLoaderException {
 			t.setTriggerId(idIndex++);
 			triggers.put(t.getTriggerId(), t);
 		}
 
 		@Override
-		public void removeTrigger(Trigger s) throws TriggerManagerException {
+		public void removeTrigger(Trigger s) throws TriggerLoaderException {
 			triggers.remove(s.getTriggerId());
 			
 		}
 
 		@Override
-		public void updateTrigger(Trigger t) throws TriggerManagerException {
+		public void updateTrigger(Trigger t) throws TriggerLoaderException {
 			triggers.put(t.getTriggerId(), t);
 		}
 
@@ -137,14 +138,14 @@ public class TriggerManagerTest {
 
 		@Override
 		public Trigger loadTrigger(int triggerId)
-				throws TriggerManagerException {
+				throws TriggerLoaderException {
 			// TODO Auto-generated method stub
 			return null;
 		}
 
 		@Override
 		public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
-				throws TriggerManagerException {
+				throws TriggerLoaderException {
 			// TODO Auto-generated method stub
 			return null;
 		}
@@ -166,7 +167,7 @@ public class TriggerManagerTest {
 		Condition triggerCond = new Condition(checkers, expr);
 		Condition expireCond = new Condition(checkers, expr);
 		
-		Trigger fakeTrigger = new Trigger(DateTime.now(), DateTime.now(), "azkaban", source, triggerCond, expireCond, actions);
+		Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", source, triggerCond, expireCond, actions);
 		fakeTrigger.setResetOnTrigger(true);
 		fakeTrigger.setResetOnExpire(true);
 		
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 790fcb4..22f6532 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -45,7 +45,7 @@ public class TriggerTest {
 	@Test
 	public void jsonConversionTest() throws Exception {
 		DateTime now = DateTime.now();
-		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now.getMillis(), now.getZone(), true, true, Utils.parsePeriodString("1h"));
 		Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
 		checkers1.put(checker1.getId(), checker1);
 		String expr1 = checker1.getId() + ".eval()";
@@ -54,7 +54,7 @@ public class TriggerTest {
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
 		TriggerAction action = new ExecuteFlowAction("executeAction", 1, "testProj", "testFlow", "azkaban", new ExecutionOptions(), null);
 		actions.add(action);
-		Trigger t = new Trigger(now, now, "azkaban", "test", triggerCond, expireCond, actions);
+		Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", "test", triggerCond, expireCond, actions);
 		
 		File temp = File.createTempFile("temptest", "temptest");
 		temp.deleteOnExit();