azkaban-uncached
Changes
src/java/azkaban/executor/ExecutorManager.java 138(+77 -61)
src/java/azkaban/jmx/JmxTriggerManager.java 41(+25 -16)
src/java/azkaban/project/ProjectManager.java 36(+16 -20)
src/java/azkaban/trigger/Condition.java 20(+19 -1)
src/java/azkaban/trigger/JdbcTriggerLoader.java 62(+32 -30)
src/java/azkaban/trigger/Trigger.java 72(+45 -27)
src/java/azkaban/trigger/TriggerManager.java 745(+331 -414)
src/java/azkaban/trigger/TriggerManager.java.old 573(+573 -0)
src/java/azkaban/trigger/TriggerManagerServlet.java 158(+158 -0)
src/java/azkaban/webapp/AzkabanWebServer.java 391(+337 -54)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 152a546..16dc8d0 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -477,4 +477,15 @@ public class ExecutableFlow {
public void setVersion(int version) {
this.version = version;
}
+
+ public static boolean isFinished(ExecutableFlow flow) {
+ switch(flow.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+ }
}
src/java/azkaban/executor/ExecutorManager.java 138(+77 -61)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 92a941c..7e1f2fc 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -59,7 +59,7 @@ import azkaban.utils.PropsUtils;
* Executor manager used to manage the client side job.
*
*/
-public class ExecutorManager {
+public class ExecutorManager implements ExecutorManagerAdapter {
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
private String executorHost;
@@ -77,8 +77,6 @@ public class ExecutorManager {
private long lastThreadCheckTime = -1;
- private final boolean isActive;
-
private Map<String, Alerter> alerters;
public interface Alerter {
@@ -88,28 +86,21 @@ public class ExecutorManager {
void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
}
- public ExecutorManager(Props props, ExecutorLoader loader, boolean isActive) throws ExecutorManagerException {
+ public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
executorHost = props.getString("executor.host", "localhost");
executorPort = props.getInt("executor.port");
-
-
- this.isActive = isActive;
-
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
- if(isActive) {
-
- long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
- cleanerThread = new CleanerThread(executionLogsRetentionMs);
- cleanerThread.start();
-
- alerters = loadAlerters(props);
- }
+ long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ cleanerThread = new CleanerThread(executionLogsRetentionMs);
+ cleanerThread.start();
+
+ alerters = loadAlerters(props);
}
private Map<String, Alerter> loadAlerters(Props props) {
@@ -249,23 +240,26 @@ public class ExecutorManager {
}
- public String getExecutorHost() {
- return executorHost;
- }
-
- public int getExecutorPort() {
- return executorPort;
- }
+// private String getExecutorHost() {
+// return executorHost;
+// }
+//
+// private int getExecutorPort() {
+// return executorPort;
+// }
- public State getExecutorThreadState() {
+ @Override
+ public State getExecutorManagerThreadState() {
return executingManager.getState();
}
- public boolean isThreadActive() {
+ @Override
+ public boolean isExecutorManagerThreadActive() {
return executingManager.isAlive();
}
- public long getLastThreadCheckTime() {
+ @Override
+ public long getLastExecutorManagerThreadCheckTime() {
return lastThreadCheckTime;
}
@@ -273,6 +267,7 @@ public class ExecutorManager {
return this.lastCleanerThreadCheckTime;
}
+ @Override
public Set<String> getPrimaryServerHosts() {
// Only one for now. More probably later.
HashSet<String> ports = new HashSet<String>();
@@ -280,6 +275,7 @@ public class ExecutorManager {
return ports;
}
+ @Override
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
HashSet<String> ports = new HashSet<String>();
@@ -292,15 +288,16 @@ public class ExecutorManager {
return ports;
}
- public ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
- ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
- return exflow;
- }
+// private ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
+// ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
+// return exflow;
+// }
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
}
+ @Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
ArrayList<Integer> executionIds = new ArrayList<Integer>();
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
@@ -308,31 +305,29 @@ public class ExecutorManager {
executionIds.add(ref.getFirst().getExecId());
}
}
-
return executionIds;
}
+ @Override
public boolean isFlowRunning(int projectId, String flowId) {
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
-
if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
}
-
return false;
}
+ @Override
public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
-
if (active == null) {
return executorLoader.fetchExecutableFlow(execId);
}
-
return active.getSecond();
}
+ @Override
public List<ExecutableFlow> getRunningFlows() {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
@@ -341,43 +336,52 @@ public class ExecutorManager {
return flows;
}
+ @Override
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
+ @Override
public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
return flows;
}
+ @Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
+ @Override
public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
return flows;
}
+ @Override
public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
return flows;
}
+ @Override
public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
List<ExecutableJobInfo> nodes = executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
return nodes;
}
+ @Override
public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException{
return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
}
+ @Override
public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException{
return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
}
+ @Override
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
@@ -395,10 +399,10 @@ public class ExecutorManager {
}
}
+ @Override
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
-
Pair<String,String> typeParam = new Pair<String,String>("type", "job");
Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
@@ -415,6 +419,7 @@ public class ExecutorManager {
}
}
+ @Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
@@ -434,6 +439,7 @@ public class ExecutorManager {
}
}
+ @Override
public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
synchronized(exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
@@ -444,6 +450,7 @@ public class ExecutorManager {
}
}
+ @Override
public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
synchronized(exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
@@ -454,6 +461,7 @@ public class ExecutorManager {
}
}
+ @Override
public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
synchronized(exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
@@ -464,30 +472,37 @@ public class ExecutorManager {
}
}
+ @Override
public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
}
+ @Override
public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
}
+ @Override
public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
}
+ @Override
public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
}
+ @Override
public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
}
+ @Override
public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
}
+ @Override
public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
}
@@ -530,9 +545,10 @@ public class ExecutorManager {
}
}
- public String submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
+ @Override
+ public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
synchronized(exflow) {
- logger.info("Submitting execution flow " + exflow.getFlowId());
+ logger.info("Submitting execution flow " + exflow.getFlowId() + " by " + userId);
int projectId = exflow.getProjectId();
String flowId = exflow.getFlowId();
@@ -592,7 +608,7 @@ public class ExecutorManager {
}
- public void cleanOldExecutionLogs(long millis) {
+ private void cleanOldExecutionLogs(long millis) {
try {
int count = executorLoader.removeExecutionLogsByTime(millis);
logger.info("Cleaned up " + count + " log entries.");
@@ -688,6 +704,7 @@ public class ExecutorManager {
return jsonResponse;
}
+ @Override
public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
URIBuilder builder = new URIBuilder();
@@ -732,6 +749,7 @@ public class ExecutorManager {
return jsonResponse;
}
+ @Override
public void shutdown() {
executingManager.shutdown();
}
@@ -762,7 +780,7 @@ public class ExecutorManager {
try {
lastThreadCheckTime = System.currentTimeMillis();
- loadRunningFlows();
+// loadRunningFlows();
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
@@ -880,27 +898,23 @@ public class ExecutorManager {
}
- if(isActive) {
- // If it's marked finished, we're good. If not, we fail everything and then mark it finished.
- if (!isFinished(dsFlow)) {
- failEverything(dsFlow);
- executorLoader.updateExecutableFlow(dsFlow);
- }
-
- // Delete the executing reference.
- if (flow.getEndTime() == -1) {
- flow.setEndTime(System.currentTimeMillis());
- executorLoader.updateExecutableFlow(dsFlow);
- }
- executorLoader.removeActiveExecutableReference(execId);
-
- runningFlows.remove(execId);
- recentlyFinished.put(execId, dsFlow);
- } else {
- runningFlows.remove(execId);
- recentlyFinished.put(execId, dsFlow);
- return;
+
+ // If it's marked finished, we're good. If not, we fail everything and then mark it finished.
+ if (!isFinished(dsFlow)) {
+ failEverything(dsFlow);
+ executorLoader.updateExecutableFlow(dsFlow);
+ }
+
+ // Delete the executing reference.
+ if (flow.getEndTime() == -1) {
+ flow.setEndTime(System.currentTimeMillis());
+ executorLoader.updateExecutableFlow(dsFlow);
}
+ executorLoader.removeActiveExecutableReference(execId);
+
+ runningFlows.remove(execId);
+ recentlyFinished.put(execId, dsFlow);
+
} catch (ExecutorManagerException e) {
logger.error(e);
}
@@ -1043,7 +1057,7 @@ public class ExecutorManager {
Status newStatus = flow.getStatus();
ExecutionOptions options = flow.getExecutionOptions();
- if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING) && isActive) {
+ if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
// We want to see if we should give an email status on first failure.
if (options.getNotifyOnFirstFailure()) {
Alerter mailAlerter = alerters.get("email");
@@ -1178,12 +1192,14 @@ public class ExecutorManager {
}
}
+ @Override
public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
+ @Override
public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
}
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
new file mode 100644
index 0000000..3ddab0c
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -0,0 +1,133 @@
+package azkaban.executor;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+
+public interface ExecutorManagerAdapter {
+
+ public static final String LOCAL_MODE = "local";
+ public static final String REMOTE_MODE = "remote";
+
+ public static final String REMOTE_EXECUTOR_MANAGER_HOST = "remote.executor.manager.host";
+ public static final String REMOTE_EXECUTOR_MANAGER_PORT = "remote.executor.manager.port";
+ public static final String REMOTE_EXECUTOR_MANAGER_URL = "/executormanager";
+
+ public static final String ACTION_GET_FLOW_LOG = "getFlowLog";
+ public static final String ACTION_GET_JOB_LOG = "getJobLog";
+ public static final String ACTION_CANCEL_FLOW = "cancelFlow";
+ public static final String ACTION_SUBMIT_FLOW = "submitFlow";
+ public static final String ACTION_RESUME_FLOW = "resumeFlow";
+ public static final String ACTION_PAUSE_FLOW = "pauseFlow";
+ public static final String ACTION_MODIFY_EXECUTION = "modifyExecution";
+ public static final String ACTION_UPDATE = "update";
+ public static final String ACTION_GET_JMX = "getJMX";
+
+ public static final String COMMAND_MODIFY_PAUSE_JOBS = "modifyPauseJobs";
+ public static final String COMMAND_MODIFY_RESUME_JOBS = "modifyResumeJobs";
+ public static final String COMMAND_MODIFY_RETRY_FAILURES = "modifyRetryFailures";
+ public static final String COMMAND_MODIFY_RETRY_JOBS = "modifyRetryJobs";
+ public static final String COMMAND_MODIFY_DISABLE_JOBS = "modifyDisableJobs";
+ public static final String COMMAND_MODIFY_ENABLE_JOBS = "modifyEnableJobs";
+ public static final String COMMAND_MODIFY_CANCEL_JOBS = "modifyCancelJobs";
+
+ public static final String INFO_JMX_TYPE = "jmxType";
+ public static final String INFO_JMX_DATA = "jmxData";
+ public static final String INFO_ACTION = "action";
+ public static final String INFO_TYPE = "type";
+ public static final String INFO_EXEC_ID = "execId";
+ public static final String INFO_EXEC_FLOW_JSON = "execFlowJson";
+ public static final String INFO_PROJECT_ID = "projectId";
+ public static final String INFO_FLOW_NAME = "flowName";
+ public static final String INFO_JOB_NAME = "jobName";
+ public static final String INFO_OFFSET = "offset";
+ public static final String INFO_LENGTH = "length";
+ public static final String INFO_ATTEMPT = "attempt";
+ public static final String INFO_MODIFY_JOB_IDS = "modifyJobIds";
+ public static final String INFO_MODIFY_COMMAND = "modifyCommand";
+ public static final String INFO_MESSAGE = "message";
+ public static final String INFO_ERROR = "error";
+ public static final String INFO_UPDATE_TIME_LIST = "updateTimeList";
+ public static final String INFO_EXEC_ID_LIST = "execIdList";
+ public static final String INFO_UPDATES = "updates";
+ public static final String INFO_USER_ID = "userId";
+ public static final String INFO_LOG = "logData";
+
+ public boolean isFlowRunning(int projectId, String flowId);
+
+ public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException;
+
+ public List<Integer> getRunningFlows(int projectId, String flowId);
+
+ public List<ExecutableFlow> getRunningFlows() throws IOException;
+
+ public List<ExecutableFlow> getRecentlyFinishedFlows();
+
+ public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException;
+
+ public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException;
+
+ public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException;
+
+ public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException;
+
+ public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException;
+
+ public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException;
+
+ public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException;
+
+ public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException;
+
+ public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException;
+
+ public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
+
+ public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+ public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+ public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+
+ public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+
+ public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+
+ public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+ public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+ public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException;
+
+ public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+ public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+ public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+ public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException;
+
+ public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException;
+
+ public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException;
+
+ public void shutdown();
+
+ public Set<String> getAllActiveExecutorServerHosts();
+
+ public State getExecutorManagerThreadState();
+
+ public boolean isExecutorManagerThreadActive();
+
+ public long getLastExecutorManagerThreadCheckTime();
+
+ public Set<? extends String> getPrimaryServerHosts();
+
+}
diff --git a/src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java b/src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java
new file mode 100644
index 0000000..2b3515f
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.log4j.Logger;
+
+import azkaban.project.Project;
+import azkaban.scheduler.ScheduleStatisticManager;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
+public class ExecutorManagerRemoteAdapter implements ExecutorManagerAdapter {
+ private static Logger logger = Logger.getLogger(ExecutorManagerRemoteAdapter.class);
+ private ExecutorLoader executorLoader;
+ private String executorHost;
+ private int executorPort;
+ private String executorManagerHost;
+ private int executorManagerPort;
+
+ private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
+
+ private UpdaterThread updater;
+
+ private long lastThreadCheckTime = -1;
+
+ public ExecutorManagerRemoteAdapter(Props props, ExecutorLoader loader) throws ExecutorManagerException {
+ this.executorLoader = loader;
+ this.loadRunningFlows();
+
+ executorHost = props.getString("executor.host", "localhost");
+ executorPort = props.getInt("executor.port");
+
+ executorManagerHost = props.getString(REMOTE_EXECUTOR_MANAGER_HOST);
+ executorManagerPort = props.getInt(REMOTE_EXECUTOR_MANAGER_PORT);
+
+ updater = new UpdaterThread();
+ updater.start();
+
+ }
+
+ @Override
+ public State getExecutorManagerThreadState() {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_JMX_TYPE, "getExecutorManagerThreadState"));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
+ return (State) response.get(INFO_JMX_DATA);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ @Override
+ public boolean isExecutorManagerThreadActive() {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_JMX_TYPE, "isExecutorManagerThreadActive"));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
+ return (Boolean) response.get(INFO_JMX_DATA);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ @Override
+ public long getLastExecutorManagerThreadCheckTime() {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_JMX_TYPE, "getLastExecutorManagerThreadCheckTime"));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
+ return (Long) response.get(INFO_JMX_DATA);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return -1;
+ }
+ }
+
+ @Override
+ public Set<String> getPrimaryServerHosts() {
+ // Only one for now. More probably later.
+ HashSet<String> ports = new HashSet<String>();
+ ports.add(executorHost + ":" + executorPort);
+ return ports;
+ }
+
+ @Override
+ public Set<String> getAllActiveExecutorServerHosts() {
+ // Includes non primary server/hosts
+ HashSet<String> ports = new HashSet<String>();
+ ports.add(executorHost + ":" + executorPort);
+ for(Pair<ExecutionReference, ExecutableFlow> running: runningFlows.values()) {
+ ExecutionReference ref = running.getFirst();
+ ports.add(ref.getHost() + ":" + ref.getPort());
+ }
+
+ return ports;
+ }
+
+// private ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
+// ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
+// return exflow;
+// }
+
+ private void loadRunningFlows() throws ExecutorManagerException {
+ runningFlows.putAll(executorLoader.fetchActiveFlows());
+ }
+
+ @Override
+ public List<Integer> getRunningFlows(int projectId, String flowId) {
+ ArrayList<Integer> executionIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ if (ref.getSecond().getFlowId().equals(flowId)) {
+ executionIds.add(ref.getFirst().getExecId());
+ }
+ }
+ return executionIds;
+ }
+
+ @Override
+ public boolean isFlowRunning(int projectId, String flowId) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
+ if (active == null) {
+ return executorLoader.fetchExecutableFlow(execId);
+ }
+ return active.getSecond();
+ }
+
+ @Override
+ public List<ExecutableFlow> getRunningFlows() {
+ ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ flows.add(ref.getSecond());
+ }
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getRecentlyFinishedFlows() {
+ return new ArrayList<ExecutableFlow>(recentlyFinished.values());
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
+ return flows;
+ }
+
+ @Override
+ public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
+ List<ExecutableJobInfo> nodes = executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+ return nodes;
+ }
+
+ @Override
+ public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException{
+ return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
+ }
+
+ @Override
+ public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException{
+ return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
+ }
+
+ @Override
+ public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair != null) {
+ Pair<String,String> typeParam = new Pair<String,String>("type", "flow");
+ Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+ Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, offsetParam, lengthParam);
+ return LogData.createLogDataFromObject(result);
+ }
+ else {
+ LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
+ return value;
+ }
+ }
+
+ @Override
+ public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair != null) {
+ Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+ Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+ Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+ Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ return LogData.createLogDataFromObject(result);
+ }
+ else {
+ LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+ return value;
+ }
+ }
+
+ @Override
+ public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair != null) {
+
+ Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+ Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+ Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+ Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ return JobMetaData.createJobMetaDataFromObject(result);
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_CANCEL_FLOW, userId, params);
+ if(response.containsKey(INFO_ERROR)) {
+ throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ @Override
+ public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_RESUME_FLOW, userId, params);
+ if(response.containsKey(INFO_ERROR)) {
+ throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ @Override
+ public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_PAUSE_FLOW, userId, params);
+ if(response.containsKey(INFO_ERROR)) {
+ throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ @Override
+ public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
+ }
+
+ @Override
+ public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
+ }
+
+ @Override
+ public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
+ }
+
+ @Override
+ public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
+ }
+
+ @Override
+ public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
+ }
+
+ @Override
+ public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
+ }
+
+ @Override
+ public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
+ synchronized(exFlow) {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
+ throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
+ }
+
+ Map<String, Object> response = null;
+ if (jobIds != null && jobIds.length > 0) {
+ for (String jobId: jobIds) {
+ if (!jobId.isEmpty()) {
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+ }
+ }
+ }
+ String ids = StringUtils.join(jobIds, ',');
+ response = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION,
+ userId,
+ new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+ new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+ }
+ else {
+ response = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION,
+ userId,
+ new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+ }
+
+ return response;
+ }
+ }
+
+ @Override
+ public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ params.add(new Pair<String, String>(INFO_EXEC_FLOW_JSON, JSONUtils.toJSON(exflow.toObject())));
+ Map<String, Object> response;
+ try {
+ response = callRemoteExecutorManager(ACTION_SUBMIT_FLOW, userId, params);
+ if(response.containsKey(INFO_ERROR)) {
+ throw new ExecutorManagerException((String)response.get(INFO_ERROR));
+ }
+ String message = (String) response.get(INFO_MESSAGE);
+ return message;
+ } catch (Exception e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, Pair<String,String> ... params) throws ExecutorManagerException {
+ try {
+ return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), null, params);
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user, Pair<String,String> ... params) throws ExecutorManagerException {
+ try {
+ return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, params);
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ private Map<String, Object> callExecutorServer(String host, int port, String action, Integer executionId, String user, Pair<String,String> ... params) throws IOException {
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(host)
+ .setPort(port)
+ .setPath("/executor");
+
+ builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+
+ if (executionId != null) {
+ builder.setParameter(ConnectorParams.EXECID_PARAM,String.valueOf(executionId));
+ }
+
+ if (user != null) {
+ builder.setParameter(ConnectorParams.USER_PARAM, user);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair: params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+
+ return jsonResponse;
+ }
+
+ @Override
+ public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
+ URIBuilder builder = new URIBuilder();
+
+ String[] hostPortSplit = hostPort.split(":");
+ builder.setScheme("http")
+ .setHost(hostPortSplit[0])
+ .setPort(Integer.parseInt(hostPortSplit[1]))
+ .setPath("/jmx");
+
+ builder.setParameter(action, "");
+ if (mBean != null) {
+ builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+ return jsonResponse;
+ }
+
+ private Map<String, Object> callRemoteExecutorManager(String action, String user, List<Pair<String,String>> params) throws IOException {
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(executorManagerHost)
+ .setPort(executorManagerPort)
+ .setPath(ExecutorManagerServlet.URL);
+
+ builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+
+ if (user != null) {
+ builder.setParameter(ConnectorParams.USER_PARAM, user);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair: params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+
+ return jsonResponse;
+ }
+
+ @Override
+ public void shutdown() {
+ updater.shutdown();
+ }
+
+ private class UpdaterThread extends Thread {
+ private boolean shutdown = false;
+
+ public UpdaterThread() {
+ this.setName("ExecutorManagerRemoteUpdaterThread");
+ }
+
+ // 10 mins recently finished threshold.
+ private long recentlyFinishedLifetimeMs = 600000;
+ private int waitTimeIdleMs = 2000;
+ private int waitTimeMs = 500;
+
+ private void shutdown() {
+ shutdown = true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ while(!shutdown) {
+ try {
+ lastThreadCheckTime = System.currentTimeMillis();
+
+// loadRunningFlows();
+ List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
+ ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
+
+ List<Long> updateTimesList = new ArrayList<Long>();
+ List<Integer> executionIdsList = new ArrayList<Integer>();
+
+ // We pack the parameters of the same host together before we query.
+ fillUpdateTimeAndExecId(executionIdsList, updateTimesList);
+
+ params.add(new Pair<String, String>(INFO_UPDATE_TIME_LIST, JSONUtils.toJSON(updateTimesList)));
+ params.add(new Pair<String, String>(INFO_EXEC_ID_LIST, JSONUtils.toJSON(executionIdsList)));
+
+ Map<String, Object> results = null;
+ try {
+ results = callRemoteExecutorManager(ACTION_UPDATE, "azkaban", params);
+ } catch (IOException e) {
+ logger.error(e);
+ }
+
+ // We gets results
+ if (results != null) {
+ List<Map<String,Object>> executionUpdates = (List<Map<String,Object>>)results.get(INFO_UPDATES);
+ for (Map<String,Object> updateMap: executionUpdates) {
+ try {
+ ExecutableFlow flow = updateExecution(updateMap);
+ if (isFinished(flow)) {
+ finishedFlows.add(flow);
+ }
+ } catch (ExecutorManagerException e) {
+ logger.error(e);
+ }
+ }
+
+ evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
+ // Add new finished
+ for (ExecutableFlow flow: finishedFlows) {
+ if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+ }
+ recentlyFinished.put(flow.getExecutionId(), flow);
+ }
+
+
+ }
+
+ synchronized(this) {
+ try {
+ if (runningFlows.size() > 0) {
+ this.wait(waitTimeMs);
+ }
+ else {
+ this.wait(waitTimeIdleMs);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+
+ private void evictOldRecentlyFinished(long ageMs) {
+ ArrayList<Integer> recentlyFinishedKeys = new ArrayList<Integer>(recentlyFinished.keySet());
+ long oldAgeThreshold = System.currentTimeMillis() - ageMs;
+ for (Integer key: recentlyFinishedKeys) {
+ ExecutableFlow flow = recentlyFinished.get(key);
+
+ if (flow.getEndTime() < oldAgeThreshold) {
+ // Evict
+ recentlyFinished.remove(key);
+ }
+ }
+ }
+
+ private ExecutableFlow updateExecution(Map<String,Object> updateData) throws ExecutorManagerException {
+
+ Integer execId = (Integer)updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+ if (execId == null) {
+ throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
+ }
+
+ Pair<ExecutionReference, ExecutableFlow> refPair = this.runningFlows.get(execId);
+ if (refPair == null) {
+ throw new ExecutorManagerException("No running flow found with the execution id.");
+ }
+
+ ExecutionReference ref = refPair.getFirst();
+ ExecutableFlow flow = refPair.getSecond();
+ if (updateData.containsKey("error")) {
+ // The flow should be finished here.
+ throw new ExecutorManagerException((String)updateData.get("error"), flow);
+ }
+
+ // Reset errors.
+ ref.setNextCheckTime(0);
+ ref.setNumErrors(0);
+ flow.applyUpdateObject(updateData);
+
+ return flow;
+ }
+
+ public boolean isFinished(ExecutableFlow flow) {
+ switch(flow.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private void fillUpdateTimeAndExecId(List<Integer> executionIds, List<Long> updateTimes) {
+ for (Pair<ExecutionReference, ExecutableFlow> flow: runningFlows.values()) {
+ executionIds.add(flow.getSecond().getExecutionId());
+ updateTimes.add(flow.getSecond().getUpdateTime());
+ }
+ }
+
+ @Override
+ public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+ outputList.addAll(flows);
+ return executorLoader.fetchNumExecutableFlows(projectId, flowId);
+ }
+
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
+ return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+ }
+
+
+}
diff --git a/src/java/azkaban/executor/ExecutorManagerServlet.java b/src/java/azkaban/executor/ExecutorManagerServlet.java
new file mode 100644
index 0000000..ab8d5b2
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerServlet.java
@@ -0,0 +1,225 @@
+package azkaban.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.servlet.AbstractServiceServlet;
+
+
+public class ExecutorManagerServlet extends AbstractServiceServlet {
+ private final ExecutorManagerAdapter executorManager;
+
+ public static final String URL = "executorManager";
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = Logger.getLogger(ExecutorManagerServlet.class);
+
+ public ExecutorManagerServlet(ExecutorManagerAdapter executorManager) {
+ this.executorManager = executorManager;
+ }
+
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ HashMap<String,Object> respMap= new HashMap<String,Object>();
+ //logger.info("ExecutorServer called by " + req.getRemoteAddr());
+ try {
+ if (!hasParam(req, ExecutorManagerAdapter.INFO_ACTION)) {
+ logger.error("Parameter action not set");
+ respMap.put("error", "Parameter action not set");
+ }
+ else {
+ String action = getParam(req, ExecutorManagerAdapter.INFO_ACTION);
+ if (action.equals(ExecutorManagerAdapter.ACTION_UPDATE)) {
+ //logger.info("Updated called");
+ handleAjaxUpdateRequest(req, respMap);
+ }
+ else {
+ int execid = Integer.parseInt(getParam(req, ExecutorManagerAdapter.INFO_EXEC_ID));
+ String user = getParam(req, ExecutorManagerAdapter.INFO_USER_ID, null);
+
+ logger.info("User " + user + " has called action " + action + " on " + execid);
+ if (action.equals(ExecutorManagerAdapter.ACTION_GET_FLOW_LOG)) {
+ handleFetchFlowLogEvent(execid, req, resp, respMap);
+ } else if (action.equals(ExecutorManagerAdapter.ACTION_GET_JOB_LOG)) {
+ handleFetchJobLogEvent(execid, req, resp, respMap);
+ }
+ else if (action.equals(ExecutorManagerAdapter.ACTION_SUBMIT_FLOW)) {
+ handleAjaxSubmitFlow(req, respMap, execid);
+ }
+ else if (action.equals(ExecutorManagerAdapter.ACTION_CANCEL_FLOW)) {
+ logger.info("Cancel called.");
+ handleAjaxCancelFlow(respMap, execid, user);
+ }
+ else if (action.equals(ExecutorManagerAdapter.ACTION_PAUSE_FLOW)) {
+ logger.info("Paused called.");
+ handleAjaxPauseFlow(respMap, execid, user);
+ }
+ else if (action.equals(ExecutorManagerAdapter.ACTION_RESUME_FLOW)) {
+ logger.info("Resume called.");
+ handleAjaxResumeFlow(respMap, execid, user);
+ }
+ else if (action.equals(ExecutorManagerAdapter.ACTION_MODIFY_EXECUTION)) {
+ logger.info("Modify Execution Action");
+ handleModifyExecution(respMap, execid, user, req);
+ }
+ else {
+ logger.error("action: '" + action + "' not supported.");
+ respMap.put("error", "action: '" + action + "' not supported.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e.getMessage());
+ }
+ writeJSON(resp, respMap);
+ resp.flushBuffer();
+ }
+
+ private void handleModifyExecution(HashMap<String, Object> respMap,
+ int execid, String user, HttpServletRequest req) {
+ if (!hasParam(req, ExecutorManagerAdapter.INFO_MODIFY_COMMAND)) {
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, "Modification command not set.");
+ return;
+ }
+
+ try {
+ String modificationType = getParam(req, ExecutorManagerAdapter.INFO_MODIFY_COMMAND);
+ ExecutableFlow exflow = executorManager.getExecutableFlow(execid);
+ if (ExecutorManagerAdapter.COMMAND_MODIFY_RETRY_FAILURES.equals(modificationType)) {
+ executorManager.retryFailures(exflow, user);
+ }
+ else {
+// String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
+// String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+//
+// if (MODIFY_RETRY_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
+// }
+ }
+ } catch (Exception e) {
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+ }
+
+ private void handleAjaxResumeFlow(HashMap<String, Object> respMap, int execid, String user) {
+ try {
+ ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+ executorManager.resumeFlow(exFlow, user);
+ } catch (Exception e) {
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+
+ }
+
+ private void handleAjaxPauseFlow(HashMap<String, Object> respMap, int execid, String user) {
+ try {
+ ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+ executorManager.pauseFlow(exFlow, user);
+ } catch (Exception e) {
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+ }
+
+ private void handleAjaxCancelFlow(HashMap<String, Object> respMap, int execid, String user) {
+ try {
+ ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+ executorManager.cancelFlow(exFlow, user);
+ } catch (Exception e) {
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+ }
+
+ private void handleAjaxSubmitFlow(HttpServletRequest req, HashMap<String, Object> respMap, int execid) {
+ try{
+ String execFlowJson = getParam(req, ExecutorManagerAdapter.INFO_EXEC_FLOW_JSON);
+ ExecutableFlow exflow = ExecutableFlow.createExecutableFlowFromObject(JSONUtils.parseJSONFromString(execFlowJson));
+ String user = getParam(req, ExecutorManagerAdapter.INFO_USER_ID);
+ executorManager.submitExecutableFlow(exflow, user);
+ respMap.put(ExecutorManagerAdapter.INFO_EXEC_ID, exflow.getExecutionId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+ }
+
+ private void handleFetchJobLogEvent(int execid, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+ try{
+ ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+ String jobId = getParam(req, ExecutorManagerAdapter.INFO_JOB_NAME);
+ int offset = getIntParam(req, ExecutorManagerAdapter.INFO_OFFSET);
+ int length = getIntParam(req, ExecutorManagerAdapter.INFO_LENGTH);
+ int attempt = getIntParam(req, ExecutorManagerAdapter.INFO_ATTEMPT);
+ LogData log = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+ respMap.put(ExecutorManagerAdapter.INFO_LOG, JSONUtils.toJSON(log.toObject()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+ }
+
+ private void handleFetchFlowLogEvent(int execid, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+ try{
+ ExecutableFlow exFlow = executorManager.getExecutableFlow(execid);
+ int offset = getIntParam(req, ExecutorManagerAdapter.INFO_OFFSET);
+ int length = getIntParam(req, ExecutorManagerAdapter.INFO_LENGTH);
+ LogData log = executorManager.getExecutableFlowLog(exFlow, offset, length);
+ respMap.put(ExecutorManagerAdapter.INFO_LOG, JSONUtils.toJSON(log.toObject()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleAjaxUpdateRequest(HttpServletRequest req, HashMap<String, Object> respMap) {
+ try {
+ ArrayList<Object> updateTimesList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, ExecutorManagerAdapter.INFO_UPDATE_TIME_LIST));
+ ArrayList<Object> execIDList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, ExecutorManagerAdapter.INFO_EXEC_ID_LIST));
+
+ ArrayList<Object> updateList = new ArrayList<Object>();
+ for (int i = 0; i < execIDList.size(); ++i) {
+ long updateTime = JSONUtils.getLongFromObject(updateTimesList.get(i));
+ int execId = (Integer)execIDList.get(i);
+
+ ExecutableFlow flow = executorManager.getExecutableFlow(execId);
+ if (flow == null) {
+ Map<String, Object> errorResponse = new HashMap<String,Object>();
+ errorResponse.put(ExecutorManagerAdapter.INFO_ERROR, "Flow does not exist");
+ errorResponse.put(ExecutorManagerAdapter.INFO_EXEC_ID, execId);
+ updateList.add(errorResponse);
+ continue;
+ }
+
+ if (flow.getUpdateTime() > updateTime) {
+ updateList.add(flow.toUpdateObject(updateTime));
+ }
+ }
+
+ respMap.put(ExecutorManagerAdapter.INFO_UPDATES, updateList);
+ } catch (Exception e) {
+ e.printStackTrace();
+ respMap.put(ExecutorManagerAdapter.INFO_ERROR, e);
+ }
+ }
+
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
new file mode 100644
index 0000000..67edbc9
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
@@ -0,0 +1,47 @@
+package azkaban.jmx;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
+
+public class JmxExecutorManagerAdapter implements JmxExecutorManagerAdapterMBean {
+ private ExecutorManagerAdapter manager;
+
+ public JmxExecutorManagerAdapter(ExecutorManagerAdapter manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public int getNumRunningFlows() {
+ try {
+ return this.manager.getRunningFlows().size();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return 0;
+ }
+ }
+
+ @Override
+ public String getExecutorManagerThreadState() {
+ return manager.getExecutorManagerThreadState().toString();
+ }
+
+ @Override
+ public boolean isExecutorManagerThreadActive() {
+ return manager.isExecutorManagerThreadActive();
+ }
+
+ @Override
+ public Long getLastExecutorManagerThreadCheckTime() {
+ return manager.getLastExecutorManagerThreadCheckTime();
+ }
+
+ @Override
+ public List<String> getPrimaryExecutorHostPorts() {
+ return new ArrayList<String>(manager.getPrimaryServerHosts());
+ }
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerAdapterMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerAdapterMBean.java
new file mode 100644
index 0000000..b9742cb
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManagerAdapterMBean.java
@@ -0,0 +1,20 @@
+package azkaban.jmx;
+
+import java.util.List;
+
+public interface JmxExecutorManagerAdapterMBean {
+ @DisplayName("OPERATION: getNumRunningFlows")
+ public int getNumRunningFlows();
+
+ @DisplayName("OPERATION: getExecutorThreadState")
+ public String getExecutorManagerThreadState();
+
+ @DisplayName("OPERATION: isThreadActive")
+ public boolean isExecutorManagerThreadActive();
+
+ @DisplayName("OPERATION: getLastThreadCheckTime")
+ public Long getLastExecutorManagerThreadCheckTime();
+
+ @DisplayName("OPERATION: getPrimaryExecutorHostPorts")
+ public List<String> getPrimaryExecutorHostPorts();
+}
src/java/azkaban/jmx/JmxTriggerManager.java 41(+25 -16)
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
index 0bacf7b..dbee790 100644
--- a/src/java/azkaban/jmx/JmxTriggerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -6,47 +6,56 @@ import java.util.List;
import org.joda.time.DateTime;
import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerAdapter;
+import azkaban.trigger.TriggerManagerAdapter.TriggerJMX;
public class JmxTriggerManager implements JmxTriggerManagerMBean {
- private TriggerManager manager;
+ private TriggerJMX jmxStats;
- public JmxTriggerManager(TriggerManager manager) {
- this.manager = manager;
+ public JmxTriggerManager(TriggerManagerAdapter manager) {
+ this.jmxStats = manager.getJMX();
}
@Override
- public String getLastThreadCheckTime() {
- return new DateTime(manager.getLastThreadCheckTime()).toString();
+ public String getLastRunnerThreadCheckTime() {
+ return new DateTime(jmxStats.getLastRunnerThreadCheckTime()).toString();
}
@Override
- public boolean isThreadActive() {
- return manager.isThreadActive();
+ public boolean isRunnerThreadActive() {
+ return jmxStats.isRunnerThreadActive();
}
@Override
- public List<String> getPrimaryTriggerHostPorts() {
- return new ArrayList<String>(manager.getPrimaryServerHosts());
+ public String getPrimaryTriggerHostPort() {
+ return jmxStats.getPrimaryServerHost();
}
- @Override
- public List<String> getAllTriggerHostPorts() {
- return new ArrayList<String>(manager.getAllActiveTriggerServerHosts());
- }
+// @Override
+// public List<String> getAllTriggerHostPorts() {
+// return new ArrayList<String>(manager.getAllActiveTriggerServerHosts());
+// }
@Override
public int getNumTriggers() {
- return manager.getNumTriggers();
+ return jmxStats.getNumTriggers();
}
@Override
public String getTriggerSources() {
- return manager.getTriggerSources();
+ return jmxStats.getTriggerSources();
}
@Override
public String getTriggerIds() {
- return manager.getTriggerIds();
+ return jmxStats.getTriggerIds();
}
+
+ @Override
+ public long getScannerIdleTime() {
+ return jmxStats.getScannerIdleTime();
+ }
+
+
}
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
index af3cc9a..6302885 100644
--- a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -5,16 +5,16 @@ import java.util.List;
public interface JmxTriggerManagerMBean {
@DisplayName("OPERATION: getLastThreadCheckTime")
- public String getLastThreadCheckTime();
+ public String getLastRunnerThreadCheckTime();
@DisplayName("OPERATION: isThreadActive")
- public boolean isThreadActive();
+ public boolean isRunnerThreadActive();
- @DisplayName("OPERATION: getPrimaryTriggerHostPorts")
- public List<String> getPrimaryTriggerHostPorts();
+ @DisplayName("OPERATION: getPrimaryTriggerHostPort")
+ public String getPrimaryTriggerHostPort();
- @DisplayName("OPERATION: getAllTriggerHostPorts")
- public List<String> getAllTriggerHostPorts();
+// @DisplayName("OPERATION: getAllTriggerHostPorts")
+// public List<String> getAllTriggerHostPorts();
@DisplayName("OPERATION: getNumTriggers")
public int getNumTriggers();
@@ -24,4 +24,7 @@ public interface JmxTriggerManagerMBean {
@DisplayName("OPERATION: getTriggerIds")
public String getTriggerIds();
+
+ @DisplayName("OPERATION: getScannerIdleTime")
+ public long getScannerIdleTime();
}
src/java/azkaban/project/ProjectManager.java 36(+16 -20)
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index eea9c46..e4a4ab1 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -37,7 +37,6 @@ public class ProjectManager {
private final int projectVersionRetention;
private final boolean creatorDefaultPermissions;
- private TriggerManager triggerManager;
private boolean loadTriggerFromFile = false;
public ProjectManager(ProjectLoader loader, Props props) {
@@ -58,10 +57,6 @@ public class ProjectManager {
loadAllProjects();
}
- public void setTriggerManager(TriggerManager triggerManager) {
- this.triggerManager = triggerManager;
- }
-
public void setLoadTriggerFromFile(boolean enable) {
this.loadTriggerFromFile = enable;
}
@@ -354,21 +349,22 @@ public class ProjectManager {
logger.info("Uploading Props properties");
projectLoader.uploadProjectProperties(project, propProps);
}
-
- if(loadTriggerFromFile) {
- logger.info("Loading triggers.");
- Props triggerProps = new Props();
- triggerProps.put("projectId", project.getId());
- triggerProps.put("projectName", project.getName());
- triggerProps.put("submitUser", uploader.getUserId());
- try {
- triggerManager.loadTriggerFromDir(file, triggerProps);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- logger.error("Failed to load triggers.", e);
- }
- }
+
+ //TODO: find something else to load triggers
+// if(loadTriggerFromFile) {
+// logger.info("Loading triggers.");
+// Props triggerProps = new Props();
+// triggerProps.put("projectId", project.getId());
+// triggerProps.put("projectName", project.getName());
+// triggerProps.put("submitUser", uploader.getUserId());
+// try {
+// triggerManager.loadTriggerFromDir(file, triggerProps);
+// } catch (Exception e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// logger.error("Failed to load triggers.", e);
+// }
+// }
logger.info("Uploaded project files. Cleaning up temp files.");
projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(), "Uploaded project files zip " + archive.getName());
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ad4104a..fcf079d 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -37,6 +37,7 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
@@ -64,7 +65,7 @@ public class ScheduleManager implements TriggerAgent {
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
- private final ExecutorManager executorManager;
+ private final ExecutorManagerAdapter executorManager;
private ProjectManager projectManager = null;
@@ -80,7 +81,7 @@ public class ScheduleManager implements TriggerAgent {
*
* @param loader
*/
- public ScheduleManager (ExecutorManager executorManager,
+ public ScheduleManager (ExecutorManagerAdapter executorManager,
ScheduleLoader loader,
boolean useExternalRunner)
{
@@ -539,7 +540,7 @@ public class ScheduleManager implements TriggerAgent {
}
try {
- executorManager.submitExecutableFlow(exflow);
+ executorManager.submitExecutableFlow(exflow, s.getSubmitUser());
logger.info("Scheduler has invoked " + exflow.getExecutionId());
}
catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 4c430de..3aaf9ff 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -9,6 +9,7 @@ import java.util.Map;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.utils.JSONUtils;
@@ -35,7 +36,7 @@ public class ScheduleStatisticManager {
private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
Map<String, Object> data = new HashMap<String, Object>();
- ExecutorManager executorManager = server.getExecutorManager();
+ ExecutorManagerAdapter executorManager = server.getExecutorManager();
ScheduleManager scheduleManager = server.getScheduleManager();
Schedule schedule = scheduleManager.getSchedule(scheduleId);
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index ba6bdd0..1f638d7 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -8,9 +8,8 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.project.ProjectManager;
-import azkaban.sla.SlaOption;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
@@ -18,9 +17,7 @@ import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
-import azkaban.trigger.builtin.CreateTriggerAction;
import azkaban.trigger.builtin.ExecuteFlowAction;
-import azkaban.trigger.builtin.SlaChecker;
public class TriggerBasedScheduleLoader implements ScheduleLoader {
@@ -33,7 +30,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
// private Map<Integer, Trigger> triggersLocalCopy;
private long lastUpdateTime = -1;
- public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager, String triggerSource) {
+ public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManagerAdapter executorManager, ProjectManager projectManager, String triggerSource) {
this.triggerManager = triggerManager;
this.triggerSource = triggerSource;
// need to init the action types and condition checker types
@@ -46,7 +43,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Condition triggerCondition = createTimeTriggerCondition(s);
Condition expireCondition = createTimeExpireCondition(s);
List<TriggerAction> actions = createActions(s);
- Trigger t = new Trigger(s.getScheduleId(), new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+ Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
if(s.isRecurring()) {
t.setResetOnTrigger(true);
}
@@ -72,7 +69,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private Condition createTimeTriggerCondition (Schedule s) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
- ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
checkers.put(checker.getId(), checker);
String expr = checker.getId() + ".eval()";
Condition cond = new Condition(checkers, expr);
@@ -82,7 +79,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
// if failed to trigger, auto expire?
private Condition createTimeExpireCondition (Schedule s) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
- ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
checkers.put(checker.getId(), checker);
String expr = checker.getId() + ".eval()";
Condition cond = new Condition(checkers, expr);
@@ -122,7 +119,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
List<Schedule> schedules = new ArrayList<Schedule>();
// triggersLocalCopy = new HashMap<Integer, Trigger>();
for(Trigger t : triggers) {
- lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+ lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
Schedule s = triggerToSchedule(t);
schedules.add(s);
System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
@@ -156,12 +153,12 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
act.getProjectName(),
act.getFlowName(),
t.getStatus().toString(),
- ck.getFirstCheckTime().getMillis(),
- ck.getFirstCheckTime().getZone(),
+ ck.getFirstCheckTime(),
+ ck.getTimeZone(),
ck.getPeriod(),
- t.getLastModifyTime().getMillis(),
- ck.getNextCheckTime().getMillis(),
- t.getSubmitTime().getMillis(),
+ t.getLastModifyTime(),
+ ck.getNextCheckTime(),
+ t.getSubmitTime(),
t.getSubmitUser(),
act.getExecutionOptions(),
act.getSlaOptions());
@@ -196,7 +193,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public synchronized List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException {
List<Trigger> triggers;
try {
- triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+ triggers = triggerManager.getTriggerUpdates(triggerSource, lastUpdateTime);
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -204,7 +201,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
}
List<Schedule> schedules = new ArrayList<Schedule>();
for(Trigger t : triggers) {
- lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+ lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
Schedule s = triggerToSchedule(t);
schedules.add(s);
System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 28b0328..29512f7 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -29,6 +29,7 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.project.ProjectManager;
import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerManager;
@@ -55,7 +56,7 @@ public class TriggerBasedScheduler {
*
* @param loader
*/
- public TriggerBasedScheduler(ExecutorManager executorManager,
+ public TriggerBasedScheduler(ExecutorManagerAdapter executorManager,
ProjectManager projectManager,
TriggerManager triggerManager,
ScheduleLoader loader)
diff --git a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 6071019..13eaf27 100644
--- a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -14,8 +14,8 @@ public class BasicTimeChecker implements ConditionChecker {
public static final String type = "BasicTimeChecker";
- private DateTime firstCheckTime;
- private DateTime nextCheckTime;
+ private long firstCheckTime;
+ private long nextCheckTime;
private DateTimeZone timezone;
private boolean isRecurring = true;
private boolean skipPastChecks = true;
@@ -25,7 +25,7 @@ public class BasicTimeChecker implements ConditionChecker {
public BasicTimeChecker(
String id,
- DateTime firstCheckTime,
+ long firstCheckTime,
DateTimeZone timezone,
boolean isRecurring,
boolean skipPastChecks,
@@ -36,13 +36,17 @@ public class BasicTimeChecker implements ConditionChecker {
this.isRecurring = isRecurring;
this.skipPastChecks = skipPastChecks;
this.period = period;
- this.nextCheckTime = new DateTime(firstCheckTime);
+ this.nextCheckTime = firstCheckTime;
this.nextCheckTime = calculateNextCheckTime();
}
- public DateTime getFirstCheckTime() {
+ public long getFirstCheckTime() {
return firstCheckTime;
}
+
+ public DateTimeZone getTimeZone() {
+ return timezone;
+ }
public boolean isRecurring() {
return isRecurring;
@@ -56,7 +60,7 @@ public class BasicTimeChecker implements ConditionChecker {
return period;
}
- public DateTime getNextCheckTime() {
+ public long getNextCheckTime() {
return nextCheckTime;
}
@@ -75,9 +79,9 @@ public class BasicTimeChecker implements ConditionChecker {
public BasicTimeChecker(
String id,
- DateTime firstCheckTime,
+ long firstCheckTime,
DateTimeZone timezone,
- DateTime nextCheckTime,
+ long nextCheckTime,
boolean isRecurring,
boolean skipPastChecks,
ReadablePeriod period) {
@@ -92,7 +96,7 @@ public class BasicTimeChecker implements ConditionChecker {
@Override
public Boolean eval() {
- return nextCheckTime.isBeforeNow();
+ return nextCheckTime < System.currentTimeMillis();
}
@Override
@@ -125,21 +129,22 @@ public class BasicTimeChecker implements ConditionChecker {
@SuppressWarnings("unchecked")
public static BasicTimeChecker createFromJson(Object obj) throws Exception {
- Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
- if(!jsonObj.get("type").equals(type)) {
- throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
- }
- long firstTimeMillis = Long.valueOf((String)jsonObj.get("firstCheckTime"));
- String timezoneId = (String) jsonObj.get("timezone");
- long nextTimeMillis = Long.valueOf((String)jsonObj.get("nextCheckTime"));
- DateTimeZone timezone = DateTimeZone.forID(timezoneId);
- DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
- DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
- boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
- boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
- ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
- String id = (String) jsonObj.get("id");
- return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+// Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+// if(!jsonObj.get("type").equals(type)) {
+// throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+// }
+// long firstCheckTime = Long.valueOf((String)jsonObj.get("firstCheckTime"));
+// String timezoneId = (String) jsonObj.get("timezone");
+// long nextCheckTime = Long.valueOf((String)jsonObj.get("nextCheckTime"));
+// DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+//// DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+//// DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+// boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
+// boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
+// ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
+// String id = (String) jsonObj.get("id");
+// return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+ return createFromJson((HashMap<String, Object>)obj);
}
public static BasicTimeChecker createFromJson(HashMap<String, Object> obj) throws Exception {
@@ -147,12 +152,12 @@ public class BasicTimeChecker implements ConditionChecker {
if(!jsonObj.get("type").equals(type)) {
throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
}
- Long firstTimeMillis = Long.valueOf((String) jsonObj.get("firstCheckTime"));
+ Long firstCheckTime = Long.valueOf((String) jsonObj.get("firstCheckTime"));
String timezoneId = (String) jsonObj.get("timezone");
- long nextTimeMillis = Long.valueOf((String) jsonObj.get("nextCheckTime"));
+ long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
DateTimeZone timezone = DateTimeZone.forID(timezoneId);
- DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
- DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+// DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+// DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
@@ -212,10 +217,10 @@ public class BasicTimeChecker implements ConditionChecker {
nextCheckTime = calculateNextCheckTime();
}
- private DateTime calculateNextCheckTime(){
- DateTime date = new DateTime(nextCheckTime);
+ private long calculateNextCheckTime(){
+ DateTime date = new DateTime(nextCheckTime).withZone(timezone);
int count = 0;
- while(!DateTime.now().isBefore(date)) {
+ while(!date.isAfterNow()) {
if(count > 100000) {
throw new IllegalStateException("100000 increments of period did not get to present time.");
}
@@ -230,7 +235,7 @@ public class BasicTimeChecker implements ConditionChecker {
continue;
}
}
- return date;
+ return date.getMillis();
}
@Override
@@ -243,9 +248,9 @@ public class BasicTimeChecker implements ConditionChecker {
public Object toJson() {
Map<String, Object> jsonObj = new HashMap<String, Object>();
jsonObj.put("type", type);
- jsonObj.put("firstCheckTime", String.valueOf(firstCheckTime.getMillis()));
+ jsonObj.put("firstCheckTime", String.valueOf(firstCheckTime));
jsonObj.put("timezone", timezone.getID());
- jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime.getMillis()));
+ jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime));
jsonObj.put("isRecurrint", String.valueOf(isRecurring));
jsonObj.put("skipPastChecks", String.valueOf(skipPastChecks));
jsonObj.put("period", Utils.createPeriodString(period));
diff --git a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
index 4f8baac..b4c5f84 100644
--- a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
+++ b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
@@ -5,12 +5,12 @@ import java.util.Map;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
-import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.trigger.TriggerManager;
public class CreateTriggerAction implements TriggerAction {
public static final String type = "CreateTriggerAction";
- private static TriggerRunnerManager triggerRunnerManager;
+ private static TriggerManager triggerManager;
private Trigger trigger;
private Map<String, Object> context;
private String actionId;
@@ -25,8 +25,8 @@ public class CreateTriggerAction implements TriggerAction {
return type;
}
- public static void setTriggerRunnerManager(TriggerRunnerManager trm) {
- triggerRunnerManager = trm;
+ public static void setTriggerManager(TriggerManager trm) {
+ triggerManager = trm;
}
@SuppressWarnings("unchecked")
@@ -58,7 +58,7 @@ public class CreateTriggerAction implements TriggerAction {
@Override
public void doAction() throws Exception {
- triggerRunnerManager.insertTrigger(trigger);
+ triggerManager.insertTrigger(trigger);
}
@Override
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 1ff52cb..6124b22 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -10,6 +10,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
@@ -19,7 +20,7 @@ import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
-import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.trigger.TriggerManager;
public class ExecuteFlowAction implements TriggerAction {
@@ -27,8 +28,8 @@ public class ExecuteFlowAction implements TriggerAction {
public static final String EXEC_ID = "ExecuteFlowAction.execid";
- private static ExecutorManager executorManager;
- private static TriggerRunnerManager triggerRunnerManager;
+ private static ExecutorManagerAdapter executorManager;
+ private static TriggerManager triggerManager;
private String actionId;
private int projectId;
private String projectName;
@@ -99,20 +100,20 @@ public class ExecuteFlowAction implements TriggerAction {
this.slaOptions = slaOptions;
}
- public static ExecutorManager getExecutorManager() {
+ public static ExecutorManagerAdapter getExecutorManager() {
return executorManager;
}
- public static void setExecutorManager(ExecutorManager executorManager) {
+ public static void setExecutorManager(ExecutorManagerAdapter executorManager) {
ExecuteFlowAction.executorManager = executorManager;
}
- public static TriggerRunnerManager getTriggerRunnerManager() {
- return triggerRunnerManager;
+ public static TriggerManager getTriggerManager() {
+ return triggerManager;
}
- public static void setTriggerRunnerManager(TriggerRunnerManager triggerRunnerManager) {
- ExecuteFlowAction.triggerRunnerManager = triggerRunnerManager;
+ public static void setTriggerManager(TriggerManager triggerManager) {
+ ExecuteFlowAction.triggerManager = triggerManager;
}
public static ProjectManager getProjectManager() {
@@ -214,7 +215,7 @@ public class ExecuteFlowAction implements TriggerAction {
exflow.setExecutionOptions(executionOptions);
try{
- executorManager.submitExecutableFlow(exflow);
+ executorManager.submitExecutableFlow(exflow, submitUser);
Map<String, Object> outputProps = new HashMap<String, Object>();
outputProps.put(EXEC_ID, exflow.getExecutionId());
context.put(actionId, outputProps);
@@ -250,7 +251,7 @@ public class ExecuteFlowAction implements TriggerAction {
Trigger slaTrigger = new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond, actions);
slaTrigger.setResetOnTrigger(false);
slaTrigger.setResetOnExpire(false);
- triggerRunnerManager.insertTrigger(slaTrigger);
+ triggerManager.insertTrigger(slaTrigger);
}
}
diff --git a/src/java/azkaban/trigger/builtin/KillExecutionAction.java b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
index 1da405d..0dabe73 100644
--- a/src/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -7,6 +7,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.trigger.TriggerAction;
public class KillExecutionAction implements TriggerAction{
@@ -17,14 +18,14 @@ public class KillExecutionAction implements TriggerAction{
private String actionId;
private int execId;
- private static ExecutorManager executorManager;
+ private static ExecutorManagerAdapter executorManager;
public KillExecutionAction(String actionId, int execId) {
this.execId = execId;
this.actionId = actionId;
}
- public static void setExecutorManager(ExecutorManager em) {
+ public static void setExecutorManager(ExecutorManagerAdapter em) {
executorManager = em;
}
@@ -73,7 +74,7 @@ public class KillExecutionAction implements TriggerAction{
public void doAction() throws Exception {
ExecutableFlow exFlow = executorManager.getExecutableFlow(execId);
logger.info("ready to kill execution " + execId);
- if(!executorManager.isFinished(exFlow)) {
+ if(!ExecutableFlow.isFinished(exFlow)) {
logger.info("Killing execution " + execId);
executorManager.cancelFlow(exFlow, "azkaban_sla");
}
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index ead2124..11c6bd5 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -16,6 +16,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManager.Alerter;
import azkaban.executor.ExecutorManagerException;
import azkaban.sla.SlaOption;
@@ -36,7 +37,7 @@ public class SlaAlertAction implements TriggerAction{
// private List<Map<String, Object>> alerts;
private static Map<String, Alerter> alerters;
private Map<String, Object> context;
- private static ExecutorManager executorManager;
+ private static ExecutorManagerAdapter executorManager;
public SlaAlertAction(String id, SlaOption slaOption, int execId) {
this.actionId = id;
@@ -49,7 +50,7 @@ public class SlaAlertAction implements TriggerAction{
alerters = alts;
}
- public static void setExecutorManager(ExecutorManager em) {
+ public static void setExecutorManager(ExecutorManagerAdapter em) {
executorManager = em;
}
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index e9826eb..05a00ac 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -11,6 +11,7 @@ import org.joda.time.ReadablePeriod;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.sla.SlaOption;
@@ -28,7 +29,7 @@ public class SlaChecker implements ConditionChecker{
private Map<String, Object> context;
private boolean passChecker = true;
- private static ExecutorManager executorManager;
+ private static ExecutorManagerAdapter executorManager;
public SlaChecker(String id, SlaOption slaOption, int execId, boolean passChecker) {
this.id = id;
@@ -46,7 +47,7 @@ public class SlaChecker implements ConditionChecker{
this.passChecker = passChecker;
}
- public static void setExecutorManager(ExecutorManager em) {
+ public static void setExecutorManager(ExecutorManagerAdapter em) {
executorManager = em;
}
@@ -226,4 +227,10 @@ public class SlaChecker implements ConditionChecker{
this.context = context;
}
+ @Override
+ public long getNextCheckTime() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
src/java/azkaban/trigger/Condition.java 20(+19 -1)
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 451285b..0311b54 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -9,6 +9,7 @@ import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
public class Condition {
@@ -20,12 +21,19 @@ public class Condition {
private Expression expression;
private Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
private MapContext context = new MapContext();
+ private long nextCheckTime = -1;
public Condition(Map<String, ConditionChecker> checkers, String expr) {
setCheckers(checkers);
this.expression = jexl.createExpression(expr);
}
+ public Condition(Map<String, ConditionChecker> checkers, String expr, long nextCheckTime) {
+ this.nextCheckTime = nextCheckTime;
+ setCheckers(checkers);
+ this.expression = jexl.createExpression(expr);
+ }
+
public synchronized static void setJexlEngine(JexlEngine jexl) {
Condition.jexl = jexl;
}
@@ -43,6 +51,10 @@ public class Condition {
context.set(checker.getId(), checker);
}
+ public long getNextCheckTime() {
+ return nextCheckTime;
+ }
+
public Map<String, ConditionChecker> getCheckers() {
return this.checkers;
}
@@ -55,9 +67,13 @@ public class Condition {
}
public void resetCheckers() {
+ long time = Long.MAX_VALUE;
for(ConditionChecker checker : checkers.values()) {
checker.reset();
+ time = Math.min(time, checker.getNextCheckTime());
}
+ logger.error("Done resetting checkers. The next check time will be " + new DateTime(time));
+ this.nextCheckTime = time;
}
public String getExpression() {
@@ -84,6 +100,7 @@ public class Condition {
checkersJson.add(oneChecker);
}
jsonObj.put("checkers", checkersJson);
+ jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime));
return jsonObj;
}
@@ -107,8 +124,9 @@ public class Condition {
checkers.put(ck.getId(), ck);
}
String expr = (String) jsonObj.get("expression");
+ long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
- cond = new Condition(checkers, expr);
+ cond = new Condition(checkers, expr, nextCheckTime);
} catch(Exception e) {
e.printStackTrace();
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 85b4003..342312b 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -23,4 +23,5 @@ public interface ConditionChecker {
void setContext(Map<String, Object> context);
+ long getNextCheckTime();
}
src/java/azkaban/trigger/JdbcTriggerLoader.java 62(+32 -30)
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index 658594b..2d70c65 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -73,7 +73,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
}
@Override
- public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException {
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerLoaderException {
logger.info("Loading triggers changed since " + new DateTime(lastUpdateTime).toString());
Connection connection = getConnection();
@@ -87,7 +87,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
} catch (SQLException e) {
logger.error(GET_ALL_TRIGGERS + " failed.");
- throw new TriggerManagerException("Loading triggers from db failed. ", e);
+ throw new TriggerLoaderException("Loading triggers from db failed. ", e);
} finally {
DbUtils.closeQuietly(connection);
}
@@ -98,7 +98,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
}
@Override
- public List<Trigger> loadTriggers() throws TriggerManagerException {
+ public List<Trigger> loadTriggers() throws TriggerLoaderException {
logger.info("Loading all triggers from db.");
Connection connection = getConnection();
@@ -112,7 +112,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
} catch (SQLException e) {
logger.error(GET_ALL_TRIGGERS + " failed.");
- throw new TriggerManagerException("Loading triggers from db failed. ", e);
+ throw new TriggerLoaderException("Loading triggers from db failed. ", e);
} finally {
DbUtils.closeQuietly(connection);
}
@@ -123,38 +123,38 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
}
@Override
- public void removeTrigger(Trigger t) throws TriggerManagerException {
+ public void removeTrigger(Trigger t) throws TriggerLoaderException {
logger.info("Removing trigger " + t.toString() + " from db.");
QueryRunner runner = createQueryRunner();
try {
int removes = runner.update(REMOVE_TRIGGER, t.getTriggerId());
if (removes == 0) {
- throw new TriggerManagerException("No trigger has been removed.");
+ throw new TriggerLoaderException("No trigger has been removed.");
}
} catch (SQLException e) {
logger.error(REMOVE_TRIGGER + " failed.");
- throw new TriggerManagerException("Remove trigger " + t.toString() + " from db failed. ", e);
+ throw new TriggerLoaderException("Remove trigger " + t.toString() + " from db failed. ", e);
}
}
@Override
- public void addTrigger(Trigger t) throws TriggerManagerException {
+ public void addTrigger(Trigger t) throws TriggerLoaderException {
logger.info("Inserting trigger " + t.toString() + " into db.");
- t.setLastModifyTime(DateTime.now());
+ t.setLastModifyTime(System.currentTimeMillis());
Connection connection = getConnection();
try {
addTrigger(connection, t, defaultEncodingType);
}
catch (Exception e) {
- throw new TriggerManagerException("Error uploading trigger", e);
+ throw new TriggerLoaderException("Error uploading trigger", e);
}
finally {
DbUtils.closeQuietly(connection);
}
}
- private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+ private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerLoaderException {
QueryRunner runner = new QueryRunner();
@@ -166,37 +166,37 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
if (id == -1l) {
- throw new TriggerManagerException("trigger id is not properly created.");
+ logger.error("trigger id is not properly created.");
+ throw new TriggerLoaderException("trigger id is not properly created.");
}
t.setTriggerId((int)id);
updateTrigger(t);
- logger.info("uploaded trigger " + t.toString());
+ logger.info("uploaded trigger " + t.getDescription());
} catch (SQLException e) {
- throw new TriggerManagerException("Error creating trigger.", e);
+ throw new TriggerLoaderException("Error creating trigger.", e);
}
}
@Override
- public void updateTrigger(Trigger t) throws TriggerManagerException {
- logger.info("Updating trigger " + t.toString() + " into db.");
- t.setLastModifyTime(DateTime.now());
+ public void updateTrigger(Trigger t) throws TriggerLoaderException {
+ logger.info("Updating trigger " + t.getTriggerId() + " into db.");
+ t.setLastModifyTime(System.currentTimeMillis());
Connection connection = getConnection();
try{
- t.setLastModifyTime(DateTime.now());
updateTrigger(connection, t, defaultEncodingType);
}
catch(Exception e) {
e.printStackTrace();
- throw new TriggerManagerException("Failed to update trigger " + t.toString() + " into db!");
+ throw new TriggerLoaderException("Failed to update trigger " + t.toString() + " into db!");
}
finally {
DbUtils.closeQuietly(connection);
}
}
- private void updateTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+ private void updateTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerLoaderException {
String json = JSONUtils.toJSON(t.toJson());
byte[] data = null;
@@ -210,7 +210,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
}
catch (IOException e) {
- throw new TriggerManagerException("Error encoding the trigger " + t.toString());
+ throw new TriggerLoaderException("Error encoding the trigger " + t.toString());
}
QueryRunner runner = new QueryRunner();
@@ -219,20 +219,22 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
int updates = runner.update( connection,
UPDATE_TRIGGER,
t.getSource(),
- t.getLastModifyTime().getMillis(),
+ t.getLastModifyTime(),
encType.getNumVal(),
data,
t.getTriggerId());
+ connection.commit();
if (updates == 0) {
- throw new TriggerManagerException("No trigger has been updated.");
+ throw new TriggerLoaderException("No trigger has been updated.");
//logger.error("No trigger is updated!");
+ } else {
+ logger.info("Updated " + updates + " records.");
}
} catch (SQLException e) {
logger.error(UPDATE_TRIGGER + " failed.");
- throw new TriggerManagerException("Update trigger " + t.toString() + " into db failed. ", e);
+ throw new TriggerLoaderException("Update trigger " + t.toString() + " into db failed. ", e);
}
}
-
private static class LastInsertID implements ResultSetHandler<Long> {
private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@@ -301,20 +303,20 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
}
- private Connection getConnection() throws TriggerManagerException {
+ private Connection getConnection() throws TriggerLoaderException {
Connection connection = null;
try {
connection = super.getDBConnection(false);
} catch (Exception e) {
DbUtils.closeQuietly(connection);
- throw new TriggerManagerException("Error getting DB connection.", e);
+ throw new TriggerLoaderException("Error getting DB connection.", e);
}
return connection;
}
@Override
- public Trigger loadTrigger(int triggerId) throws TriggerManagerException {
+ public Trigger loadTrigger(int triggerId) throws TriggerLoaderException {
logger.info("Loading trigger " + triggerId + " from db.");
Connection connection = getConnection();
@@ -327,14 +329,14 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
triggers = runner.query(connection, GET_TRIGGER, handler, triggerId);
} catch (SQLException e) {
logger.error(GET_TRIGGER + " failed.");
- throw new TriggerManagerException("Loading trigger from db failed. ", e);
+ throw new TriggerLoaderException("Loading trigger from db failed. ", e);
} finally {
DbUtils.closeQuietly(connection);
}
if(triggers.size() == 0) {
logger.error("Loaded 0 triggers. Failed to load trigger " + triggerId);
- throw new TriggerManagerException("Loaded 0 triggers. Failed to load trigger " + triggerId);
+ throw new TriggerLoaderException("Loaded 0 triggers. Failed to load trigger " + triggerId);
}
return triggers.get(0);
src/java/azkaban/trigger/Trigger.java 72(+45 -27)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index d668b5e..013a548 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -8,13 +8,15 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
+import azkaban.utils.JSONUtils;
+
public class Trigger {
private static Logger logger = Logger.getLogger(Trigger.class);
private int triggerId = -1;
- private DateTime lastModifyTime;
- private DateTime submitTime;
+ private long lastModifyTime;
+ private long submitTime;
private String submitUser;
private String source;
private TriggerStatus status = TriggerStatus.READY;
@@ -32,12 +34,26 @@ public class Trigger {
private boolean resetOnTrigger = true;
private boolean resetOnExpire = true;
+ private long nextCheckTime = -1;
+
@SuppressWarnings("unused")
private Trigger() throws TriggerManagerException {
throw new TriggerManagerException("Triggers should always be specified");
}
- public DateTime getSubmitTime() {
+ public void updateNextCheckTime() {
+ this.nextCheckTime = Math.min(triggerCondition.getNextCheckTime(), expireCondition.getNextCheckTime());
+ }
+
+ public long getNextCheckTime() {
+ return nextCheckTime;
+ }
+
+ public void setNextCheckTime(long nct) {
+ this.nextCheckTime = nct;
+ }
+
+ public long getSubmitTime() {
return submitTime;
}
@@ -86,8 +102,8 @@ public class Trigger {
}
public Trigger(
- DateTime lastModifyTime,
- DateTime submitTime,
+ long lastModifyTime,
+ long submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -109,8 +125,8 @@ public class Trigger {
}
public Trigger(
- DateTime lastModifyTime,
- DateTime submitTime,
+ long lastModifyTime,
+ long submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -134,8 +150,8 @@ public class Trigger {
Condition expireCondition,
List<TriggerAction> actions,
List<TriggerAction> expireActions) {
- this.lastModifyTime = DateTime.now();
- this.submitTime = DateTime.now();
+ this.lastModifyTime = DateTime.now().getMillis();
+ this.submitTime = DateTime.now().getMillis();
this.submitUser = submitUser;
this.source = source;
this.triggerCondition = triggerCondition;
@@ -150,8 +166,8 @@ public class Trigger {
Condition triggerCondition,
Condition expireCondition,
List<TriggerAction> actions) {
- this.lastModifyTime = DateTime.now();
- this.submitTime = DateTime.now();
+ this.lastModifyTime = DateTime.now().getMillis();
+ this.submitTime = DateTime.now().getMillis();
this.submitUser = submitUser;
this.source = source;
this.triggerCondition = triggerCondition;
@@ -161,8 +177,8 @@ public class Trigger {
}
public Trigger(
- DateTime lastModifyTime,
- DateTime submitTime,
+ long lastModifyTime,
+ long submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -180,8 +196,8 @@ public class Trigger {
public Trigger(
int triggerId,
- DateTime lastModifyTime,
- DateTime submitTime,
+ long lastModifyTime,
+ long submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -205,8 +221,8 @@ public class Trigger {
public Trigger(
int triggerId,
- DateTime lastModifyTime,
- DateTime submitTime,
+ long lastModifyTime,
+ long submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -226,8 +242,8 @@ public class Trigger {
public Trigger(
int triggerId,
- DateTime lastModifyTime,
- DateTime submitTime,
+ long lastModifyTime,
+ long submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -268,11 +284,11 @@ public class Trigger {
this.resetOnExpire = resetOnExpire;
}
- public DateTime getLastModifyTime() {
+ public long getLastModifyTime() {
return lastModifyTime;
}
- public void setLastModifyTime(DateTime lastModifyTime) {
+ public void setLastModifyTime(long lastModifyTime) {
this.lastModifyTime = lastModifyTime;
}
@@ -329,8 +345,8 @@ public class Trigger {
jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
jsonObj.put("submitUser", submitUser);
jsonObj.put("source", source);
- jsonObj.put("submitTime", String.valueOf(submitTime.getMillis()));
- jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime.getMillis()));
+ jsonObj.put("submitTime", String.valueOf(submitTime));
+ jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
jsonObj.put("triggerId", String.valueOf(triggerId));
jsonObj.put("status", status.toString());
jsonObj.put("info", info);
@@ -354,6 +370,7 @@ public class Trigger {
Trigger trigger = null;
try{
+ logger.info("Decoding for " + JSONUtils.toJSON(obj));
Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
List<TriggerAction> actions = new ArrayList<TriggerAction>();
@@ -376,14 +393,15 @@ public class Trigger {
boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
String submitUser = (String) jsonObj.get("submitUser");
String source = (String) jsonObj.get("source");
- long submitTimeMillis = Long.valueOf((String) jsonObj.get("submitTime"));
- long lastModifyTimeMillis = Long.valueOf((String) jsonObj.get("lastModifyTime"));
- DateTime submitTime = new DateTime(submitTimeMillis);
- DateTime lastModifyTime = new DateTime(lastModifyTimeMillis);
+ long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+ long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
TriggerStatus status = TriggerStatus.valueOf((String)jsonObj.get("status"));
Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
Map<String, Object> context = (Map<String, Object>) jsonObj.get("context");
+ if(context == null) {
+ context = new HashMap<String, Object>();
+ }
for(ConditionChecker checker : triggerCond.getCheckers().values()) {
checker.setContext(context);
}
diff --git a/src/java/azkaban/trigger/TriggerAgent.java b/src/java/azkaban/trigger/TriggerAgent.java
index 453f49d..cdabe94 100644
--- a/src/java/azkaban/trigger/TriggerAgent.java
+++ b/src/java/azkaban/trigger/TriggerAgent.java
@@ -3,10 +3,12 @@ package azkaban.trigger;
import azkaban.utils.Props;
public interface TriggerAgent {
- void loadTriggerFromProps(Props props) throws Exception;
+ public void loadTriggerFromProps(Props props) throws Exception;
- String getTriggerSource();
+ public String getTriggerSource();
- void start() throws Exception;
+ public void start() throws Exception;
+
+ public void shutdown();
}
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
index 7adf742..bddb9cc 100644
--- a/src/java/azkaban/trigger/TriggerLoader.java
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -4,16 +4,16 @@ import java.util.List;
public interface TriggerLoader {
- public void addTrigger(Trigger t) throws TriggerManagerException;
+ public void addTrigger(Trigger t) throws TriggerLoaderException;
- public void removeTrigger(Trigger s) throws TriggerManagerException;
+ public void removeTrigger(Trigger s) throws TriggerLoaderException;
- public void updateTrigger(Trigger t) throws TriggerManagerException;
+ public void updateTrigger(Trigger t) throws TriggerLoaderException;
- public List<Trigger> loadTriggers() throws TriggerManagerException;
+ public List<Trigger> loadTriggers() throws TriggerLoaderException;
- public Trigger loadTrigger(int triggerId) throws TriggerManagerException;
+ public Trigger loadTrigger(int triggerId) throws TriggerLoaderException;
- public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException;
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerLoaderException;
}
diff --git a/src/java/azkaban/trigger/TriggerLoaderException.java b/src/java/azkaban/trigger/TriggerLoaderException.java
new file mode 100644
index 0000000..f6b9b41
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerLoaderException.java
@@ -0,0 +1,34 @@
+package azkaban.trigger;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+public class TriggerLoaderException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public TriggerLoaderException(String message) {
+ super(message);
+ }
+
+ public TriggerLoaderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TriggerLoaderException(Throwable e) {
+ super(e);
+ }
+}
\ No newline at end of file
src/java/azkaban/trigger/TriggerManager.java 745(+331 -414)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 0b21166..bdcd30f 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,533 +1,450 @@
-/*
- * Copyright 2012 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
package azkaban.trigger;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.lang.Thread.State;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import org.apache.log4j.Logger;
-import azkaban.triggerapp.TriggerConnectorParams;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.Pair;
import azkaban.utils.Props;
-/**
- * Executor manager used to manage the client side job.
- *
- */
-public class TriggerManager {
+public class TriggerManager implements TriggerManagerAdapter{
private static Logger logger = Logger.getLogger(TriggerManager.class);
+ private static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
- public static final String TRIGGER_SUFFIX = ".trigger";
+ private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
- private TriggerLoader triggerLoader;
private CheckerTypeLoader checkerTypeLoader;
private ActionTypeLoader actionTypeLoader;
-
- private String triggerServerHost;
- private int triggerServerPort;
-
- private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
-
- private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
-
- private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
+ private TriggerLoader triggerLoader;
- private TriggerManagerUpdaterThread triggerManagingThread;
+ private final TriggerScannerThread runnerThread;
+ private long lastRunnerThreadCheckTime = -1;
+ private long runnerThreadIdleTime = -1;
+ private LocalTriggerJMX jmxStats = new LocalTriggerJMX();
- private long lastThreadCheckTime = -1;
-
- private long lastUpdateTime = -1;
-
- public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
- this.triggerLoader = loader;
- this.checkerTypeLoader = new CheckerTypeLoader();
- this.actionTypeLoader = new ActionTypeLoader();
+ public TriggerManager(Props props, TriggerLoader triggerLoader) throws TriggerManagerException {
- triggerServerHost = props.getString("trigger.server.host", "localhost");
- triggerServerPort = props.getInt("trigger.server.port");
-
- triggerManagingThread = new TriggerManagerUpdaterThread();
+ this.triggerLoader = triggerLoader;
- try{
+ long scannerInterval = props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
+ runnerThread = new TriggerScannerThread(scannerInterval);
+
+ checkerTypeLoader = new CheckerTypeLoader();
+ actionTypeLoader = new ActionTypeLoader();
+
+ try {
checkerTypeLoader.init(props);
actionTypeLoader.init(props);
- } catch(Exception e) {
- e.printStackTrace();
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ throw new TriggerManagerException(e);
}
-
Condition.setCheckerLoader(checkerTypeLoader);
Trigger.setActionTypeLoader(actionTypeLoader);
-
- triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
-
- }
-
- public void start() throws Exception {
- loadTriggers();
- for(TriggerAgent agent : triggerAgents.values()) {
- agent.start();
- }
- triggerManagingThread.start();
}
-
- private static class SuffixFilter implements FileFilter {
- private String suffix;
- public SuffixFilter(String suffix) {
- this.suffix = suffix;
- }
- @Override
- public boolean accept(File pathname) {
- String name = pathname.getName();
- return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
- }
- }
-
- public String getTriggerServerHost() {
- return triggerServerHost;
- }
-
- public int getTriggerServerPort() {
- return triggerServerPort;
- }
-
- public State getUpdaterThreadState() {
- return triggerManagingThread.getState();
- }
-
- public boolean isThreadActive() {
- return triggerManagingThread.isAlive();
- }
-
- public long getLastThreadCheckTime() {
- return lastThreadCheckTime;
- }
-
- public Set<String> getPrimaryServerHosts() {
- // Only one for now. More probably later.
- HashSet<String> ports = new HashSet<String>();
- ports.add(triggerServerHost + ":" + triggerServerPort);
- return ports;
- }
-
- private void loadTriggers() throws TriggerManagerException {
- List<Trigger> triggerList = triggerLoader.loadTriggers();
- for(Trigger t : triggerList) {
- if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
- removeTrigger(t, "azkaban");
- } else {
+ @Override
+ public void start() throws TriggerManagerException{
+
+ try{
+ // expect loader to return valid triggers
+ List<Trigger> triggers = triggerLoader.loadTriggers();
+ for(Trigger t : triggers) {
+ runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
+ }catch(Exception e) {
+ e.printStackTrace();
+ throw new TriggerManagerException(e);
}
+
+ runnerThread.start();
}
- public Trigger getTrigger(int triggerId) {
- return triggerIdMap.get(triggerId);
- }
-
- public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
- synchronized(t) {
- logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
- callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
- triggerIdMap.remove(t.getTriggerId());
- }
+ protected CheckerTypeLoader getCheckerLoader() {
+ return checkerTypeLoader;
}
- public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
- synchronized(t) {
- try {
- triggerLoader.updateTrigger(t);
- callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
- } catch(TriggerManagerException e) {
- throw new TriggerManagerException(e);
- }
- }
+ protected ActionTypeLoader getActionLoader() {
+ return actionTypeLoader;
}
-
-// public void getUpdatedTriggers() throws TriggerManagerException {
-// try {
-// callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
-// } catch(IOException e) {
-// throw new TriggerManagerException(e);
-// }
-// }
-
- public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
- synchronized(t) {
- String message = null;
- logger.info("Inserting trigger into system. " );
- // The trigger id is set by the loader. So it's unavailable until after this call.
- t.setStatus(TriggerStatus.PREPARING);
+
+ public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
+ try {
triggerLoader.addTrigger(t);
- try {
- callTriggerServer(t, TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
- triggerIdMap.put(t.getTriggerId(), t);
-
- message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
- }
- catch (TriggerManagerException e) {
- throw e;
- }
- return message;
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
}
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
}
- private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
- try {
- return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
- } catch (IOException e) {
- throw new TriggerManagerException(e);
+ public synchronized void removeTrigger(int id) throws TriggerManagerException {
+ Trigger t = triggerIdMap.get(id);
+ if(t != null) {
+ removeTrigger(triggerIdMap.get(id));
}
}
- private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
- URIBuilder builder = new URIBuilder();
- builder.setScheme("http")
- .setHost(host)
- .setPort(port)
- .setPath("/trigger");
-
- builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
-
- if (triggerId != null) {
- builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+ public synchronized void updateTrigger(int id) throws TriggerManagerException {
+ if(! triggerIdMap.containsKey(id)) {
+ throw new TriggerManagerException("The trigger to update " + id + " doesn't exist!");
}
- if (user != null) {
- builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
- }
-
- if (params != null) {
- for (Pair<String, String> pair: params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
-
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
+ Trigger t;
try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- }
- finally {
- httpclient.getConnectionManager().shutdown();
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
- String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
- if (error != null) {
- throw new IOException(error);
+ t = triggerLoader.loadTrigger(id);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
}
-
- return jsonResponse;
+ updateTrigger(t);
}
- public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
- URIBuilder builder = new URIBuilder();
-
- String[] hostPortSplit = hostPort.split(":");
- builder.setScheme("http")
- .setHost(hostPortSplit[0])
- .setPort(Integer.parseInt(hostPortSplit[1]))
- .setPath("/jmx");
-
- builder.setParameter(action, "");
- if (mBean != null) {
- builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
- }
+ public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
+ runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+ runnerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
+ public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
+ runnerThread.deleteTrigger(t);
+ triggerIdMap.remove(t.getTriggerId());
try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- }
- finally {
- httpclient.getConnectionManager().shutdown();
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
- String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
- if (error != null) {
- throw new IOException(error);
+ t.stopCheckers();
+ triggerLoader.removeTrigger(t);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
}
- return jsonResponse;
}
- public void shutdown() {
- triggerManagingThread.shutdown();
+ public List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggerIdMap.values());
}
- private class TriggerManagerUpdaterThread extends Thread {
+ public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
+ return checkerTypeLoader.getSupportedCheckers();
+ }
+
+ private class TriggerScannerThread extends Thread {
+ private BlockingQueue<Trigger> triggers;
private boolean shutdown = false;
-
- public TriggerManagerUpdaterThread() {
- this.setName("TriggerManagingThread");
+ //private AtomicBoolean stillAlive = new AtomicBoolean(true);
+ private final long scannerInterval;
+
+ public TriggerScannerThread(long scannerInterval) {
+ triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
+ this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
+ this.scannerInterval = scannerInterval;;
}
- private int waitTimeIdleMs = 2000;
- private int waitTimeMs = 500;
-
- private void shutdown() {
+ public void shutdown() {
+ logger.error("Shutting down trigger manager thread " + this.getName());
shutdown = true;
+ //stillAlive.set(false);
+ this.interrupt();
+ }
+
+ public synchronized void addTrigger(Trigger t) {
+ t.updateNextCheckTime();
+ triggers.add(t);
}
- @SuppressWarnings("unchecked")
+ public synchronized void deleteTrigger(Trigger t) {
+ triggers.remove(t);
+ }
+
public void run() {
+ //while(stillAlive.get()) {
while(!shutdown) {
- try {
- lastThreadCheckTime = System.currentTimeMillis();
-
- Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
-
- Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
- Map<String, Object> results = null;
+ synchronized (this) {
try{
- results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
-// lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
-
- List<Integer> updates = (List<Integer>) results.get("updates");
- for(Integer update : updates) {
- Trigger t = triggerLoader.loadTrigger(update);
- lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
-
- if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
- removeTrigger(t, "azkaban");
- //triggerIdMap.remove(update);
- } else {
- triggerIdMap.put(update, t);
- }
+ lastRunnerThreadCheckTime = System.currentTimeMillis();
+
+ try{
+ checkAllTriggers();
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ } catch(Throwable t) {
+ t.printStackTrace();
+ logger.error(t.getMessage());
}
- } catch (Exception e) {
- e.printStackTrace();
- logger.error(e);
- }
-
- synchronized(this) {
- try {
- if (triggerIdMap.size() > 0) {
- this.wait(waitTimeMs);
- }
- else {
- this.wait(waitTimeIdleMs);
- }
- } catch (InterruptedException e) {
+
+ runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
+ if(runnerThreadIdleTime < 0) {
+ logger.error("Trigger manager thread " + this.getName() + " is too busy!");
+ } else {
+ wait(runnerThreadIdleTime);
}
+ } catch(InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
}
- }
- catch (Exception e) {
- logger.error(e);
+
}
}
}
- }
-
- private static class ConnectionInfo {
- private String host;
- private int port;
-
- public ConnectionInfo(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- @SuppressWarnings("unused")
- private ConnectionInfo getOuterType() {
- return ConnectionInfo.this;
- }
- public boolean isEqual(String host, int port) {
- return this.port == port && this.host.equals(host);
+ private void checkAllTriggers() throws TriggerManagerException {
+ long now = System.currentTimeMillis();
+ for(Trigger t : triggers) {
+ if(t.getNextCheckTime() > now) {
+ logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
+ continue;
+ }
+ logger.info("Checking trigger " + t.getTriggerId());
+ if(t.getStatus().equals(TriggerStatus.READY)) {
+ if(t.triggerConditionMet()) {
+ onTriggerTrigger(t);
+ } else if (t.expireConditionMet()) {
+ onTriggerExpire(t);
+ }
+ }
+ if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+ removeTrigger(t);
+ } else {
+ t.updateNextCheckTime();
+ }
+ }
}
- public String getHost() {
- return host;
+ private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
+ List<TriggerAction> actions = t.getTriggerActions();
+ for(TriggerAction action : actions) {
+ try {
+ logger.info("Doing trigger actions");
+ action.doAction();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ //throw new TriggerManagerException("action failed to execute", e);
+ logger.error("Failed to do action " + action.getDescription(), e);
+ } catch (Throwable th) {
+ logger.error("Failed to do action " + action.getDescription(), th);
+ }
+ }
+ if(t.isResetOnTrigger()) {
+ t.resetTriggerConditions();
+ t.resetExpireCondition();
+ } else {
+ t.setStatus(TriggerStatus.EXPIRED);
+ }
+ try {
+ triggerLoader.updateTrigger(t);
+ }
+ catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+// updateAgent(t);
}
- public int getPort() {
- return port;
+ private void onTriggerExpire(Trigger t) throws TriggerManagerException {
+ List<TriggerAction> expireActions = t.getExpireActions();
+ for(TriggerAction action : expireActions) {
+ try {
+ logger.info("Doing expire actions");
+ action.doAction();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ //throw new TriggerManagerException("action failed to execute", e);
+ logger.error("Failed to do expire action " + action.getDescription(), e);
+ } catch (Throwable th) {
+ logger.error("Failed to do expire action " + action.getDescription(), th);
+ }
+ }
+ if(t.isResetOnExpire()) {
+ t.resetTriggerConditions();
+ t.resetExpireCondition();
+// updateTrigger(t);
+ } else {
+ t.setStatus(TriggerStatus.EXPIRED);
+ }
+ try {
+ triggerLoader.updateTrigger(t);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
}
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + port;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ConnectionInfo other = (ConnectionInfo) obj;
- if (host == null) {
- if (other.host != null)
- return false;
- } else if (!host.equals(other.host))
- return false;
- if (port != other.port)
- return false;
- return true;
- }
- }
-
- public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
- File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
-
- for(File triggerFile : triggerFiles) {
- Props triggerProps = new Props(props, triggerFile);
- String triggerType = triggerProps.getString("trigger.type");
- TriggerAgent agent = triggerAgents.get(triggerType);
- if(agent != null) {
- agent.loadTriggerFromProps(triggerProps);
- } else {
- throw new Exception("Trigger " + triggerType + " is not supported.");
+ private class TriggerComparator implements Comparator<Trigger> {
+ @Override
+ public int compare(Trigger arg0, Trigger arg1) {
+ long first = arg1.getNextCheckTime();
+ long second = arg0.getNextCheckTime();
+
+ if(first == second) {
+ return 0;
+ } else if (first < second) {
+ return 1;
+ }
+ return -1;
}
}
}
-
- public List<Trigger> getTriggers() {
- return new ArrayList<Trigger>(triggerIdMap.values());
+
+ public synchronized Trigger getTrigger(int triggerId) {
+ return triggerIdMap.get(triggerId);
}
public void expireTrigger(int triggerId) {
- // TODO Auto-generated method stub
-
- }
-
- public CheckerTypeLoader getCheckerLoader() {
- return checkerTypeLoader;
- }
-
- public ActionTypeLoader getActionLoader() {
- return actionTypeLoader;
- }
-
- public void addTriggerAgent(String triggerSource,
- TriggerAgent agent) {
- triggerAgents.put(triggerSource, agent);
+ Trigger t = getTrigger(triggerId);
+ t.setStatus(TriggerStatus.EXPIRED);
+// updateAgent(t);
}
public List<Trigger> getTriggers(String triggerSource) {
- List<Trigger> results = new ArrayList<Trigger>();
+ List<Trigger> triggers = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
if(t.getSource().equals(triggerSource)) {
- results.add(t);
+ triggers.add(t);
}
}
- return results;
+ return triggers;
}
- public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
- getUpdatedTriggers();
+ @Override
+ public List<Trigger> getTriggerUpdates(String triggerSource, long lastUpdateTime) throws TriggerManagerException{
List<Trigger> triggers = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
- if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
+ if(t.getSource().equals(triggerSource) && t.getLastModifyTime() > lastUpdateTime) {
triggers.add(t);
}
}
return triggers;
}
+
+ @Override
+ public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
+ List<Integer> triggers = new ArrayList<Integer>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getLastModifyTime() > lastUpdateTime) {
+ triggers.add(t.getTriggerId());
+ }
+ }
+ return triggers;
+ }
- private void getUpdatedTriggers() throws TriggerManagerException {
- List<Trigger> triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
- for(Trigger t : triggers) {
- this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
+ public void loadTrigger(int triggerId) throws TriggerManagerException {
+ Trigger t;
+ try {
+ t = triggerLoader.loadTrigger(triggerId);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ if(t.getStatus().equals(TriggerStatus.PREPARING)) {
triggerIdMap.put(t.getTriggerId(), t);
+ runnerThread.addTrigger(t);
+ t.setStatus(TriggerStatus.READY);
}
}
- public void removeTrigger(int scheduleId, String submitUser) throws TriggerManagerException {
- removeTrigger(triggerIdMap.get(scheduleId), submitUser);
+ @Override
+ public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
+ insertTrigger(t);
}
- public Set<String> getAllActiveTriggerServerHosts() {
- Set<String> hostport = new HashSet<String>();
- hostport.add(triggerServerHost+":"+triggerServerPort);
- return hostport;
+ @Override
+ public void removeTrigger(int id, String user) throws TriggerManagerException {
+ removeTrigger(id);
}
- public int getNumTriggers() {
- return triggerIdMap.size();
+ @Override
+ public void updateTrigger(int triggerId, String user) throws TriggerManagerException {
+ updateTrigger(triggerId);
}
- public String getTriggerSources() {
- Set<String> sources = new HashSet<String>();
- for(Trigger t : triggerIdMap.values()) {
- sources.add(t.getSource());
+ @Override
+ public void updateTrigger(Trigger t, String user) throws TriggerManagerException {
+ updateTrigger(t);
+ }
+
+ @Override
+ public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
+ Trigger t;
+ try {
+ t = triggerLoader.loadTrigger(triggerId);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ if(t != null) {
+ insertTrigger(t);
}
- return sources.toString();
+ }
+
+ @Override
+ public void shutdown() {
+ runnerThread.shutdown();
}
- public String getTriggerIds() {
- return triggerIdMap.keySet().toString();
+ @Override
+ public TriggerJMX getJMX() {
+ return this.jmxStats;
}
+
+ private class LocalTriggerJMX implements TriggerJMX {
+ @Override
+ public long getLastRunnerThreadCheckTime() {
+ // TODO Auto-generated method stub
+ return lastRunnerThreadCheckTime;
+ }
+
+ @Override
+ public boolean isRunnerThreadActive() {
+ // TODO Auto-generated method stub
+ return runnerThread.isAlive();
+ }
+
+ @Override
+ public String getPrimaryServerHost() {
+ return "local";
+ }
+
+ @Override
+ public int getNumTriggers() {
+ // TODO Auto-generated method stub
+ return triggerIdMap.size();
+ }
+
+ @Override
+ public String getTriggerSources() {
+ Set<String> sources = new HashSet<String>();
+ for(Trigger t : triggerIdMap.values()) {
+ sources.add(t.getSource());
+ }
+ return sources.toString();
+ }
+
+ @Override
+ public String getTriggerIds() {
+ return triggerIdMap.keySet().toString();
+ }
+
+ @Override
+ public long getScannerIdleTime() {
+ // TODO Auto-generated method stub
+ return runnerThreadIdleTime;
+ }
+
+ @Override
+ public Map<String, Object> getAllJMXMbeans() {
+ return new HashMap<String, Object>();
+ }
+
+ }
+
+ @Override
+ public void registerCheckerType(String name, Class<? extends ConditionChecker> checker) {
+ checkerTypeLoader.registerCheckerType(name, checker);
+ }
+
+ @Override
+ public void registerActionType(String name, Class<? extends TriggerAction> action) {
+ actionTypeLoader.registerActionType(name, action);
+ }
}
-
src/java/azkaban/trigger/TriggerManager.java.old 573(+573 -0)
diff --git a/src/java/azkaban/trigger/TriggerManager.java.old b/src/java/azkaban/trigger/TriggerManager.java.old
new file mode 100644
index 0000000..3caf113
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManager.java.old
@@ -0,0 +1,573 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.trigger;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.log4j.Logger;
+
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.triggerapp.TriggerRunnerManagerAdapter;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
+public class TriggerManager {
+ private static Logger logger = Logger.getLogger(TriggerManager.class);
+
+ public static final String TRIGGER_SUFFIX = ".trigger";
+
+ private TriggerLoader triggerLoader;
+ private CheckerTypeLoader checkerTypeLoader;
+ private ActionTypeLoader actionTypeLoader;
+
+ private String triggerServerHost;
+ private int triggerServerPort;
+
+ private TriggerRunnerManagerAdapter trmAdapter;
+
+ private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
+
+ private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
+
+ private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
+
+ private TriggerManagerUpdaterThread triggerManagingThread;
+
+ private long lastThreadCheckTime = -1;
+
+ private long lastUpdateTime = -1;
+
+ public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
+ this.triggerLoader = loader;
+ this.checkerTypeLoader = new CheckerTypeLoader();
+ this.actionTypeLoader = new ActionTypeLoader();
+
+ String trmMode = props.getString("trigger.runner.manager.mode", "local");
+
+ try {
+ if(trmMode.equals("local")) {
+ trmAdapter = loadTRMLocalAdapter(props, loader);
+ } else if(trmMode.equals("remote")) {
+ trmAdapter = loadTRMRemoteAdapter(props);
+ } else {
+ throw new TriggerManagerException("Unknown trigger runner manager mode " + trmMode);
+ }
+ } catch(Exception e) {
+ throw new TriggerManagerException("Failed to load Trigger Runner Manager: " + e.getMessage());
+ }
+
+ triggerServerHost = props.getString("trigger.server.host", "localhost");
+ triggerServerPort = props.getInt("trigger.server.port");
+
+ triggerManagingThread = new TriggerManagerUpdaterThread();
+
+ try{
+ checkerTypeLoader.init(props);
+ actionTypeLoader.init(props);
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ }
+
+ Condition.setCheckerLoader(checkerTypeLoader);
+ Trigger.setActionTypeLoader(actionTypeLoader);
+
+ triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
+
+ }
+
+ private TriggerRunnerManagerAdapter loadTRMLocalAdapter(Props props, TriggerLoader loader) throws IOException {
+ return new TriggerRunnerManager(props, loader);
+ }
+
+ private TriggerRunnerManagerAdapter loadTRMRemoteAdapter(Props props) {
+ return null;
+ }
+
+ public void start() throws Exception {
+ loadTriggers();
+ for(TriggerAgent agent : triggerAgents.values()) {
+ agent.start();
+ }
+ triggerManagingThread.start();
+ }
+
+ private static class SuffixFilter implements FileFilter {
+ private String suffix;
+ public SuffixFilter(String suffix) {
+ this.suffix = suffix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+ return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
+ }
+ }
+
+ public String getTriggerServerHost() {
+ return triggerServerHost;
+ }
+
+ public int getTriggerServerPort() {
+ return triggerServerPort;
+ }
+
+ public State getUpdaterThreadState() {
+ return triggerManagingThread.getState();
+ }
+
+ public boolean isThreadActive() {
+ return triggerManagingThread.isAlive();
+ }
+
+ public long getLastThreadCheckTime() {
+ return lastThreadCheckTime;
+ }
+
+ public Set<String> getPrimaryServerHosts() {
+ // Only one for now. More probably later.
+ HashSet<String> ports = new HashSet<String>();
+ ports.add(triggerServerHost + ":" + triggerServerPort);
+ return ports;
+ }
+
+ private void loadTriggers() throws TriggerManagerException {
+ List<Trigger> triggerList;
+ try {
+ triggerList = triggerLoader.loadTriggers();
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ for(Trigger t : triggerList) {
+ if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+ removeTrigger(t, "azkaban");
+ } else {
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
+ }
+ }
+
+ public Trigger getTrigger(int triggerId) {
+ return triggerIdMap.get(triggerId);
+ }
+
+ public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
+ synchronized(t) {
+ logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
+ callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
+ triggerIdMap.remove(t.getTriggerId());
+ }
+ }
+
+ public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
+ synchronized(t) {
+ try {
+ triggerLoader.updateTrigger(t);
+ callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
+ } catch(Exception e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+ }
+
+// public void getUpdatedTriggers() throws TriggerManagerException {
+// try {
+// callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
+// } catch(IOException e) {
+// throw new TriggerManagerException(e);
+// }
+// }
+
+ public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
+ synchronized(t) {
+ String message = null;
+ logger.info("Inserting trigger into system. " );
+ // The trigger id is set by the loader. So it's unavailable until after this call.
+ t.setStatus(TriggerStatus.PREPARING);
+ try {
+ triggerLoader.addTrigger(t);
+ callTriggerServer(t, TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
+ triggerIdMap.put(t.getTriggerId(), t);
+
+ message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
+ }
+ catch (Exception e) {
+ throw new TriggerManagerException(e);
+ }
+ return message;
+ }
+ }
+
+ private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
+ try {
+ return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
+ } catch (IOException e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+
+ private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(host)
+ .setPort(port)
+ .setPath("/trigger");
+
+ builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
+
+ if (triggerId != null) {
+ builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+ }
+
+ if (user != null) {
+ builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair: params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+
+ return jsonResponse;
+ }
+
+ public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
+ URIBuilder builder = new URIBuilder();
+
+ String[] hostPortSplit = hostPort.split(":");
+ builder.setScheme("http")
+ .setHost(hostPortSplit[0])
+ .setPort(Integer.parseInt(hostPortSplit[1]))
+ .setPath("/jmx");
+
+ builder.setParameter(action, "");
+ if (mBean != null) {
+ builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+ return jsonResponse;
+ }
+
+ public void shutdown() {
+ triggerManagingThread.shutdown();
+ }
+
+ private class TriggerManagerUpdaterThread extends Thread {
+ private boolean shutdown = false;
+
+ public TriggerManagerUpdaterThread() {
+ this.setName("TriggerManagingThread");
+ }
+
+ private int waitTimeIdleMs = 2000;
+ private int waitTimeMs = 500;
+
+ private void shutdown() {
+ shutdown = true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ while(!shutdown) {
+ try {
+ lastThreadCheckTime = System.currentTimeMillis();
+
+ Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
+
+ Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
+ Map<String, Object> results = null;
+ try{
+ results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
+// lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
+
+ List<Integer> updates = (List<Integer>) results.get("updates");
+ for(Integer update : updates) {
+ Trigger t = triggerLoader.loadTrigger(update);
+ lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+
+ if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+ removeTrigger(t, "azkaban");
+ //triggerIdMap.remove(update);
+ } else {
+ triggerIdMap.put(update, t);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ }
+
+ synchronized(this) {
+ try {
+ if (triggerIdMap.size() > 0) {
+ this.wait(waitTimeMs);
+ }
+ else {
+ this.wait(waitTimeIdleMs);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+
+ private static class ConnectionInfo {
+ private String host;
+ private int port;
+
+ public ConnectionInfo(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @SuppressWarnings("unused")
+ private ConnectionInfo getOuterType() {
+ return ConnectionInfo.this;
+ }
+
+ public boolean isEqual(String host, int port) {
+ return this.port == port && this.host.equals(host);
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ConnectionInfo other = (ConnectionInfo) obj;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+ }
+
+ public void loadTriggerFromDir(File baseDir, Props props) throws TriggerManagerException {
+ File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+
+ for(File triggerFile : triggerFiles) {
+ try{
+ Props triggerProps = new Props(props, triggerFile);
+ String triggerType = triggerProps.getString("trigger.type");
+ TriggerAgent agent = triggerAgents.get(triggerType);
+ if(agent != null) {
+ agent.loadTriggerFromProps(triggerProps);
+ } else {
+ throw new TriggerManagerException("Trigger " + triggerType + " is not supported.");
+ }
+ } catch (Exception e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+ }
+
+ public List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggerIdMap.values());
+ }
+
+ public void expireTrigger(int triggerId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public CheckerTypeLoader getCheckerLoader() {
+ return checkerTypeLoader;
+ }
+
+ public ActionTypeLoader getActionLoader() {
+ return actionTypeLoader;
+ }
+
+ public void addTriggerAgent(String triggerSource,
+ TriggerAgent agent) {
+ triggerAgents.put(triggerSource, agent);
+ }
+
+ public List<Trigger> getTriggers(String triggerSource) {
+ List<Trigger> results = new ArrayList<Trigger>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getSource().equals(triggerSource)) {
+ results.add(t);
+ }
+ }
+ return results;
+ }
+
+ public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
+ getUpdatedTriggers();
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
+ triggers.add(t);
+ }
+ }
+ return triggers;
+ }
+
+ private void getUpdatedTriggers() throws TriggerManagerException {
+ List<Trigger> triggers;
+ try {
+ triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
+ } catch (TriggerLoaderException e) {
+ throw new TriggerManagerException(e);
+ }
+ for(Trigger t : triggers) {
+ this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
+ }
+
+ public void removeTrigger(int triggerId, String submitUser) throws TriggerManagerException {
+ removeTrigger(triggerIdMap.get(triggerId), submitUser);
+ }
+
+ public Set<String> getAllActiveTriggerServerHosts() {
+ Set<String> hostport = new HashSet<String>();
+ hostport.add(triggerServerHost+":"+triggerServerPort);
+ return hostport;
+ }
+
+ public int getNumTriggers() {
+ return triggerIdMap.size();
+ }
+
+ public String getTriggerSources() {
+ Set<String> sources = new HashSet<String>();
+ for(Trigger t : triggerIdMap.values()) {
+ sources.add(t.getSource());
+ }
+ return sources.toString();
+ }
+
+ public String getTriggerIds() {
+ return triggerIdMap.keySet().toString();
+ }
+
+
+
+}
+
diff --git a/src/java/azkaban/trigger/TriggerManagerAdapter.java b/src/java/azkaban/trigger/TriggerManagerAdapter.java
new file mode 100644
index 0000000..2ce5df5
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerAdapter.java
@@ -0,0 +1,47 @@
+package azkaban.trigger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTimeZone;
+
+import azkaban.triggerapp.TriggerRunnerManagerException;
+
+public interface TriggerManagerAdapter {
+ public void insertTrigger(Trigger t, String user) throws TriggerManagerException;
+
+ public void removeTrigger(int id, String user) throws TriggerManagerException;
+
+ public void updateTrigger(int triggerId, String user) throws TriggerManagerException;
+
+ void updateTrigger(Trigger t, String user) throws TriggerManagerException;
+
+ public void insertTrigger(int triggerId, String user) throws TriggerManagerException;
+
+ public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException;
+
+ public List<Trigger> getTriggerUpdates(String triggerSource, long lastUpdateTime) throws TriggerManagerException;
+
+ public void start() throws TriggerManagerException;
+
+ public void shutdown();
+
+ public void registerCheckerType(String name, Class<? extends ConditionChecker> checker);
+
+ public void registerActionType(String name, Class<? extends TriggerAction> action);
+
+ public TriggerJMX getJMX();
+
+ public interface TriggerJMX {
+ public long getLastRunnerThreadCheckTime();
+ public boolean isRunnerThreadActive();
+ public String getPrimaryServerHost();
+ public int getNumTriggers();
+ public String getTriggerSources();
+ public String getTriggerIds();
+ public long getScannerIdleTime();
+ public Map<String, Object> getAllJMXMbeans();
+ }
+
+}
diff --git a/src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java b/src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java
new file mode 100644
index 0000000..4f9cab9
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java
@@ -0,0 +1,181 @@
+package azkaban.trigger;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.triggerapp.TriggerRunnerManagerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+public class TriggerManagerRemoteAdapter implements TriggerManagerAdapter{
+
+ private final String host;
+ private final int port;
+
+ public TriggerManagerRemoteAdapter(Props props) {
+ host = props.getString("trigger.server.host", "localhost");
+ port = props.getInt("trigger.server.port");
+ }
+
+ @Override
+ public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
+ try {
+ callRemoteTriggerRunnerManager(TriggerConnectorParams.INSERT_TRIGGER_ACTION, t.getTriggerId(), user, (Pair<String,String>[])null);
+ } catch(IOException e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+
+ @Override
+ public void removeTrigger(int id, String user) throws TriggerManagerException {
+ try {
+ callRemoteTriggerRunnerManager(TriggerConnectorParams.REMOVE_TRIGGER_ACTION, id, user, (Pair<String,String>[])null);
+ } catch(IOException e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+
+ @Override
+ public void updateTrigger(int triggerId, String user) throws TriggerManagerException {
+ try {
+ callRemoteTriggerRunnerManager(TriggerConnectorParams.UPDATE_TRIGGER_ACTION, triggerId, user, (Pair<String,String>[])null);
+ } catch(IOException e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+
+ private Map<String, Object> callRemoteTriggerRunnerManager(String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(host)
+ .setPort(port)
+ .setPath(TriggerManagerServlet.WEB_PATH);
+
+ builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
+
+ if (triggerId != null) {
+ builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+ }
+
+ if (user != null) {
+ builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair: params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ throw e;
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+ String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+
+ return jsonResponse;
+ }
+
+ @Override
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
+ try {
+ callRemoteTriggerRunnerManager(TriggerConnectorParams.INSERT_TRIGGER_ACTION, triggerId, user, (Pair<String,String>[])null);
+ } catch(IOException e) {
+ throw new TriggerManagerException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
+ List<Integer> updated;
+ try {
+ Map<String, Object> response = callRemoteTriggerRunnerManager(TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
+ updated = (List<Integer>) response.get(TriggerConnectorParams.RESPONSE_UPDATED_TRIGGERS);
+ return updated;
+ } catch(IOException e) {
+ throw new TriggerManagerException(e);
+ }
+
+ }
+
+ @Override
+ public void updateTrigger(Trigger t, String user)
+ throws TriggerManagerException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public List<Trigger> getTriggerUpdates(String triggerSource,
+ long lastUpdateTime) throws TriggerManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void shutdown() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void registerCheckerType(String name, Class<? extends ConditionChecker> checker) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void registerActionType(String name,
+ Class<? extends TriggerAction> action) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public TriggerJMX getJMX() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
src/java/azkaban/trigger/TriggerManagerServlet.java 158(+158 -0)
diff --git a/src/java/azkaban/trigger/TriggerManagerServlet.java b/src/java/azkaban/trigger/TriggerManagerServlet.java
new file mode 100644
index 0000000..3595e82
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerServlet.java
@@ -0,0 +1,158 @@
+package azkaban.trigger;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+
+import azkaban.execapp.AzkabanExecutorServer;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.executor.ConnectorParams;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.triggerapp.AzkabanTriggerServer;
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.triggerapp.TriggerRunnerManagerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.webapp.servlet.AbstractServiceServlet;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+
+public class TriggerManagerServlet extends AbstractServiceServlet implements TriggerConnectorParams {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = Logger.getLogger(TriggerManagerServlet.class.getName());
+ public static final String JSON_MIME_TYPE = "application/json";
+
+ private AzkabanTriggerServer application;
+ private TriggerManager triggerManager;
+
+ public static final String WEB_PATH = "/triggermanager";
+
+ public TriggerManagerServlet() {
+ super();
+ }
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ application = (AzkabanTriggerServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+
+ if (application == null) {
+ throw new IllegalStateException(
+ "No batch application is defined in the servlet context!");
+ }
+
+ triggerManager = application.getTriggerManager();
+ }
+
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ HashMap<String,Object> respMap= new HashMap<String,Object>();
+ //logger.info("ExecutorServer called by " + req.getRemoteAddr());
+ try {
+ if (!hasParam(req, ACTION_PARAM)) {
+ logger.error("Parameter action not set");
+ respMap.put("error", "Parameter action not set");
+ }
+ else {
+ String action = getParam(req, ACTION_PARAM);
+ if (action.equals(GET_UPDATE_ACTION)) {
+ //logger.info("Updated called");
+ handleAjaxGetUpdateRequest(req, respMap);
+ }
+ else if (action.equals(PING_ACTION)) {
+ respMap.put("status", "alive");
+ }
+ else {
+ int triggerId = Integer.parseInt(getParam(req, TRIGGER_ID_PARAM));
+ String user = getParam(req, USER_PARAM, null);
+
+ logger.info("User " + user + " has called action " + action + " on " + triggerId);
+ if (action.equals(INSERT_TRIGGER_ACTION)) {
+ logger.info("Insert Trigger Action");
+ handleInsertTrigger(triggerId, user, req, resp, respMap);
+ } else if (action.equals(REMOVE_TRIGGER_ACTION)) {
+ logger.info("Remove Trigger Action");
+ handleRemoveTrigger(triggerId, user, req, resp, respMap);
+ }
+ else if (action.equals(UPDATE_TRIGGER_ACTION)) {
+ logger.info("Update Trigger Action");
+ handleUpdateTrigger(triggerId, user, req, respMap);
+ }
+ else {
+ logger.error("action: '" + action + "' not supported.");
+ respMap.put("error", "action: '" + action + "' not supported.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put(RESPONSE_ERROR, e.getMessage());
+ }
+ writeJSON(resp, respMap);
+ resp.flushBuffer();
+ }
+
+
+
+ private void handleAjaxGetUpdateRequest(HttpServletRequest req, HashMap<String, Object> respMap) {
+ List<Integer> updates = null;
+ try{
+ long lastUpdateTime = getLongParam(req, "lastUpdateTime");
+// respMap.put(TriggerConnectorParams.RESPONSE_UPDATETIME, DateTime.now().getMillis());
+ updates = triggerManager.getTriggerUpdates(lastUpdateTime);
+ if(updates.size() > 0) {
+ System.out.println("got " + updates.size() + " updates" );
+ }
+ respMap.put("updates", updates);
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleInsertTrigger(int triggerId, String user, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+ try {
+ triggerManager.insertTrigger(triggerId, user);
+ } catch (TriggerManagerException e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleUpdateTrigger(int triggerId, String user, HttpServletRequest req, HashMap<String, Object> respMap) {
+ try {
+ triggerManager.updateTrigger(triggerId, user);
+ } catch (TriggerManagerException e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleRemoveTrigger(int triggerId, String user, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+ try {
+ triggerManager.removeTrigger(triggerId, user);
+ } catch (TriggerManagerException e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+
+ }
+
+}
diff --git a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
index 5ed1c41..ffef121 100644
--- a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
+++ b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
@@ -28,20 +28,23 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerRemoteAdapter;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.executor.ExecutorManager.Alerter;
-import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxExecutorManagerAdapter;
import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxTriggerRunnerManager;
+import azkaban.jmx.JmxTriggerManager;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
-import azkaban.trigger.ActionTypeLoader;
-import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerAdapter;
+import azkaban.trigger.TriggerManagerServlet;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.CreateTriggerAction;
-import azkaban.trigger.builtin.ExecutionChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.trigger.builtin.KillExecutionAction;
import azkaban.trigger.builtin.SlaAlertAction;
@@ -70,7 +73,7 @@ public class AzkabanTriggerServer {
private static AzkabanTriggerServer app;
private TriggerLoader triggerLoader;
- private TriggerRunnerManager triggerRunnerManager;
+ private TriggerManager triggerManager;
private ExecutorManager executorManager;
private ProjectManager projectManager;
private Props props;
@@ -102,23 +105,23 @@ public class AzkabanTriggerServer {
Context root = new Context(server, "/", Context.SESSIONS);
root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
- root.addServlet(new ServletHolder(new TriggerServerServlet()), "/trigger");
+ root.addServlet(new ServletHolder(new TriggerManagerServlet()), TriggerManagerServlet.WEB_PATH);
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
triggerLoader = createTriggerLoader(props);
projectManager = loadProjectManager(props);
executorManager = loadExecutorManager(props);
- triggerRunnerManager = loadTriggerRunnerManager(props, triggerLoader);
+ triggerManager = loadTriggerManager(props, triggerLoader);
String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
- loadBuiltinCheckersAndActions(this);
- loadPluginCheckersAndActions(triggerPluginDir, this);
+ loadBuiltinCheckersAndActions();
+ loadPluginCheckersAndActions(triggerPluginDir);
configureMBeanServer();
try {
- triggerRunnerManager.start();
+ triggerManager.start();
server.start();
}
catch (Exception e) {
@@ -132,20 +135,58 @@ public class AzkabanTriggerServer {
- private TriggerRunnerManager loadTriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
- logger.info("Loading trigger runner manager");
- TriggerRunnerManager trm = new TriggerRunnerManager(props, triggerLoader);
- trm.init();
+ private TriggerManager loadTriggerManager(Props props, TriggerLoader triggerLoader) throws Exception {
+ logger.info("Loading trigger manager");
+ TriggerManager trm;
+ try {
+ trm = new TriggerManager(props, triggerLoader);
+ } catch (TriggerManagerException e) {
+ throw new Exception(e);
+ }
return trm;
}
+ private TriggerManagerAdapter loadTriggerRunnerManagerAdapter(Props props, TriggerLoader triggerLoader) throws Exception {
+ TriggerManagerAdapter trmAdapter;
+ String trmMode = props.getString("trigger.runner.manager.mode", "local");
+ try {
+ if(trmMode.equals("local")) {
+ trmAdapter = new TriggerManager(props, triggerLoader);
+ } else if(trmMode.equals("remote")) {
+ trmAdapter = null;
+ } else {
+ throw new TriggerManagerException("Unknown trigger runner manager mode " + trmMode);
+ }
+ } catch(Exception e) {
+ throw new Exception("Failed to load Trigger Runner Manager: " + e.getMessage());
+ }
+ return trmAdapter;
+ }
+
private ExecutorManager loadExecutorManager(Props props) throws Exception {
logger.info("Loading executor manager");
JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
- ExecutorManager execManager = new ExecutorManager(props, loader, false);
+ ExecutorManager execManager = new ExecutorManager(props, loader);
return execManager;
}
+ private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
+// JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+// ExecutorManager execManager = new ExecutorManager(props, loader, true);
+// return execManager;
+ String executorMode = props.getString("executor.manager.mode", "local");
+ ExecutorManagerAdapter adapter;
+ if(executorMode.equals("local")) {
+ adapter = loadExecutorManager(props);
+ } else if(executorMode.equals("remote")) {
+ JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+ adapter = new ExecutorManagerRemoteAdapter(props, loader);
+ } else {
+ throw new Exception("Unknown ExecutorManager mode " + executorMode);
+ }
+ return adapter;
+ }
+
private ProjectManager loadProjectManager(Props props) {
logger.info("Loading project manager");
JdbcProjectLoader loader = new JdbcProjectLoader(props);
@@ -153,42 +194,31 @@ public class AzkabanTriggerServer {
return manager;
}
- private void loadBuiltinCheckersAndActions(AzkabanTriggerServer app) {
+ private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
-// ExecutorManager executorManager = app.getExecutorManager();
-// TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
- CheckerTypeLoader checkerLoader = triggerRunnerManager.getCheckerLoader();
- ActionTypeLoader actionLoader = triggerRunnerManager.getActionLoader();
- // time:
- checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
-// // execution checker
-// ExecutionChecker.setExecutorManager(executorManager);
-// checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
- // Sla checker
- SlaChecker.setExecutorManager(executorManager);
- checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
- // execut flow action
- ExecuteFlowAction.setExecutorManager(executorManager);
- ExecuteFlowAction.setProjectManager(projectManager);
- ExecuteFlowAction.setTriggerRunnerManager(triggerRunnerManager);
- actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
- // kill flow action
- KillExecutionAction.setExecutorManager(executorManager);
- actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
- // sla alert
- SlaAlertAction.setExecutorManager(executorManager);
- Map<String, Alerter> alerters = loadAlerters(props);
- SlaAlertAction.setAlerters(alerters);
- SlaAlertAction.setExecutorManager(executorManager);
- actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
- // create trigger action
- CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
- actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+ if(triggerManager instanceof TriggerManager) {
+ SlaChecker.setExecutorManager(executorManager);
+ ExecuteFlowAction.setExecutorManager(executorManager);
+ ExecuteFlowAction.setProjectManager(projectManager);
+ ExecuteFlowAction.setTriggerManager(triggerManager);
+ KillExecutionAction.setExecutorManager(executorManager);
+ SlaAlertAction.setExecutorManager(executorManager);
+ Map<String, Alerter> alerters = loadAlerters(props);
+ SlaAlertAction.setAlerters(alerters);
+ SlaAlertAction.setExecutorManager(executorManager);
+ CreateTriggerAction.setTriggerManager(triggerManager);
+ }
+ triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+ triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
+ triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+ triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+ triggerManager.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
}
- private void loadPluginCheckersAndActions(String pluginPath, AzkabanTriggerServer app) {
+ private void loadPluginCheckersAndActions(String pluginPath) {
logger.info("Loading plug-in checker and action types");
File triggerPluginPath = new File(pluginPath);
if (!triggerPluginPath.exists()) {
@@ -196,7 +226,7 @@ public class AzkabanTriggerServer {
return;
}
- ClassLoader parentLoader = AzkabanTriggerServer.class.getClassLoader();
+ ClassLoader parentLoader = this.getClass().getClassLoader();
File[] pluginDirs = triggerPluginPath.listFiles();
ArrayList<String> jarPaths = new ArrayList<String>();
for (File pluginDir: pluginDirs) {
@@ -577,8 +607,14 @@ public class AzkabanTriggerServer {
mbeanServer = ManagementFactory.getPlatformMBeanServer();
registerMbean("triggerServerJetty", new JmxJettyServer(server));
- registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(triggerRunnerManager));
- registerMbean("executorManager", new JmxExecutorManager(executorManager));
+// if(triggerRunnerManager instanceof TriggerRunnerManagerLocalAdapter) {
+// registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(((TriggerRunnerManagerLocalAdapter)triggerRunnerManager).getTriggerRunnerManager()));
+// }
+ registerMbean("triggerManager", new JmxTriggerManager(triggerManager));
+// if(executorManager instanceof ExecutorManagerLocalAdapter) {
+// registerMbean("executorManager", new JmxExecutorManager(((ExecutorManagerLocalAdapter)executorManager).getExecutorManager()));
+// }
+ registerMbean("executorManager", new JmxExecutorManagerAdapter(executorManager));
}
public void close() {
@@ -603,7 +639,14 @@ public class AzkabanTriggerServer {
} catch (Exception e) {
logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
}
-
+// if(executorManager instanceof ExecutorManagerLocalAdapter) {
+// ((ExecutorManagerLocalAdapter)executorManager).getExecutorManager().shutdown();
+// }
+// if(triggerRunnerManager instanceof TriggerRunnerManagerLocalAdapter) {
+//
+// }
+ executorManager.shutdown();
+ triggerManager.shutdown();
}
public List<ObjectName> getMbeanNames() {
@@ -628,8 +671,8 @@ public class AzkabanTriggerServer {
}
}
- public TriggerRunnerManager getTriggerRunnerManager() {
- return triggerRunnerManager;
+ public TriggerManager getTriggerManager() {
+ return triggerManager;
}
public ExecutorManager getExecutorManager() {
diff --git a/src/java/azkaban/triggerapp/TriggerRunnerManagerException.java b/src/java/azkaban/triggerapp/TriggerRunnerManagerException.java
new file mode 100644
index 0000000..4254c7a
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerRunnerManagerException.java
@@ -0,0 +1,33 @@
+package azkaban.triggerapp;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class TriggerRunnerManagerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public TriggerRunnerManagerException(String message) {
+ super(message);
+ }
+
+ public TriggerRunnerManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TriggerRunnerManagerException(Throwable e) {
+ super(e);
+ }
+}
diff --git a/src/java/azkaban/webapp/AzkabanServer.java b/src/java/azkaban/webapp/AzkabanServer.java
index 5da23ee..db9e14a 100644
--- a/src/java/azkaban/webapp/AzkabanServer.java
+++ b/src/java/azkaban/webapp/AzkabanServer.java
@@ -119,4 +119,5 @@ public abstract class AzkabanServer {
public abstract VelocityEngine getVelocityEngine();
public abstract UserManager getUserManager();
+
}
\ No newline at end of file
src/java/azkaban/webapp/AzkabanWebServer.java 391(+337 -54)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 39c105c..d0731ed 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -53,10 +53,13 @@ import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerRemoteAdapter;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.executor.ExecutorManager.Alerter;
-import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxExecutorManagerAdapter;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxScheduler;
import azkaban.jmx.JmxTriggerManager;
@@ -146,7 +149,7 @@ public class AzkabanWebServer extends AzkabanServer {
private final Server server;
private UserManager userManager;
private ProjectManager projectManager;
- private ExecutorManager executorManager;
+ private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
// private TriggerBasedScheduler scheduler;
private TriggerManager triggerManager;
@@ -183,9 +186,13 @@ public class AzkabanWebServer extends AzkabanServer {
triggerManager = loadTriggerManager(props);
executorManager = loadExecutorManager(props);
projectManager = loadProjectManager(props, triggerManager);
+
+ // load all triggger agents here
scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
loadBuiltinCheckersAndActions();
+ String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
+ loadPluginCheckersAndActions(triggerPluginDir);
baseClassLoader = getBaseClassloader();
@@ -239,34 +246,54 @@ public class AzkabanWebServer extends AzkabanServer {
JdbcProjectLoader loader = new JdbcProjectLoader(props);
ProjectManager manager = new ProjectManager(loader, props);
- manager.setTriggerManager(tm);
-
return manager;
}
private ExecutorManager loadExecutorManager(Props props) throws Exception {
JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
- ExecutorManager execManager = new ExecutorManager(props, loader, true);
+ ExecutorManager execManager = new ExecutorManager(props, loader);
return execManager;
}
-
- private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager tm, Props props ) throws Exception {
- ScheduleManager schedManager = null;
- String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
- if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
- ScheduleLoader loader = new JdbcScheduleLoader(props);
- schedManager = new ScheduleManager(executorManager, loader, false);
- schedManager.setProjectManager(projectManager);
- schedManager.start();
- } else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
- logger.info("Loading trigger based scheduler");
- ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
- schedManager = new ScheduleManager(executorManager, loader, true);
+
+ private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
+// JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+// ExecutorManager execManager = new ExecutorManager(props, loader, true);
+// return execManager;
+ String executorMode = props.getString("executor.manager.mode", "local");
+ ExecutorManagerAdapter adapter;
+ if(executorMode.equals("local")) {
+ adapter = loadExecutorManager(props);
+ } else if(executorMode.equals("remote")) {
+ JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+ adapter = new ExecutorManagerRemoteAdapter(props, loader);
+ } else {
+ throw new Exception("Unknown ExecutorManager mode " + executorMode);
}
-
- return schedManager;
+ return adapter;
}
+
+// private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
+// ScheduleManager schedManager = null;
+// String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
+// if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
+// ScheduleLoader loader = new JdbcScheduleLoader(props);
+// schedManager = new ScheduleManager(executorManager, loader, false);
+// schedManager.setProjectManager(projectManager);
+// schedManager.start();
+// } else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
+// logger.info("Loading trigger based scheduler");
+// ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
+// schedManager = new ScheduleManager(executorManager, loader, true);
+// }
+//
+// return schedManager;
+// }
+ private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
+ logger.info("Loading trigger based scheduler");
+ ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
+ return new ScheduleManager(executorManager, loader, true);
+ }
// private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
// TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
// return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
@@ -279,35 +306,283 @@ public class AzkabanWebServer extends AzkabanServer {
private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
-// ExecutorManager executorManager = app.getExecutorManager();
-// TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
- CheckerTypeLoader checkerLoader = triggerManager.getCheckerLoader();
- ActionTypeLoader actionLoader = triggerManager.getActionLoader();
- // time:
- checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
-// // execution checker
-// ExecutionChecker.setExecutorManager(executorManager);
-// checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
- // Sla checker
-// SlaChecker.setExecutorManager(executorManager);
- checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
- // execut flow action
-// ExecuteFlowAction.setExecutorManager(executorManager);
-// ExecuteFlowAction.setProjectManager(projectManager);
- actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
- // kill flow action
-// KillExecutionAction.setExecutorManager(executorManager);
- actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
- // sla alert
-// SlaAlertAction.setExecutorManager(executorManager);
-// Map<String, Alerter> alerters = loadAlerters(props);
-// SlaAlertAction.setAlerters(alerters);
- actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
- // create trigger action
-// CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
- actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+ if(triggerManager instanceof TriggerManager) {
+ SlaChecker.setExecutorManager(executorManager);
+ ExecuteFlowAction.setExecutorManager(executorManager);
+ ExecuteFlowAction.setProjectManager(projectManager);
+ ExecuteFlowAction.setTriggerManager(triggerManager);
+ KillExecutionAction.setExecutorManager(executorManager);
+ SlaAlertAction.setExecutorManager(executorManager);
+ Map<String, Alerter> alerters = loadAlerters(props);
+ SlaAlertAction.setAlerters(alerters);
+ SlaAlertAction.setExecutorManager(executorManager);
+ CreateTriggerAction.setTriggerManager(triggerManager);
+ }
+
+ triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+ triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
+ triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+ triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+ triggerManager.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+ }
+
+ private Map<String, Alerter> loadAlerters(Props props) {
+ Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+ // load built-in alerters
+ ExecutorMailer mailAlerter = new ExecutorMailer(props);
+ allAlerters.put("email", mailAlerter);
+ // load all plugin alerters
+ String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+ allAlerters.putAll(loadPluginAlerters(pluginDir));
+ return allAlerters;
+ }
+
+ private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+ File alerterPluginPath = new File(pluginPath);
+ if (!alerterPluginPath.exists()) {
+ return Collections.<String, Alerter>emptyMap();
+ }
+
+ Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+ ClassLoader parentLoader = SlaAlertAction.class.getClass().getClassLoader();
+ File[] pluginDirs = alerterPluginPath.listFiles();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (File pluginDir: pluginDirs) {
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ File propertiesFile = new File(propertiesDir, "plugin.properties");
+ File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+
+ if (propertiesFile.exists()) {
+ if (propertiesOverrideFile.exists()) {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+ }
+ else {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile);
+ }
+ }
+ else {
+ logger.error("Plugin conf file " + propertiesFile + " not found.");
+ continue;
+ }
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ String pluginName = pluginProps.getString("alerter.name");
+ List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+
+ String pluginClass = pluginProps.getString("alerter.class");
+ if (pluginClass == null) {
+ logger.error("Alerter class is not set.");
+ }
+ else {
+ logger.info("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (int i=0; i < files.length; ++i) {
+ try {
+ URL url = files[i].toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ if (extLibClasspath != null) {
+ for (String extLib : extLibClasspath) {
+ try {
+ File file = new File(pluginDir, extLib);
+ URL url = file.toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> alerterClass = null;
+ try {
+ alerterClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+
+ String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+ Constructor<?> constructor = null;
+ try {
+ constructor = alerterClass.getConstructor(Props.class);
+ } catch (NoSuchMethodException e) {
+ logger.error("Constructor not found in " + pluginClass);
+ continue;
+ }
+
+ Object obj = null;
+ try {
+ obj = constructor.newInstance(pluginProps);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+
+ if (!(obj instanceof Alerter)) {
+ logger.error("The object is not an Alerter");
+ continue;
+ }
+
+ Alerter plugin = (Alerter) obj;
+ installedAlerterPlugins.put(pluginName, plugin);
+ }
+
+ return installedAlerterPlugins;
+
+ }
+
+ private void loadPluginCheckersAndActions(String pluginPath) {
+ logger.info("Loading plug-in checker and action types");
+ File triggerPluginPath = new File(pluginPath);
+ if (!triggerPluginPath.exists()) {
+ logger.error("plugin path " + pluginPath + " doesn't exist!");
+ return;
+ }
+
+ ClassLoader parentLoader = this.getClassLoader();
+ File[] pluginDirs = triggerPluginPath.listFiles();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (File pluginDir: pluginDirs) {
+ if (!pluginDir.exists()) {
+ logger.error("Error! Trigger plugin path " + pluginDir.getPath() + " doesn't exist.");
+ continue;
+ }
+
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ File propertiesFile = new File(propertiesDir, "plugin.properties");
+ File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+
+ if (propertiesFile.exists()) {
+ if (propertiesOverrideFile.exists()) {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+ }
+ else {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile);
+ }
+ }
+ else {
+ logger.error("Plugin conf file " + propertiesFile + " not found.");
+ continue;
+ }
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
+
+ String pluginClass = pluginProps.getString("trigger.class");
+ if (pluginClass == null) {
+ logger.error("Trigger class is not set.");
+ }
+ else {
+ logger.error("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (int i=0; i < files.length; ++i) {
+ try {
+ URL url = files[i].toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ if (extLibClasspath != null) {
+ for (String extLib : extLibClasspath) {
+ try {
+ File file = new File(pluginDir, extLib);
+ URL url = file.toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> triggerClass = null;
+ try {
+ triggerClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+ String source = FileIOUtils.getSourcePathFromClass(triggerClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+ try {
+ Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateCheckerTypes", pluginProps, app);
+ } catch (Exception e) {
+ logger.error("Unable to initiate checker types for " + pluginClass);
+ continue;
+ }
+
+ try {
+ Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateActionTypes", pluginProps, app);
+ } catch (Exception e) {
+ logger.error("Unable to initiate action types for " + pluginClass);
+ continue;
+ }
+
+ }
}
/**
@@ -347,7 +622,7 @@ public class AzkabanWebServer extends AzkabanServer {
/**
*
*/
- public ExecutorManager getExecutorManager() {
+ public ExecutorManagerAdapter getExecutorManager() {
return executorManager;
}
@@ -530,12 +805,13 @@ public class AzkabanWebServer extends AzkabanServer {
Map<String, TriggerPlugin> triggerPlugins = loadTriggerPlugins(root, triggerPluginDir, app);
app.setTriggerPlugins(triggerPlugins);
// always have basic time trigger
- app.getTriggerManager().addTriggerAgent(app.getScheduleManager().getTriggerSource(), app.getScheduleManager());
+ //TODO: find something else to do the job
+// app.getTriggerManager().addTriggerAgent(app.getScheduleManager().getTriggerSource(), app.getScheduleManager());
// add additional triggers
- for(TriggerPlugin plugin : triggerPlugins.values()) {
- TriggerAgent agent = plugin.getAgent();
- app.getTriggerManager().addTriggerAgent(agent.getTriggerSource(), agent);
- }
+// for(TriggerPlugin plugin : triggerPlugins.values()) {
+// TriggerAgent agent = plugin.getAgent();
+// app.getTriggerManager().addTriggerAgent(agent.getTriggerSource(), agent);
+// }
// fire up
app.getTriggerManager().start();
@@ -939,7 +1215,11 @@ public class AzkabanWebServer extends AzkabanServer {
registerMbean("jetty", new JmxJettyServer(server));
registerMbean("triggerManager", new JmxTriggerManager(triggerManager));
- registerMbean("executorManager", new JmxExecutorManager(executorManager));
+// if(executorManager instanceof ExecutorManagerLocalAdapter) {
+// registerMbean("executorManager", new JmxExecutorManager(((ExecutorManagerLocalAdapter)executorManager).getExecutorManager()));
+// }
+// registerMbean("executorManager", new JmxExecutorManager(executorManager));
+ registerMbean("executorManager", new JmxExecutorManagerAdapter(executorManager));
}
public void close() {
@@ -952,6 +1232,9 @@ public class AzkabanWebServer extends AzkabanServer {
logger.error("Failed to cleanup MBeanServer", e);
}
scheduleManager.shutdown();
+// if(executorManager instanceof ExecutorManagerLocalAdapter) {
+// ((ExecutorManagerLocalAdapter)executorManager).getExecutorManager().shutdown();
+// }
executorManager.shutdown();
}
diff --git a/src/java/azkaban/webapp/servlet/AbstractServiceServlet.java b/src/java/azkaban/webapp/servlet/AbstractServiceServlet.java
new file mode 100644
index 0000000..eb1013e
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/AbstractServiceServlet.java
@@ -0,0 +1,91 @@
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.codehaus.jackson.map.ObjectMapper;
+import azkaban.webapp.AzkabanServer;
+
+public class AbstractServiceServlet extends HttpServlet{
+
+ private static final long serialVersionUID = 1L;
+ public static final String JSON_MIME_TYPE = "application/json";
+
+ private AzkabanServer application;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ application = (AzkabanServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+
+ if (application == null) {
+ throw new IllegalStateException(
+ "No batch application is defined in the servlet context!");
+ }
+ }
+
+ protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
+ resp.setContentType(JSON_MIME_TYPE);
+ ObjectMapper mapper = new ObjectMapper();
+ OutputStream stream = resp.getOutputStream();
+ mapper.writeValue(stream, obj);
+ }
+
+ public boolean hasParam(HttpServletRequest request, String param) {
+ return request.getParameter(param) != null;
+ }
+
+ public String getParam(HttpServletRequest request, String name)
+ throws ServletException {
+ String p = request.getParameter(name);
+ if (p == null)
+ throw new ServletException("Missing required parameter '" + name + "'.");
+ else
+ return p;
+ }
+
+ public String getParam(HttpServletRequest request, String name, String defaultVal ) {
+ String p = request.getParameter(name);
+ if (p == null) {
+ return defaultVal;
+ }
+
+ return p;
+ }
+
+ public int getIntParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Integer.parseInt(p);
+ }
+
+ public int getIntParam(HttpServletRequest request, String name, int defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getIntParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+ return defaultVal;
+ }
+
+ public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Long.parseLong(p);
+ }
+
+ public long getLongParam(HttpServletRequest request, String name, long defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getLongParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+ return defaultVal;
+ }
+
+}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index edae8a6..0bbd585 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
@@ -51,7 +52,7 @@ import azkaban.webapp.session.Session;
public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private ProjectManager projectManager;
- private ExecutorManager executorManager;
+ private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
@@ -768,7 +769,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- String message = executorManager.submitExecutableFlow(exflow);
+ String message = executorManager.submitExecutableFlow(exflow, user.getUserId());
ret.put("message", message);
}
catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/webapp/servlet/HistoryServlet.java b/src/java/azkaban/webapp/servlet/HistoryServlet.java
index aa6d2ad..0be4206 100644
--- a/src/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/src/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -33,6 +33,7 @@ import org.joda.time.format.DateTimeFormat;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
@@ -42,7 +43,7 @@ import azkaban.webapp.session.Session;
public class HistoryServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
- private ExecutorManager executorManager;
+ private ExecutorManagerAdapter executorManager;
private ProjectManager projectManager;
private ExecutorVMHelper vmHelper;
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 2d4dff5..601abda 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -17,6 +17,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.triggerapp.TriggerConnectorParams;
import azkaban.trigger.TriggerManager;
import azkaban.user.Permission;
@@ -39,7 +40,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
private UserManager userManager;
private AzkabanWebServer server;
- private ExecutorManager executorManager;
+ private ExecutorManagerAdapter executorManager;
private TriggerManager triggerManager;
@Override
@@ -49,6 +50,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
server = (AzkabanWebServer)getApplication();
userManager = server.getUserManager();
executorManager = server.getExecutorManager();
+
triggerManager = server.getTriggerManager();
}
@@ -79,12 +81,11 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
this.writeJSON(resp, ret, true);
- return;
+ return;
}
- String hostPort = getParam(req, JMX_HOSTPORT);
- String mbean = getParam(req, JMX_MBEAN);
- Map<String, Object> result = triggerManager.callTriggerServerJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
- ret = result;
+// String hostPort = getParam(req, JMX_HOSTPORT);
+// String mbean = getParam(req, JMX_MBEAN);
+ ret = triggerManager.getJMX().getAllJMXMbeans();
}
else if (JMX_GET_MBEANS.equals(ajax)) {
ret.put("mbeans", server.getMbeanNames());
@@ -175,17 +176,18 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
page.add("mbeans", server.getMbeanNames());
Map<String, Object> executorMBeans = new HashMap<String,Object>();
- Set<String> primaryServerHosts = executorManager.getPrimaryServerHosts();
+// Set<String> primaryServerHosts = executorManager.getPrimaryServerHosts();
for (String hostPort: executorManager.getAllActiveExecutorServerHosts()) {
try {
Map<String, Object> mbeans = executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
- if (primaryServerHosts.contains(hostPort)) {
- executorMBeans.put(hostPort, mbeans.get("mbeans"));
- }
- else {
- executorMBeans.put(hostPort, mbeans.get("mbeans"));
- }
+ executorMBeans.put(hostPort, mbeans.get("mbeans"));
+// if (primaryServerHosts.contains(hostPort)) {
+// executorMBeans.put(hostPort, mbeans.get("mbeans"));
+// }
+// else {
+// executorMBeans.put(hostPort, mbeans.get("mbeans"));
+// }
}
catch (IOException e) {
logger.error("Cannot contact executor " + hostPort, e);
@@ -195,22 +197,23 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
page.add("executorRemoteMBeans", executorMBeans);
Map<String, Object> triggerserverMBeans = new HashMap<String,Object>();
- Set<String> primaryTriggerServerHosts = triggerManager.getPrimaryServerHosts();
- for (String hostPort: triggerManager.getAllActiveTriggerServerHosts()) {
- try {
- Map<String, Object> mbeans = triggerManager.callTriggerServerJMX(hostPort, TriggerConnectorParams.JMX_GET_MBEANS, null);
-
- if (primaryTriggerServerHosts.contains(hostPort)) {
- triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
- }
- else {
- triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
- }
- }
- catch (IOException e) {
- logger.error("Cannot contact executor " + hostPort, e);
- }
- }
+// Set<String> primaryTriggerServerHosts = triggerManager.getPrimaryServerHosts();
+// for (String hostPort: triggerManager.getAllActiveTriggerServerHosts()) {
+// try {
+// Map<String, Object> mbeans = triggerManager.callTriggerServerJMX(hostPort, TriggerConnectorParams.JMX_GET_MBEANS, null);
+//
+// if (primaryTriggerServerHosts.contains(hostPort)) {
+// triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+// }
+// else {
+// triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+// }
+// }
+// catch (IOException e) {
+// logger.error("Cannot contact executor " + hostPort, e);
+// }
+// }
+ triggerserverMBeans.put(triggerManager.getJMX().getPrimaryServerHost(), triggerManager.getJMX().getAllJMXMbeans());
page.add("triggerserverRemoteMBeans", triggerserverMBeans);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 989cca9..5be504b 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
@@ -76,7 +77,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private static final String LOCKDOWN_CREATE_PROJECTS_KEY = "lockdown.create.projects";
private ProjectManager projectManager;
- private ExecutorManager executorManager;
+ private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private UserManager userManager;
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 1ef2cef..aab3bde 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -44,6 +44,7 @@ import org.joda.time.format.DateTimeFormat;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flow.Node;
@@ -252,7 +253,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ExecutionOptions flowOptions = sched.getExecutionOptions();
if(slaOptions != null && slaOptions.size() > 0) {
- ret.put("slaEmails", slaOptions.get(0).getInfo().get("SlaEmails"));
+ ret.put("slaEmails", slaOptions.get(0).getInfo().get(SlaOption.INFO_EMAIL_LIST));
List<Object> setObj = new ArrayList<Object>();
for(SlaOption sla: slaOptions) {
@@ -473,7 +474,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
List<ExecutableFlow> history = null;
try {
AzkabanWebServer server = (AzkabanWebServer) getApplication();
- ExecutorManager executorManager = server.getExecutorManager();
+ ExecutorManagerAdapter executorManager = server.getExecutorManager();
history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
} catch (ExecutorManagerException e) {
// Return empty should suffice
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 9326d81..0ecef8e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -25,7 +25,9 @@
<ul id="nav" class="nav">
<li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/')"><a href="$!context/">Projects</a></li>
<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
- <li id="triggers-tab" #if($current_page == 'triggers')class="selected"#end onClick="navMenuClick('$!context/triggers')"><a href="$!context/triggers">Triggers</a></li>
+
+ <!--li id="triggers-tab" #if($current_page == 'triggers')class="selected"#end onClick="navMenuClick('$!context/triggers')"><a href="$!context/triggers">Triggers</a></li-->
+
<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
diff --git a/src/package/triggerserver/bin/azkaban-trigger-shutdown.sh b/src/package/triggerserver/bin/azkaban-trigger-shutdown.sh
new file mode 100755
index 0000000..3dda364
--- /dev/null
+++ b/src/package/triggerserver/bin/azkaban-trigger-shutdown.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+azkaban_dir=$(dirname $0)/..
+
+triggerport=`cat $azkaban_dir/conf/azkaban.properties | grep trigger.port | cut -d = -f 2`
+echo "Shutting down current running AzkabanTriggerServer at port $triggerport"
+
+proc=`cat $azkaban_dir/currentpid`
+
+kill $proc
+
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/triggerserver/bin/azkaban-trigger-start.sh b/src/package/triggerserver/bin/azkaban-trigger-start.sh
new file mode 100755
index 0000000..00e1077
--- /dev/null
+++ b/src/package/triggerserver/bin/azkaban-trigger-start.sh
@@ -0,0 +1,37 @@
+azkaban_dir=$(dirname $0)/..
+
+if [[ -z "$tmpdir" ]]; then
+tmpdir=temp
+fi
+
+for file in $azkaban_dir/lib/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $azkaban_dir/extlib/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $azkaban_dir/plugins/*/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+echo $azkaban_dir;
+echo $CLASSPATH;
+
+triggerport=`cat $azkaban_dir/conf/azkaban.properties | grep trigger.port | cut -d = -f 2`
+echo "Starting AzkabanTriggerServer on port $triggerport ..."
+serverpath=`pwd`
+
+if [ -z $AZKABAN_OPTS ]; then
+ AZKABAN_OPTS=-Xmx3G
+fi
+AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dtriggerport=$triggerport -Dserverpath=$serverpath
+
+java $AZKABAN_OPTS -cp $CLASSPATH azkaban.triggerapp.AzkabanTriggerServer -conf $azkaban_dir/conf $@ &
+
+echo $! > currentpid
+
diff --git a/src/package/triggerserver/conf/azkaban.private.properties b/src/package/triggerserver/conf/azkaban.private.properties
new file mode 100644
index 0000000..cce1792
--- /dev/null
+++ b/src/package/triggerserver/conf/azkaban.private.properties
@@ -0,0 +1 @@
+# Optional Properties that are hidden to the executions
\ No newline at end of file
diff --git a/src/package/triggerserver/conf/azkaban.properties b/src/package/triggerserver/conf/azkaban.properties
new file mode 100644
index 0000000..3504854
--- /dev/null
+++ b/src/package/triggerserver/conf/azkaban.properties
@@ -0,0 +1,18 @@
+#Azkaban
+default.timezone.id=America/Los_Angeles
+
+#Loader for projects
+azkaban.project.dir=projects
+
+database.type=mysql
+mysql.port=3306
+mysql.host=localhost
+mysql.database=azkaban2
+mysql.user=azkaban
+mysql.password=azkaban
+mysql.numconnections=100
+
+# Azkaban Executor settings
+trigger.server.maxThreads=50
+trigger.server.port=22321
+jetty.hostname=eat1-spadesaz01.grid.linkedin.com
diff --git a/src/package/triggerserver/conf/global.properties b/src/package/triggerserver/conf/global.properties
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/package/triggerserver/conf/global.properties
diff --git a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
index d65ce7b..dc9b970 100644
--- a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
+++ b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
@@ -26,7 +26,7 @@ public class BasicTimeCheckerTest {
DateTime now = DateTime.now();
ReadablePeriod period = Utils.parsePeriodString("10s");
- BasicTimeChecker timeChecker = new BasicTimeChecker("BasicTimeChecket_1", now, now.getZone(), true, true, period);
+ BasicTimeChecker timeChecker = new BasicTimeChecker("BasicTimeChecket_1", now.getMillis(), now.getZone(), true, true, period);
checkers.put(timeChecker.getId(), timeChecker);
String expr = timeChecker.getId() + ".eval()";
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index 09510cf..c56edab 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -60,7 +60,7 @@ public class ConditionTest {
String period = "6s";
//BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
- ConditionChecker timeChecker = new BasicTimeChecker("BasicTimeChecker_1", now, now.getZone(), true, true, Utils.parsePeriodString(period));
+ ConditionChecker timeChecker = new BasicTimeChecker("BasicTimeChecker_1", now.getMillis(), now.getZone(), true, true, Utils.parsePeriodString(period));
System.out.println("checker id is " + timeChecker.getId());
checkers.put(timeChecker.getId(), timeChecker);
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index 9a06f56..82a9796 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -32,6 +32,7 @@ import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerException;
import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
@@ -137,7 +138,7 @@ public class JdbcTriggerLoaderTest {
}
@Test
- public void addTriggerTest() throws TriggerManagerException {
+ public void addTriggerTest() throws TriggerLoaderException {
Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
loader.addTrigger(t1);
@@ -159,7 +160,7 @@ public class JdbcTriggerLoaderTest {
}
@Test
- public void removeTriggerTest() throws TriggerManagerException {
+ public void removeTriggerTest() throws TriggerLoaderException {
Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
loader.addTrigger(t1);
@@ -173,7 +174,7 @@ public class JdbcTriggerLoaderTest {
}
@Test
- public void updateTriggerTest() throws TriggerManagerException {
+ public void updateTriggerTest() throws TriggerLoaderException {
Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
t1.setResetOnExpire(true);
loader.addTrigger(t1);
@@ -187,7 +188,7 @@ public class JdbcTriggerLoaderTest {
private Trigger createTrigger(String projName, String flowName, String source) {
DateTime now = DateTime.now();
- ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+ ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now.getMillis(), now.getZone(), true, true, Utils.parsePeriodString("1h"));
Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
checkers1.put(checker1.getId(), checker1);
String expr1 = checker1.getId() + ".eval()";
@@ -196,7 +197,7 @@ public class JdbcTriggerLoaderTest {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
TriggerAction action = new ExecuteFlowAction("executeAction", 1, projName, flowName, "azkaban", new ExecutionOptions(), null);
actions.add(action);
- Trigger t = new Trigger(now, now, "azkaban", source, triggerCond, expireCond, actions);
+ Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", source, triggerCond, expireCond, actions);
return t;
}
diff --git a/unit/java/azkaban/test/trigger/ThresholdChecker.java b/unit/java/azkaban/test/trigger/ThresholdChecker.java
index c25f566..4e6e85c 100644
--- a/unit/java/azkaban/test/trigger/ThresholdChecker.java
+++ b/unit/java/azkaban/test/trigger/ThresholdChecker.java
@@ -95,5 +95,11 @@ public class ThresholdChecker implements ConditionChecker{
}
+ @Override
+ public long getNextCheckTime() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index 508436b..fbd5da7 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -20,6 +20,7 @@ import azkaban.trigger.TriggerAction;
import azkaban.trigger.ActionTypeLoader;
import azkaban.trigger.TriggerException;
import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerException;
import azkaban.utils.Props;
@@ -48,8 +49,8 @@ public class TriggerManagerTest {
props.put("trigger.scan.interval", 4000);
TriggerManager triggerManager = new TriggerManager(props, triggerLoader);
- triggerManager.getCheckerLoader().registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
- triggerManager.getActionLoader().registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);
+ triggerManager.registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
+ triggerManager.registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);
ThresholdChecker.setVal(1);
@@ -114,19 +115,19 @@ public class TriggerManagerTest {
private int idIndex = 0;
@Override
- public void addTrigger(Trigger t) throws TriggerManagerException {
+ public void addTrigger(Trigger t) throws TriggerLoaderException {
t.setTriggerId(idIndex++);
triggers.put(t.getTriggerId(), t);
}
@Override
- public void removeTrigger(Trigger s) throws TriggerManagerException {
+ public void removeTrigger(Trigger s) throws TriggerLoaderException {
triggers.remove(s.getTriggerId());
}
@Override
- public void updateTrigger(Trigger t) throws TriggerManagerException {
+ public void updateTrigger(Trigger t) throws TriggerLoaderException {
triggers.put(t.getTriggerId(), t);
}
@@ -137,14 +138,14 @@ public class TriggerManagerTest {
@Override
public Trigger loadTrigger(int triggerId)
- throws TriggerManagerException {
+ throws TriggerLoaderException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
- throws TriggerManagerException {
+ throws TriggerLoaderException {
// TODO Auto-generated method stub
return null;
}
@@ -166,7 +167,7 @@ public class TriggerManagerTest {
Condition triggerCond = new Condition(checkers, expr);
Condition expireCond = new Condition(checkers, expr);
- Trigger fakeTrigger = new Trigger(DateTime.now(), DateTime.now(), "azkaban", source, triggerCond, expireCond, actions);
+ Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", source, triggerCond, expireCond, actions);
fakeTrigger.setResetOnTrigger(true);
fakeTrigger.setResetOnExpire(true);
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 790fcb4..22f6532 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -45,7 +45,7 @@ public class TriggerTest {
@Test
public void jsonConversionTest() throws Exception {
DateTime now = DateTime.now();
- ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+ ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now.getMillis(), now.getZone(), true, true, Utils.parsePeriodString("1h"));
Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
checkers1.put(checker1.getId(), checker1);
String expr1 = checker1.getId() + ".eval()";
@@ -54,7 +54,7 @@ public class TriggerTest {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
TriggerAction action = new ExecuteFlowAction("executeAction", 1, "testProj", "testFlow", "azkaban", new ExecutionOptions(), null);
actions.add(action);
- Trigger t = new Trigger(now, now, "azkaban", "test", triggerCond, expireCond, actions);
+ Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", "test", triggerCond, expireCond, actions);
File temp = File.createTempFile("temptest", "temptest");
temp.deleteOnExit();