azkaban-uncached

Details

README.md 4(+2 -2)

diff --git a/README.md b/README.md
index 7375cbe..d0a972f 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Azkaban2
+## Azkaban2
 
-For all Azkaban Plugins documentation, please go to
+For Azkaban documentation, please go to
 [Azkaban Project Site](http://azkaban.github.io/azkaban2/)
 There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d707c1f..09bf00a 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -389,12 +389,24 @@ public class FlowRunnerManager implements EventListener {
 			}
 		}
 
+		int numJobThreads = numJobThreadPerFlow;
+		if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+			try{
+				int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
+				if(numJobs > 0 && numJobs <= numJobThreads) {
+					numJobThreads = numJobs;
+				}
+			} catch (Exception e) {
+				throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
+			}
+		}
+		
 		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
 		runner.setFlowWatcher(watcher)
 			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
 			.setValidateProxyUser(validateProxyUser)
 			.setGlobalProps(globalProps)
-			.setNumJobThreads(numJobThreadPerFlow)
+			.setNumJobThreads(numJobThreads)
 			.addListener(this);
 		
 		// Check again.
@@ -634,6 +646,12 @@ public class FlowRunnerManager implements EventListener {
 	public int getNumExecutingFlows() {
 		return runningFlows.size();
 	}
+	
+	public String getRunningFlowIds() {
+		List<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+		Collections.sort(ids);
+		return ids.toString();
+	}
 
 	public int getNumExecutingJobs() {
 		int jobCount = 0;
@@ -643,5 +661,7 @@ public class FlowRunnerManager implements EventListener {
 		
 		return jobCount;
 	}
+
+	
 	
 }
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index a91a6d0..687dd71 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -40,7 +40,7 @@ public class ExecutionOptions {
 	private Set<String> initiallyDisabledJobs = new HashSet<String>();
 	
 	public void setFlowParameters(Map<String,String> flowParam) {
-		flowParameters.get(flowParam);
+		flowParameters.putAll(flowParam);
 	}
 	
 	public Map<String,String> getFlowParameters() {
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index c6c6f41..c7338cc 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -18,9 +18,22 @@ public class ExecutorMailer extends AbstractMailer {
 	
 	private boolean testMode = false;
 	
+	private int mailTimeout;
+	private int connectionTimeout;
+	
 	public ExecutorMailer(Props props) {
-		super(props);
+		this.azkabanName = props.getString("azkaban.name", "azkaban");
+		this.mailHost = props.getString("mail.host", "localhost");
+		this.mailUser = props.getString("mail.user", "");
+		this.mailPassword = props.getString("mail.password", "");
+		this.mailSender = props.getString("mail.sender", "");
 
+		this.mailTimeout = props.getInt("mail.timeout.millis", 10000);
+		this.connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
+		
+		this.clientHostname = props.getString("jetty.hostname", "localhost");
+		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+		
 		testMode = props.getBoolean("test.mode", false);
 	}
 	
@@ -30,10 +43,13 @@ public class ExecutorMailer extends AbstractMailer {
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = super.createEmailMessage(
-					"Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(), 
-					"text/html", 
-					emailList);
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setTimeout(mailTimeout);
+			message.setConnectionTimeout(connectionTimeout);
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
 			
 			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + getAzkabanName() + "</h2>");
 			
@@ -83,10 +99,13 @@ public class ExecutorMailer extends AbstractMailer {
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = super.createEmailMessage(
-					"Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(), 
-					"text/html", 
-					emailList);
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setTimeout(mailTimeout);
+			message.setConnectionTimeout(connectionTimeout);
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
 			
 			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName() + "</h2>");
 			message.println("<table>");
@@ -129,10 +148,13 @@ public class ExecutorMailer extends AbstractMailer {
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = super.createEmailMessage(
-					"Flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName(), 
-					"text/html", 
-					emailList);
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setTimeout(mailTimeout);
+			message.setConnectionTimeout(connectionTimeout);
+			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
 			
 			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName() + "</h2>");
 			message.println("<table>");
@@ -164,4 +186,4 @@ public class ExecutorMailer extends AbstractMailer {
 		
 		return failedJobs;
 	}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..eab7ade 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -69,6 +69,7 @@ public class ExecutorManager {
 	private long lastCleanerThreadCheckTime = -1;
 	
 	private long lastThreadCheckTime = -1;
+	private String updaterStage = "not started";
 	
 	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
 		this.executorLoader = loader;
@@ -97,6 +98,10 @@ public class ExecutorManager {
 		return executingManager.getState();
 	}
 	
+	public String getExecutorThreadStage() {
+		return updaterStage;
+	}
+	
 	public boolean isThreadActive() {
 		return executingManager.isAlive();
 	}
@@ -172,6 +177,15 @@ public class ExecutorManager {
 		return flows;
 	}
 	
+	public String getRunningFlowIds() {
+		List<Integer> allIds = new ArrayList<Integer>();
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			allIds.add(ref.getSecond().getExecutionId());
+		}
+		Collections.sort(allIds);
+		return allIds.toString();
+	}
+	
 	public List<ExecutableFlow> getRecentlyFinishedFlows() {
 		return new ArrayList<ExecutableFlow>(recentlyFinished.values());
 	}
@@ -593,6 +607,8 @@ public class ExecutorManager {
 				try {
 					lastThreadCheckTime = System.currentTimeMillis();
 					
+					updaterStage = "Starting update all flows.";
+					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
 					ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
@@ -602,6 +618,10 @@ public class ExecutorManager {
 							List<Long> updateTimesList = new ArrayList<Long>();
 							List<Integer> executionIdsList = new ArrayList<Integer>();
 						
+							ConnectionInfo connection = entry.getKey();
+							
+							updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+							
 							// We pack the parameters of the same host together before we query.
 							fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
 							
@@ -612,7 +632,7 @@ public class ExecutorManager {
 									ConnectorParams.EXEC_ID_LIST_PARAM, 
 									JSONUtils.toJSON(executionIdsList));
 							
-							ConnectionInfo connection = entry.getKey();
+							
 							Map<String, Object> results = null;
 							try {
 								results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -620,6 +640,9 @@ public class ExecutorManager {
 								logger.error(e);
 								for (ExecutableFlow flow: entry.getValue()) {
 									Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+									
+									updaterStage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+									
 									if (pair != null) {
 										ExecutionReference ref = pair.getFirst();
 										int numErrors = ref.getNumErrors();
@@ -642,6 +665,9 @@ public class ExecutorManager {
 								for (Map<String,Object> updateMap: executionUpdates) {
 									try {
 										ExecutableFlow flow = updateExecution(updateMap);
+										
+										updaterStage = "Updated flow " + flow.getExecutionId();
+										
 										if (isFinished(flow)) {
 											finishedFlows.add(flow);
 											finalizeFlows.add(flow);
@@ -659,6 +685,8 @@ public class ExecutorManager {
 							}
 						}
 	
+						updaterStage = "Evicting old recently finished flows.";
+						
 						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
@@ -668,12 +696,16 @@ public class ExecutorManager {
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
+						updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
+						
 						// Kill error flows
 						for (ExecutableFlow flow: finalizeFlows) {
 							finalizeFlows(flow);
 						}
 					}
 					
+					updaterStage = "Updated all active flows. Waiting for next round.";
+					
 					synchronized(this) {
 						try {
 							if (runningFlows.size() > 0) {
@@ -688,7 +720,7 @@ public class ExecutorManager {
 				}
 				catch (Exception e) {
 					logger.error(e);
-				}
+				} 
 			}
 		}
 	}
@@ -696,6 +728,7 @@ public class ExecutorManager {
 	private void finalizeFlows(ExecutableFlow flow) {
 		int execId = flow.getExecutionId();
 		
+		updaterStage = "finalizing flow " + execId;
 		// First we check if the execution in the datastore is complete
 		try {
 			ExecutableFlow dsFlow;
@@ -703,15 +736,18 @@ public class ExecutorManager {
 				dsFlow = flow;
 			}
 			else {
+				updaterStage = "finalizing flow " + execId + " loading from db";
 				dsFlow = executorLoader.fetchExecutableFlow(execId);
 			
 				// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
 				if (!isFinished(dsFlow)) {
+					updaterStage = "finalizing flow " + execId + " failing the flow";
 					failEverything(dsFlow);
 					executorLoader.updateExecutableFlow(dsFlow);
 				}
 			}
 
+			updaterStage = "finalizing flow " + execId + " deleting active reference";
 			// Delete the executing reference.
 			if (flow.getEndTime() == -1) {
 				flow.setEndTime(System.currentTimeMillis());
@@ -719,6 +755,7 @@ public class ExecutorManager {
 			}
 			executorLoader.removeActiveExecutableReference(execId);
 			
+			updaterStage = "finalizing flow " + execId + " cleaning from memory";
 			runningFlows.remove(execId);
 			recentlyFinished.put(execId, dsFlow);
 		} catch (ExecutorManagerException e) {
@@ -728,6 +765,7 @@ public class ExecutorManager {
 		// TODO append to the flow log that we forced killed this flow because the target no longer had
 		// the reference.
 		
+		updaterStage = "finalizing flow " + execId + " alerting and emailing";
 		ExecutionOptions options = flow.getExecutionOptions();
 		// But we can definitely email them.
 		if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
@@ -1006,4 +1044,8 @@ public class ExecutorManager {
 			cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
 		}
 	}
+
+	
+
+	
 }
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 	public String getExecutorThreadState() {
 		return manager.getExecutorThreadState().toString();
 	}
+	
+	@Override
+	public String getExecutorThreadStage() {
+		return manager.getExecutorThreadStage();
+	}
 
 	@Override
 	public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 	public List<String> getPrimaryExecutorHostPorts() {
 		return new ArrayList<String>(manager.getPrimaryServerHosts());
 	}
+
+	@Override
+	public String getRunningFlows() {
+		return manager.getRunningFlowIds();
+	}
+
+	
 }
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
 	@DisplayName("OPERATION: getNumRunningFlows")
 	public int getNumRunningFlows();
 	
+	@DisplayName("OPERATION: getRunningFlows")
+	public String getRunningFlows();
+	
 	@DisplayName("OPERATION: getExecutorThreadState")
 	public String getExecutorThreadState();
+	
+	@DisplayName("OPERATION: getExecutorThreadStage")
+	public String getExecutorThreadStage();
 
 	@DisplayName("OPERATION: isThreadActive")
 	public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
 		return manager.getNumExecutingJobs();
 	}
 
+	@Override
+	public String getRunningFlows() {
+		return manager.getRunningFlowIds();
+	}
+
 }
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
 	@DisplayName("OPERATION: getNumExecutingFlows")
 	public int getNumExecutingFlows();
 	
+	@DisplayName("OPERATION: getRunningFlows")
+	public String getRunningFlows();
+	
 	@DisplayName("OPERATION: getTotalNumRunningJobs")
 	public int countTotalNumRunningJobs();
 }
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
 	public Boolean isThreadActive() {
 		return manager.isThreadActive();
 	}
+
+	@Override
+	public String getScheduleThreadStage() {
+		return manager.getThreadStage();
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
 	@DisplayName("OPERATION: getScheduleThreadState")
 	String getScheduleThreadState();
 	
+	@DisplayName("OPERATION: getScheduleThreadStage")
+	String getScheduleThreadStage();
+	
 	@DisplayName("OPERATION: getNextScheduleTime")
 	Long getNextScheduleTime();
 	
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..3e9c173 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -70,6 +70,7 @@ public class ScheduleManager {
 	// Used for mbeans to query Scheduler status
 	private long lastCheckTime = -1;
 	private long nextWakupTime = -1;
+	private String runnerStage = "not started";
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -372,6 +373,8 @@ public class ScheduleManager {
 				synchronized (this) {
 					try {
 						lastCheckTime = System.currentTimeMillis();
+						
+						runnerStage = "Starting schedule scan.";
 						// TODO clear up the exception handling
 						Schedule s = schedules.peek();
 
@@ -380,6 +383,7 @@ public class ScheduleManager {
 							// there's something to do. Most likely there will not be.
 							try {
 								logger.info("Nothing scheduled to run. Checking again soon.");
+								runnerStage = "Waiting for next round scan.";
 								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
@@ -391,6 +395,8 @@ public class ScheduleManager {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
 
+								runnerStage = "Ready to run schedule " + runningSched.toString();
+								
 								logger.info("Scheduler ready to run " + runningSched.toString());
 								// Execute the flow here
 								try {
@@ -406,7 +412,7 @@ public class ScheduleManager {
 										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
 										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
 									}
-
+									
 									// Create ExecutableFlow
 									ExecutableFlow exflow = new ExecutableFlow(flow);
 									System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
@@ -428,6 +434,8 @@ public class ScheduleManager {
 										flowOptions.setSuccessEmails(flow.getSuccessEmails());
 									}
 									
+									runnerStage = "Submitting flow " + exflow.getFlowId();
+									
 									try {
 										executorManager.submitExecutableFlow(exflow);
 										logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -443,6 +451,7 @@ public class ScheduleManager {
 									SlaOptions slaOptions = runningSched.getSlaOptions();
 									if(slaOptions != null) {
 										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+										runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
 										// submit flow slas
 										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
 										for(SlaSetting set : slaOptions.getSettings()) {
@@ -472,6 +481,7 @@ public class ScheduleManager {
 									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 								}
 
+								runnerStage = "Done running schedule for " + runningSched.toString();
 								removeRunnerSchedule(runningSched);
 
 								// Immediately reschedule if it's possible. Let
@@ -485,6 +495,7 @@ public class ScheduleManager {
 									removeSchedule(runningSched);
 								}								
 							} else {
+								runnerStage = "Waiting for next round scan.";
 								// wait until flow run
 								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
 								try {
@@ -542,4 +553,9 @@ public class ScheduleManager {
 	public boolean isThreadActive() {
 		return runner.isAlive();
 	}
+
+	public String getThreadStage() {
+		return runnerStage;
+	}
+	
 }
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..3bb0e04 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -32,7 +32,9 @@ public class EmailMessage {
 	private String _fromAddress;
 	private String _mimeType = "text/plain";
 	private StringBuffer _body = new StringBuffer();
-
+	private int _mailTimeout = 10000;
+	private int _connectionTimeout = 10000;
+	
 	private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
 
 	public EmailMessage() {
@@ -44,7 +46,17 @@ public class EmailMessage {
 		_mailHost = host;
 		_mailPassword = password;
 	}
-
+	
+	public EmailMessage setTimeout(int timeoutMillis) {
+		_mailTimeout = timeoutMillis;
+		return this;
+	}
+	
+	public EmailMessage setConnectionTimeout(int timeoutMillis) {
+		_connectionTimeout = timeoutMillis;
+		return this;
+	}
+	
 	public EmailMessage setMailHost(String host) {
 		_mailHost = host;
 		return this;
@@ -136,6 +148,8 @@ public class EmailMessage {
 		props.put("mail."+protocol+".auth", "true");
 		props.put("mail.user", _mailUser);
 		props.put("mail.password", _mailPassword);
+		props.put("mail."+protocol+".timeout", _mailTimeout);
+		props.put("mail."+protocol+".connectiontimeout", _connectionTimeout);
 
 		Session session = Session.getInstance(props, null);
 		Message message = new MimeMessage(session);