/*
* Copyright 2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.trigger;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import azkaban.triggerapp.TriggerConnectorParams;
import azkaban.triggerapp.TriggerRunnerManager;
import azkaban.triggerapp.TriggerRunnerManagerAdapter;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
/**
* Executor manager used to manage the client side job.
*
*/
public class TriggerManager {
private static Logger logger = Logger.getLogger(TriggerManager.class);
public static final String TRIGGER_SUFFIX = ".trigger";
private TriggerLoader triggerLoader;
private CheckerTypeLoader checkerTypeLoader;
private ActionTypeLoader actionTypeLoader;
private String triggerServerHost;
private int triggerServerPort;
private TriggerRunnerManagerAdapter trmAdapter;
private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
private TriggerManagerUpdaterThread triggerManagingThread;
private long lastThreadCheckTime = -1;
private long lastUpdateTime = -1;
public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
this.triggerLoader = loader;
this.checkerTypeLoader = new CheckerTypeLoader();
this.actionTypeLoader = new ActionTypeLoader();
String trmMode = props.getString("trigger.runner.manager.mode", "local");
try {
if(trmMode.equals("local")) {
trmAdapter = loadTRMLocalAdapter(props, loader);
} else if(trmMode.equals("remote")) {
trmAdapter = loadTRMRemoteAdapter(props);
} else {
throw new TriggerManagerException("Unknown trigger runner manager mode " + trmMode);
}
} catch(Exception e) {
throw new TriggerManagerException("Failed to load Trigger Runner Manager: " + e.getMessage());
}
triggerServerHost = props.getString("trigger.server.host", "localhost");
triggerServerPort = props.getInt("trigger.server.port");
triggerManagingThread = new TriggerManagerUpdaterThread();
try{
checkerTypeLoader.init(props);
actionTypeLoader.init(props);
} catch(Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
}
Condition.setCheckerLoader(checkerTypeLoader);
Trigger.setActionTypeLoader(actionTypeLoader);
triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
}
private TriggerRunnerManagerAdapter loadTRMLocalAdapter(Props props, TriggerLoader loader) throws IOException {
return new TriggerRunnerManager(props, loader);
}
private TriggerRunnerManagerAdapter loadTRMRemoteAdapter(Props props) {
return null;
}
public void start() throws Exception {
loadTriggers();
for(TriggerAgent agent : triggerAgents.values()) {
agent.start();
}
triggerManagingThread.start();
}
private static class SuffixFilter implements FileFilter {
private String suffix;
public SuffixFilter(String suffix) {
this.suffix = suffix;
}
@Override
public boolean accept(File pathname) {
String name = pathname.getName();
return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
}
}
public String getTriggerServerHost() {
return triggerServerHost;
}
public int getTriggerServerPort() {
return triggerServerPort;
}
public State getUpdaterThreadState() {
return triggerManagingThread.getState();
}
public boolean isThreadActive() {
return triggerManagingThread.isAlive();
}
public long getLastThreadCheckTime() {
return lastThreadCheckTime;
}
public Set<String> getPrimaryServerHosts() {
// Only one for now. More probably later.
HashSet<String> ports = new HashSet<String>();
ports.add(triggerServerHost + ":" + triggerServerPort);
return ports;
}
private void loadTriggers() throws TriggerManagerException {
List<Trigger> triggerList;
try {
triggerList = triggerLoader.loadTriggers();
} catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
for(Trigger t : triggerList) {
if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
removeTrigger(t, "azkaban");
} else {
triggerIdMap.put(t.getTriggerId(), t);
}
}
}
public Trigger getTrigger(int triggerId) {
return triggerIdMap.get(triggerId);
}
public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
synchronized(t) {
logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
triggerIdMap.remove(t.getTriggerId());
}
}
public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
synchronized(t) {
try {
triggerLoader.updateTrigger(t);
callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
} catch(Exception e) {
throw new TriggerManagerException(e);
}
}
}
// public void getUpdatedTriggers() throws TriggerManagerException {
// try {
// callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
// } catch(IOException e) {
// throw new TriggerManagerException(e);
// }
// }
public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
synchronized(t) {
String message = null;
logger.info("Inserting trigger into system. " );
// The trigger id is set by the loader. So it's unavailable until after this call.
t.setStatus(TriggerStatus.PREPARING);
try {
triggerLoader.addTrigger(t);
callTriggerServer(t, TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
triggerIdMap.put(t.getTriggerId(), t);
message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
}
catch (Exception e) {
throw new TriggerManagerException(e);
}
return message;
}
}
private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
try {
return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
} catch (IOException e) {
throw new TriggerManagerException(e);
}
}
private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
URIBuilder builder = new URIBuilder();
builder.setScheme("http")
.setHost(host)
.setPort(port)
.setPath("/trigger");
builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
if (triggerId != null) {
builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
}
if (user != null) {
builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
}
if (params != null) {
for (Pair<String, String> pair: params) {
builder.setParameter(pair.getFirst(), pair.getSecond());
}
}
URI uri = null;
try {
uri = builder.build();
} catch (URISyntaxException e) {
throw new IOException(e);
}
ResponseHandler<String> responseHandler = new BasicResponseHandler();
HttpClient httpclient = new DefaultHttpClient();
HttpGet httpget = new HttpGet(uri);
String response = null;
try {
response = httpclient.execute(httpget, responseHandler);
} catch (IOException e) {
throw e;
}
finally {
httpclient.getConnectionManager().shutdown();
}
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
}
return jsonResponse;
}
public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
URIBuilder builder = new URIBuilder();
String[] hostPortSplit = hostPort.split(":");
builder.setScheme("http")
.setHost(hostPortSplit[0])
.setPort(Integer.parseInt(hostPortSplit[1]))
.setPath("/jmx");
builder.setParameter(action, "");
if (mBean != null) {
builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
}
URI uri = null;
try {
uri = builder.build();
} catch (URISyntaxException e) {
throw new IOException(e);
}
ResponseHandler<String> responseHandler = new BasicResponseHandler();
HttpClient httpclient = new DefaultHttpClient();
HttpGet httpget = new HttpGet(uri);
String response = null;
try {
response = httpclient.execute(httpget, responseHandler);
} catch (IOException e) {
throw e;
}
finally {
httpclient.getConnectionManager().shutdown();
}
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
}
return jsonResponse;
}
public void shutdown() {
triggerManagingThread.shutdown();
}
private class TriggerManagerUpdaterThread extends Thread {
private boolean shutdown = false;
public TriggerManagerUpdaterThread() {
this.setName("TriggerManagingThread");
}
private int waitTimeIdleMs = 2000;
private int waitTimeMs = 500;
private void shutdown() {
shutdown = true;
}
@SuppressWarnings("unchecked")
public void run() {
while(!shutdown) {
try {
lastThreadCheckTime = System.currentTimeMillis();
Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
Map<String, Object> results = null;
try{
results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
// lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
List<Integer> updates = (List<Integer>) results.get("updates");
for(Integer update : updates) {
Trigger t = triggerLoader.loadTrigger(update);
lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
removeTrigger(t, "azkaban");
//triggerIdMap.remove(update);
} else {
triggerIdMap.put(update, t);
}
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
}
synchronized(this) {
try {
if (triggerIdMap.size() > 0) {
this.wait(waitTimeMs);
}
else {
this.wait(waitTimeIdleMs);
}
} catch (InterruptedException e) {
}
}
}
catch (Exception e) {
logger.error(e);
}
}
}
}
private static class ConnectionInfo {
private String host;
private int port;
public ConnectionInfo(String host, int port) {
this.host = host;
this.port = port;
}
@SuppressWarnings("unused")
private ConnectionInfo getOuterType() {
return ConnectionInfo.this;
}
public boolean isEqual(String host, int port) {
return this.port == port && this.host.equals(host);
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ConnectionInfo other = (ConnectionInfo) obj;
if (host == null) {
if (other.host != null)
return false;
} else if (!host.equals(other.host))
return false;
if (port != other.port)
return false;
return true;
}
}
public void loadTriggerFromDir(File baseDir, Props props) throws TriggerManagerException {
File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
for(File triggerFile : triggerFiles) {
try{
Props triggerProps = new Props(props, triggerFile);
String triggerType = triggerProps.getString("trigger.type");
TriggerAgent agent = triggerAgents.get(triggerType);
if(agent != null) {
agent.loadTriggerFromProps(triggerProps);
} else {
throw new TriggerManagerException("Trigger " + triggerType + " is not supported.");
}
} catch (Exception e) {
throw new TriggerManagerException(e);
}
}
}
public List<Trigger> getTriggers() {
return new ArrayList<Trigger>(triggerIdMap.values());
}
public void expireTrigger(int triggerId) {
// TODO Auto-generated method stub
}
public CheckerTypeLoader getCheckerLoader() {
return checkerTypeLoader;
}
public ActionTypeLoader getActionLoader() {
return actionTypeLoader;
}
public void addTriggerAgent(String triggerSource,
TriggerAgent agent) {
triggerAgents.put(triggerSource, agent);
}
public List<Trigger> getTriggers(String triggerSource) {
List<Trigger> results = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
if(t.getSource().equals(triggerSource)) {
results.add(t);
}
}
return results;
}
public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
getUpdatedTriggers();
List<Trigger> triggers = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
triggers.add(t);
}
}
return triggers;
}
private void getUpdatedTriggers() throws TriggerManagerException {
List<Trigger> triggers;
try {
triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
} catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
for(Trigger t : triggers) {
this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
triggerIdMap.put(t.getTriggerId(), t);
}
}
public void removeTrigger(int triggerId, String submitUser) throws TriggerManagerException {
removeTrigger(triggerIdMap.get(triggerId), submitUser);
}
public Set<String> getAllActiveTriggerServerHosts() {
Set<String> hostport = new HashSet<String>();
hostport.add(triggerServerHost+":"+triggerServerPort);
return hostport;
}
public int getNumTriggers() {
return triggerIdMap.size();
}
public String getTriggerSources() {
Set<String> sources = new HashSet<String>();
for(Trigger t : triggerIdMap.values()) {
sources.add(t.getSource());
}
return sources.toString();
}
public String getTriggerIds() {
return triggerIdMap.keySet().toString();
}
}