azkaban-aplcache

Fix for #408: Azkaban to adjust execution and trigger times

3/10/2016 4:56:47 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 97e858c..11292e0 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -18,30 +18,36 @@ package azkaban.trigger;
 
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.log4j.Logger;
 
+import com.mysql.jdbc.Util;
+
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
-import azkaban.event.Event.Type;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.utils.Props;
+import azkaban.utils.Utils;
 
 public class TriggerManager extends EventHandler implements
     TriggerManagerAdapter {
   private static Logger logger = Logger.getLogger(TriggerManager.class);
   public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
-
+  private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+  
   private static Map<Integer, Trigger> triggerIdMap =
       new ConcurrentHashMap<Integer, Trigger>();
 
@@ -58,9 +64,10 @@ public class TriggerManager extends EventHandler implements
       new ExecutorManagerEventListener();
 
   private final Object syncObj = new Object();
-
+  private String timezone;
   private String scannerStage = "";
-
+  private boolean isDayLightSaving;
+  
   public TriggerManager(Props props, TriggerLoader triggerLoader,
       ExecutorManager executorManager) throws TriggerManagerException {
 
@@ -70,6 +77,12 @@ public class TriggerManager extends EventHandler implements
         props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
     runnerThread = new TriggerScannerThread(scannerInterval);
 
+    if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
+      this.timezone = props.getString(DEFAULT_TIMEZONE_ID);
+      logger.info("Setting timezone to " + timezone);
+      isDayLightSaving =  Utils.isCurrentlyDaylightSaving(timezone);
+    }
+    
     checkerTypeLoader = new CheckerTypeLoader();
     actionTypeLoader = new ActionTypeLoader();
 
@@ -228,6 +241,11 @@ public class TriggerManager extends EventHandler implements
                     + lastRunnerThreadCheckTime;
 
             try {
+              if (timezone != null
+                      && isDayLightSaving != Utils.isCurrentlyDaylightSaving(timezone)) {
+                Utils.setTimeZone(timezone);
+                isDayLightSaving =  Utils.isCurrentlyDaylightSaving(timezone);
+              }
               checkAllTriggers();
               justFinishedFlows.clear();
             } catch (Exception e) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 138b80f..80665e0 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -28,13 +28,16 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Enumeration;
 import java.util.Random;
+import java.util.TimeZone;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 import java.util.zip.ZipOutputStream;
 
 import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Days;
 import org.joda.time.DurationFieldType;
 import org.joda.time.Hours;
@@ -269,6 +272,16 @@ public class Utils {
   }
 
   /**
+   * Sets global timezone
+   * @param timezone
+   */
+  public static void setTimeZone(String timezone) {
+    System.setProperty("user.timezone", timezone);
+    TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+    DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+  }
+  
+  /**
    * Call the class constructor with the given arguments
    *
    * @param c The class
@@ -350,6 +363,15 @@ public class Utils {
     }
   }
 
+  /**
+   * True, if it is Daylight saving time in @timezone
+   * @param timezone
+   * @return
+   */
+  public static boolean isCurrentlyDaylightSaving(String timezone) {
+    return TimeZone.getTimeZone(timezone).inDaylightTime(new Date());
+  }
+  
   public static ReadablePeriod parsePeriodString(String periodStr) {
     ReadablePeriod period;
     char periodUnit = periodStr.charAt(periodStr.length() - 1);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 47fbfad..80e8a95 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -321,9 +321,7 @@ public class AzkabanExecutorServer {
     // Setup time zone
     if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
       String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
-      System.setProperty("user.timezone", timezone);
-      TimeZone.setDefault(TimeZone.getTimeZone(timezone));
-      DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+      Utils.setTimeZone(timezone);
 
       logger.info("Setting timezone to " + timezone);
     }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 01ab373..4ac6d43 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -62,6 +62,7 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.ThreadPoolExecutingListener;
 import azkaban.utils.TrackingThreadPool;
+import azkaban.utils.Utils;
 
 /**
  * Execution manager for the server side execution.
@@ -94,6 +95,8 @@ public class FlowRunnerManager implements EventListener,
       "executor.threadpool.workqueue.size";
   private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
   private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
+  private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+  
   private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
   private File executionDirectory;
   private File projectDirectory;
@@ -147,7 +150,9 @@ public class FlowRunnerManager implements EventListener,
 
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
-
+  private String timezone;
+  private boolean isDayLightSaving;
+  
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
       throws IOException {
@@ -157,6 +162,12 @@ public class FlowRunnerManager implements EventListener,
         new File(props.getString("azkaban.project.dir", "projects"));
 
     azkabanProps = props;
+    
+    if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
+      this.timezone = props.getString(DEFAULT_TIMEZONE_ID);
+      logger.info("Setting timezone to " + timezone);
+      isDayLightSaving =  Utils.isCurrentlyDaylightSaving(timezone);
+    }
 
     // JobWrappingFactory.init(props, getClass().getClassLoader());
     executionDirRetention =
@@ -447,6 +458,12 @@ public class FlowRunnerManager implements EventListener,
   }
 
   public void submitFlow(int execId) throws ExecutorManagerException {
+    if (timezone != null
+            && isDayLightSaving != Utils.isCurrentlyDaylightSaving(timezone)) {
+      Utils.setTimeZone(timezone);
+      isDayLightSaving =  Utils.isCurrentlyDaylightSaving(timezone);
+    }
+    
     // Load file and submit
     if (runningFlows.containsKey(execId)) {
       throw new ExecutorManagerException("Execution " + execId
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 89dea03..4cbdae0 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -31,7 +31,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
 
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
@@ -44,7 +43,6 @@ import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
 import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
@@ -54,6 +52,8 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import com.linkedin.restli.server.RestliServlet;
+
 import azkaban.alert.Alerter;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
@@ -101,8 +101,6 @@ import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
 
-import com.linkedin.restli.server.RestliServlet;
-
 /**
  * The Azkaban Jetty server class
  *
@@ -157,7 +155,7 @@ public class AzkabanWebServer extends AzkabanServer {
   private ScheduleManager scheduleManager;
   private TriggerManager triggerManager;
   private Map<String, Alerter> alerters;
-
+  
   private final ClassLoader baseClassLoader;
 
   private Props props;
@@ -215,10 +213,7 @@ public class AzkabanWebServer extends AzkabanServer {
     // Setup time zone
     if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
       String timezone = props.getString(DEFAULT_TIMEZONE_ID);
-      System.setProperty("user.timezone", timezone);
-      TimeZone.setDefault(TimeZone.getTimeZone(timezone));
-      DateTimeZone.setDefault(DateTimeZone.forID(timezone));
-      logger.info("Setting timezone to " + timezone);
+      Utils.setTimeZone(timezone);
     }
 
     configureMBeanServer();