azkaban-memoizeit

Removing docs, now that they're obsolete. Removing EhCache

5/20/2013 4:43:07 PM

Details

diff --git a/src/java/azkaban/database/AzkabanDatabaseSetup.java b/src/java/azkaban/database/AzkabanDatabaseSetup.java
index c42ad33..d1687ef 100644
--- a/src/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/src/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -228,6 +228,10 @@ public class AzkabanDatabaseSetup {
 		for (File file: createScripts) {
 			String fileName = file.getName();
 			if (fileName.compareTo(updateFileNameVersion) > 0) {
+				if (fileName.startsWith(updateFileNameVersion)) {
+					continue;
+				}
+				
 				String[] split = fileName.split("\\.");
 				String versionNum = "";
 				
@@ -243,7 +247,7 @@ public class AzkabanDatabaseSetup {
 				if (versionNum.endsWith(".")) {
 					versionNum = versionNum.substring(0, versionNum.length() - 1);
 					
-					if (versionNum.compareTo(version) > 0) {
+					if (versionNum.compareTo(version) == 0) {
 						versions.add(versionNum);
 					}
 				}
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e718921..0972276 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -6,7 +6,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
diff --git a/src/java/azkaban/utils/cache/Cache.java b/src/java/azkaban/utils/cache/Cache.java
new file mode 100644
index 0000000..2125906
--- /dev/null
+++ b/src/java/azkaban/utils/cache/Cache.java
@@ -0,0 +1,175 @@
+package azkaban.utils.cache;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Cache {
+	private long nextUpdateTime = 0;
+	private long updateFrequency = 1 * 60 * 1000;
+	private int maxCacheSize = -1;
+
+	private long expireTimeToLive = -1; // Never expires
+	private long expireTimeToIdle = -1;
+
+	private EjectionPolicy ejectionPolicy = EjectionPolicy.LRU;
+	private CacheManager manager = null;
+
+	private Map<Object, Element<?>> elementMap = new ConcurrentHashMap<Object, Element<?>>();
+
+	public enum EjectionPolicy {
+		LRU, FIFO
+	}
+
+	/* package */Cache(CacheManager manager) {
+		this.manager = manager;
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> T get(Object key) {
+		Element<?> element = elementMap.get(key);
+		if (element == null) {
+			return null;
+		}
+		return (T) element.getElement();
+	}
+
+	public <T> void put(Object key, T item) {
+		Element<T> elem = new Element<T>(key, item);
+		elementMap.put(key, elem);
+	}
+
+	public boolean remove(Object key) {
+		Element<?> elem = elementMap.remove(key);
+		if (elem == null) {
+			return false;
+		}
+		
+		return true;
+	}
+
+	public Cache setMaxCacheSize(int size) {
+		maxCacheSize = size;
+		return this;
+	}
+
+	public Cache setEjectionPolicy(EjectionPolicy policy) {
+		ejectionPolicy = policy;
+		return this;
+	}
+
+	public Cache setUpdateFrequencyMs(long updateFrequencyMs) {
+		this.updateFrequency = updateFrequencyMs;
+		return this;
+	}
+
+	public Cache setExpiryTimeToLiveMs(long time) {
+		this.expireTimeToLive = time;
+		if (time > 0) {
+			manager.update();
+		}
+
+		return this;
+	}
+
+	public Cache setExpiryIdleTimeMs(long time) {
+		this.expireTimeToIdle = time;
+		if (time > 0) {
+			manager.update();
+		}
+		return this;
+	}
+
+	public int getSize() {
+		return elementMap.size();
+	}
+
+	public long getExpireTimeToLive() {
+		return expireTimeToLive;
+	}
+
+	public long getExpireTimeToIdle() {
+		return expireTimeToIdle;
+	}
+
+	public synchronized <T> void insertElement(Object key, T item) {
+		if (maxCacheSize < 0 || elementMap.size() < maxCacheSize) {
+			Element<T> elem = new Element<T>(key, item);
+			elementMap.put(key, elem);
+		} else {
+			internalExpireCache();
+
+			Element<T> elem = new Element<T>(key, item);
+			if (elementMap.size() < maxCacheSize) {
+				elementMap.put(key, elem);
+			} else {
+				Element<?> element = getNextExpiryElement();
+				if (element != null) {
+					elementMap.remove(element.getKey());
+				}
+
+				elementMap.put(key, elem);
+			}
+		}
+	}
+
+	private Element<?> getNextExpiryElement() {
+		if (ejectionPolicy == EjectionPolicy.LRU) {
+			long latestAccessTime = Long.MAX_VALUE;
+			Element<?> ejectionCandidate = null;
+			for (Element<?> elem : elementMap.values()) {
+				if (latestAccessTime > elem.getLastUpdateTime()) {
+					latestAccessTime = elem.getLastUpdateTime();
+					ejectionCandidate = elem;
+				}
+			}
+
+			return ejectionCandidate;
+		} else if (ejectionPolicy == EjectionPolicy.FIFO) {
+			long earliestCreateTime = Long.MAX_VALUE;
+			Element<?> ejectionCandidate = null;
+			for (Element<?> elem : elementMap.values()) {
+				if (earliestCreateTime > elem.getCreationTime()) {
+					earliestCreateTime = elem.getCreationTime();
+					ejectionCandidate = elem;
+				}
+			}
+			return ejectionCandidate;
+		}
+
+		return null;
+	}
+
+	public synchronized void expireCache() {
+		long currentTime = System.currentTimeMillis();
+		if (nextUpdateTime < currentTime) {
+			internalExpireCache();
+			nextUpdateTime = currentTime + updateFrequency;
+		}
+	}
+
+	private synchronized void internalExpireCache() {
+		ArrayList<Element<?>> elems = new ArrayList<Element<?>>(elementMap.values());
+		
+		for (Element<?> elem : elems) {
+			if (shouldExpire(elem)) {
+				elementMap.remove(elem.getKey());
+			}
+		}
+	}
+
+	private boolean shouldExpire(Element<?> elem) {
+		if (expireTimeToLive > -1) {
+			if (System.currentTimeMillis() - elem.getCreationTime() > expireTimeToLive) {
+				return true;
+			}
+		}
+		if (expireTimeToIdle > -1) {
+			if (System.currentTimeMillis() - elem.getLastUpdateTime() > expireTimeToIdle) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/cache/CacheManager.java b/src/java/azkaban/utils/cache/CacheManager.java
new file mode 100644
index 0000000..9b38577
--- /dev/null
+++ b/src/java/azkaban/utils/cache/CacheManager.java
@@ -0,0 +1,116 @@
+package azkaban.utils.cache;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CacheManager {
+	// Thread that expires caches at
+	private static final long UPDATE_FREQUENCY = 30000; // Every 30 sec by default.
+
+	private long updateFrequency = UPDATE_FREQUENCY;
+	private Set<Cache> caches;
+	private static CacheManager manager = null;
+	private final CacheManagerThread updaterThread;
+
+	private boolean activeExpiry = false;
+
+	public static CacheManager getInstance() {
+		if (manager == null) {
+			manager = new CacheManager();
+		}
+		
+		return manager;
+	}
+	
+	private CacheManager() {
+		updaterThread = new CacheManagerThread();
+		caches = new HashSet<Cache>();
+
+		updaterThread.start();
+	}
+
+	public static void setUpdateFrequency(long updateFreqMs) {
+		manager.internalUpdateFrequency(updateFreqMs);
+	}
+
+	public static void shutdown() {
+		manager.internalShutdown();
+	}
+
+	public Cache createCache() {
+		Cache cache = new Cache(manager);
+		manager.internalAddCache(cache);
+		return cache;
+	}
+
+	public void removeCache(Cache cache) {
+		manager.internalRemoveCache(cache);
+	}
+
+	private void internalUpdateFrequency(long updateFreq) {
+		updateFrequency = updateFreq;
+		updaterThread.interrupt();
+	}
+
+	private void internalAddCache(Cache cache) {
+		caches.add(cache);
+		updaterThread.interrupt();
+	}
+
+	private void internalRemoveCache(Cache cache) {
+		caches.remove(cache);
+	}
+
+	private synchronized void internalShutdown() {
+		updaterThread.shutdown();
+	}
+
+	/* package */synchronized void update() {
+		boolean activeExpiry = false;
+		for (Cache cache : caches) {
+			if (cache.getExpireTimeToIdle() > 0
+					|| cache.getExpireTimeToLive() > 0) {
+				activeExpiry = true;
+				break;
+			}
+		}
+
+		if (this.activeExpiry != activeExpiry && activeExpiry) {
+			this.activeExpiry = activeExpiry;
+			updaterThread.interrupt();
+		}
+	}
+
+	private class CacheManagerThread extends Thread {
+		private boolean shutdown = false;
+
+		public void run() {
+			while (!shutdown) {
+				if (activeExpiry) {
+					for (Cache cache : caches) {
+						cache.expireCache();
+					}
+
+					synchronized (this) {
+						try {
+							wait(updateFrequency);
+						} catch (InterruptedException e) {
+						}
+					}
+				} else {
+					synchronized (this) {
+						try {
+							wait();
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+			}
+		}
+
+		public void shutdown() {
+			this.shutdown = true;
+			updaterThread.interrupt();
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/cache/Element.java b/src/java/azkaban/utils/cache/Element.java
new file mode 100644
index 0000000..14c2ad9
--- /dev/null
+++ b/src/java/azkaban/utils/cache/Element.java
@@ -0,0 +1,32 @@
+package azkaban.utils.cache;
+
+public class Element<T> {
+	private Object key;
+	private T element;
+	private long creationTime = 0;
+	private long lastAccessTime = 0;
+
+	public Element(Object key, T element) {
+		this.key = key;
+		creationTime = System.currentTimeMillis();
+		lastAccessTime = creationTime;
+		this.element = element;
+	}
+
+	public Object getKey() {
+		return key;
+	}
+
+	public T getElement() {
+		lastAccessTime = System.currentTimeMillis();
+		return element;
+	}
+
+	public long getCreationTime() {
+		return creationTime;
+	}
+
+	public long getLastUpdateTime() {
+		return lastAccessTime;
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8810352..8e87919 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -63,7 +63,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	private String color;
 
 	private List<ViewerPlugin> viewerPlugins;
-	
+
 	/**
 	 * To retrieve the application for the servlet
 	 * 
@@ -92,7 +92,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 			viewerPlugins = server.getViewerPlugins();
 		}
 	}
-
+	
 	/**
 	 * Checks for the existance of the parameter in the request
 	 * 
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 891e896..8ce912b 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,7 +16,11 @@
 
 package azkaban.webapp.servlet;
 
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Writer;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +33,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
 import azkaban.project.Project;
@@ -51,6 +56,18 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 	private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
 	private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
 	
+	private static HashMap<String, String> contextType = new HashMap<String,String>();
+	static {
+		contextType.put(".js", "application/javascript");
+		contextType.put(".css", "text/css");
+		contextType.put(".png", "image/png");
+		contextType.put(".jpeg", "image/jpeg");
+		contextType.put(".jpg", "image/jpeg");
+	}
+	
+	
+	private File webResourceDirectory = null;
+	
 	private MultipartParser multipartParser;
 	
 	@Override
@@ -60,6 +77,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
 	}
 	
+	public void setResourceDirectory(File file) {
+		this.webResourceDirectory = file;
+	}
+	
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		// Set session id
@@ -74,6 +95,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 
 		if (session != null) {
 			logger.info("Found session " + session.getUser());
+			if (handleFileGet(req, resp)) {
+				return;
+			}
+			
 			handleGet(req, resp, session);
 		} else {
 			if (hasParam(req, "ajax")) {
@@ -87,6 +112,46 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		}
 	}
 	
+	private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+		if (webResourceDirectory == null) {
+			return false;
+		}
+		
+		// Check if it's a resource
+		String prefix = req.getContextPath() + req.getServletPath();
+		String path = req.getRequestURI().substring(prefix.length());
+		int index = path.lastIndexOf('.');
+		if (index == -1 ) {
+			return false;
+		}
+		
+		String extension = path.substring(index);
+		if (contextType.containsKey(extension)) {
+			File file = new File(webResourceDirectory, path);
+			if (!file.exists() || !file.isFile()) {
+				return false;
+			}
+			
+			resp.setContentType(contextType.get(extension));
+			
+			OutputStream output = resp.getOutputStream();
+			BufferedInputStream input = null;
+			try {
+				input = new BufferedInputStream(new FileInputStream(file));
+				IOUtils.copy(input, output);
+			}
+			finally {
+				if (input != null) {
+					input.close();
+				}
+			}
+			output.flush();
+			return true;
+		}
+
+		return false;
+	}
+	
 	private Session getSessionFromRequest(HttpServletRequest req) throws ServletException {
 		String remoteIp = req.getRemoteAddr();
 		Cookie cookie = getCookieByName(req, SESSION_ID_NAME);
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index e110be2..be76c55 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -17,11 +17,11 @@
 package azkaban.webapp.session;
 
 import azkaban.utils.Props;
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import azkaban.utils.cache.Cache;
+import azkaban.utils.cache.CacheManager;
+import azkaban.utils.cache.Cache.EjectionPolicy;
+import azkaban.utils.cache.Element;
+
 
 /**
  * Cache for web session.
@@ -33,8 +33,8 @@ import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
  */
 public class SessionCache {
 	private static final int MAX_NUM_SESSIONS = 10000;
-	private static final int SESSION_TIME_TO_LIVE = 86400;
-	private CacheManager manager = CacheManager.create();
+	private static final int SESSION_TIME_TO_LIVE = 10000;
+//	private CacheManager manager = CacheManager.create();
 	private Cache cache;
 
 	/**
@@ -43,18 +43,12 @@ public class SessionCache {
 	 * @param props
 	 */
 	public SessionCache(Props props) {
-		// disable ehcache auto update
-		System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
-		CacheConfiguration config = new CacheConfiguration();
-		config.setName("sessionCache");
-		config.setMaxEntriesLocalHeap(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
-		config.setTimeToLiveSeconds(props.getInt("session.time.to.live", SESSION_TIME_TO_LIVE));
-		config.eternal(false);
-		config.diskPersistent(false);
-		config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
-
-		cache = new Cache(config);
-		manager.addCache(cache);
+		CacheManager manager = CacheManager.getInstance();
+		
+		cache = manager.createCache();
+		cache.setEjectionPolicy(EjectionPolicy.LRU);
+		cache.setMaxCacheSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
+		cache.setExpiryTimeToLiveMs(props.getInt("session.time.to.live", SESSION_TIME_TO_LIVE));
 	}
 
 	/**
@@ -64,12 +58,9 @@ public class SessionCache {
 	 * @return
 	 */
 	public Session getSession(String sessionId) {
-		Element elem = cache.get(sessionId);
-		if (elem == null) {
-			return null;
-		}
+		Session elem = cache.<Session>get(sessionId);
 
-		return (Session) elem.getObjectValue();
+		return elem;
 	}
 
 	/**
@@ -79,8 +70,7 @@ public class SessionCache {
 	 * @param session
 	 */
 	public void addSession(Session session) {
-		Element elem = new Element(session.getSessionId(), session);
-		cache.put(elem);
+		cache.put(session.getSessionId(), session);
 	}
 
 	/**
diff --git a/src/sql/update.schedules.2.2.sql b/src/sql/update.schedules.2.2.sql
index 98849db..d719119 100644
--- a/src/sql/update.schedules.2.2.sql
+++ b/src/sql/update.schedules.2.2.sql
@@ -1,3 +1,3 @@
 ALTER TABLE schedules DROP PRIMARY KEY;
 ALTER TABLE schedules ADD COLUMN schedule_id INT PRIMARY KEY NOT NULL AUTO_INCREMENT;
-ALTER TABLE schedules ADD INDEX project_id (project_id, flow_name);
+ALTER TABLE schedules ADD INDEX schedule_project_id (project_id, flow_name);