azkaban-memoizeit
Changes
src/java/azkaban/utils/cache/Cache.java 175(+175 -0)
src/java/azkaban/utils/cache/CacheManager.java 116(+116 -0)
src/java/azkaban/utils/cache/Element.java 32(+32 -0)
src/sql/update.schedules.2.2.sql 2(+1 -1)
Details
diff --git a/src/java/azkaban/database/AzkabanDatabaseSetup.java b/src/java/azkaban/database/AzkabanDatabaseSetup.java
index c42ad33..d1687ef 100644
--- a/src/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/src/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -228,6 +228,10 @@ public class AzkabanDatabaseSetup {
for (File file: createScripts) {
String fileName = file.getName();
if (fileName.compareTo(updateFileNameVersion) > 0) {
+ if (fileName.startsWith(updateFileNameVersion)) {
+ continue;
+ }
+
String[] split = fileName.split("\\.");
String versionNum = "";
@@ -243,7 +247,7 @@ public class AzkabanDatabaseSetup {
if (versionNum.endsWith(".")) {
versionNum = versionNum.substring(0, versionNum.length() - 1);
- if (versionNum.compareTo(version) > 0) {
+ if (versionNum.compareTo(version) == 0) {
versions.add(versionNum);
}
}
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e718921..0972276 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -6,7 +6,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
src/java/azkaban/utils/cache/Cache.java 175(+175 -0)
diff --git a/src/java/azkaban/utils/cache/Cache.java b/src/java/azkaban/utils/cache/Cache.java
new file mode 100644
index 0000000..2125906
--- /dev/null
+++ b/src/java/azkaban/utils/cache/Cache.java
@@ -0,0 +1,175 @@
+package azkaban.utils.cache;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Cache {
+ private long nextUpdateTime = 0;
+ private long updateFrequency = 1 * 60 * 1000;
+ private int maxCacheSize = -1;
+
+ private long expireTimeToLive = -1; // Never expires
+ private long expireTimeToIdle = -1;
+
+ private EjectionPolicy ejectionPolicy = EjectionPolicy.LRU;
+ private CacheManager manager = null;
+
+ private Map<Object, Element<?>> elementMap = new ConcurrentHashMap<Object, Element<?>>();
+
+ public enum EjectionPolicy {
+ LRU, FIFO
+ }
+
+ /* package */Cache(CacheManager manager) {
+ this.manager = manager;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T get(Object key) {
+ Element<?> element = elementMap.get(key);
+ if (element == null) {
+ return null;
+ }
+ return (T) element.getElement();
+ }
+
+ public <T> void put(Object key, T item) {
+ Element<T> elem = new Element<T>(key, item);
+ elementMap.put(key, elem);
+ }
+
+ public boolean remove(Object key) {
+ Element<?> elem = elementMap.remove(key);
+ if (elem == null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public Cache setMaxCacheSize(int size) {
+ maxCacheSize = size;
+ return this;
+ }
+
+ public Cache setEjectionPolicy(EjectionPolicy policy) {
+ ejectionPolicy = policy;
+ return this;
+ }
+
+ public Cache setUpdateFrequencyMs(long updateFrequencyMs) {
+ this.updateFrequency = updateFrequencyMs;
+ return this;
+ }
+
+ public Cache setExpiryTimeToLiveMs(long time) {
+ this.expireTimeToLive = time;
+ if (time > 0) {
+ manager.update();
+ }
+
+ return this;
+ }
+
+ public Cache setExpiryIdleTimeMs(long time) {
+ this.expireTimeToIdle = time;
+ if (time > 0) {
+ manager.update();
+ }
+ return this;
+ }
+
+ public int getSize() {
+ return elementMap.size();
+ }
+
+ public long getExpireTimeToLive() {
+ return expireTimeToLive;
+ }
+
+ public long getExpireTimeToIdle() {
+ return expireTimeToIdle;
+ }
+
+ public synchronized <T> void insertElement(Object key, T item) {
+ if (maxCacheSize < 0 || elementMap.size() < maxCacheSize) {
+ Element<T> elem = new Element<T>(key, item);
+ elementMap.put(key, elem);
+ } else {
+ internalExpireCache();
+
+ Element<T> elem = new Element<T>(key, item);
+ if (elementMap.size() < maxCacheSize) {
+ elementMap.put(key, elem);
+ } else {
+ Element<?> element = getNextExpiryElement();
+ if (element != null) {
+ elementMap.remove(element.getKey());
+ }
+
+ elementMap.put(key, elem);
+ }
+ }
+ }
+
+ private Element<?> getNextExpiryElement() {
+ if (ejectionPolicy == EjectionPolicy.LRU) {
+ long latestAccessTime = Long.MAX_VALUE;
+ Element<?> ejectionCandidate = null;
+ for (Element<?> elem : elementMap.values()) {
+ if (latestAccessTime > elem.getLastUpdateTime()) {
+ latestAccessTime = elem.getLastUpdateTime();
+ ejectionCandidate = elem;
+ }
+ }
+
+ return ejectionCandidate;
+ } else if (ejectionPolicy == EjectionPolicy.FIFO) {
+ long earliestCreateTime = Long.MAX_VALUE;
+ Element<?> ejectionCandidate = null;
+ for (Element<?> elem : elementMap.values()) {
+ if (earliestCreateTime > elem.getCreationTime()) {
+ earliestCreateTime = elem.getCreationTime();
+ ejectionCandidate = elem;
+ }
+ }
+ return ejectionCandidate;
+ }
+
+ return null;
+ }
+
+ public synchronized void expireCache() {
+ long currentTime = System.currentTimeMillis();
+ if (nextUpdateTime < currentTime) {
+ internalExpireCache();
+ nextUpdateTime = currentTime + updateFrequency;
+ }
+ }
+
+ private synchronized void internalExpireCache() {
+ ArrayList<Element<?>> elems = new ArrayList<Element<?>>(elementMap.values());
+
+ for (Element<?> elem : elems) {
+ if (shouldExpire(elem)) {
+ elementMap.remove(elem.getKey());
+ }
+ }
+ }
+
+ private boolean shouldExpire(Element<?> elem) {
+ if (expireTimeToLive > -1) {
+ if (System.currentTimeMillis() - elem.getCreationTime() > expireTimeToLive) {
+ return true;
+ }
+ }
+ if (expireTimeToIdle > -1) {
+ if (System.currentTimeMillis() - elem.getLastUpdateTime() > expireTimeToIdle) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
\ No newline at end of file
src/java/azkaban/utils/cache/CacheManager.java 116(+116 -0)
diff --git a/src/java/azkaban/utils/cache/CacheManager.java b/src/java/azkaban/utils/cache/CacheManager.java
new file mode 100644
index 0000000..9b38577
--- /dev/null
+++ b/src/java/azkaban/utils/cache/CacheManager.java
@@ -0,0 +1,116 @@
+package azkaban.utils.cache;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CacheManager {
+ // Thread that expires caches at
+ private static final long UPDATE_FREQUENCY = 30000; // Every 30 sec by default.
+
+ private long updateFrequency = UPDATE_FREQUENCY;
+ private Set<Cache> caches;
+ private static CacheManager manager = null;
+ private final CacheManagerThread updaterThread;
+
+ private boolean activeExpiry = false;
+
+ public static CacheManager getInstance() {
+ if (manager == null) {
+ manager = new CacheManager();
+ }
+
+ return manager;
+ }
+
+ private CacheManager() {
+ updaterThread = new CacheManagerThread();
+ caches = new HashSet<Cache>();
+
+ updaterThread.start();
+ }
+
+ public static void setUpdateFrequency(long updateFreqMs) {
+ manager.internalUpdateFrequency(updateFreqMs);
+ }
+
+ public static void shutdown() {
+ manager.internalShutdown();
+ }
+
+ public Cache createCache() {
+ Cache cache = new Cache(manager);
+ manager.internalAddCache(cache);
+ return cache;
+ }
+
+ public void removeCache(Cache cache) {
+ manager.internalRemoveCache(cache);
+ }
+
+ private void internalUpdateFrequency(long updateFreq) {
+ updateFrequency = updateFreq;
+ updaterThread.interrupt();
+ }
+
+ private void internalAddCache(Cache cache) {
+ caches.add(cache);
+ updaterThread.interrupt();
+ }
+
+ private void internalRemoveCache(Cache cache) {
+ caches.remove(cache);
+ }
+
+ private synchronized void internalShutdown() {
+ updaterThread.shutdown();
+ }
+
+ /* package */synchronized void update() {
+ boolean activeExpiry = false;
+ for (Cache cache : caches) {
+ if (cache.getExpireTimeToIdle() > 0
+ || cache.getExpireTimeToLive() > 0) {
+ activeExpiry = true;
+ break;
+ }
+ }
+
+ if (this.activeExpiry != activeExpiry && activeExpiry) {
+ this.activeExpiry = activeExpiry;
+ updaterThread.interrupt();
+ }
+ }
+
+ private class CacheManagerThread extends Thread {
+ private boolean shutdown = false;
+
+ public void run() {
+ while (!shutdown) {
+ if (activeExpiry) {
+ for (Cache cache : caches) {
+ cache.expireCache();
+ }
+
+ synchronized (this) {
+ try {
+ wait(updateFrequency);
+ } catch (InterruptedException e) {
+ }
+ }
+ } else {
+ synchronized (this) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.shutdown = true;
+ updaterThread.interrupt();
+ }
+ }
+}
\ No newline at end of file
src/java/azkaban/utils/cache/Element.java 32(+32 -0)
diff --git a/src/java/azkaban/utils/cache/Element.java b/src/java/azkaban/utils/cache/Element.java
new file mode 100644
index 0000000..14c2ad9
--- /dev/null
+++ b/src/java/azkaban/utils/cache/Element.java
@@ -0,0 +1,32 @@
+package azkaban.utils.cache;
+
+public class Element<T> {
+ private Object key;
+ private T element;
+ private long creationTime = 0;
+ private long lastAccessTime = 0;
+
+ public Element(Object key, T element) {
+ this.key = key;
+ creationTime = System.currentTimeMillis();
+ lastAccessTime = creationTime;
+ this.element = element;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public T getElement() {
+ lastAccessTime = System.currentTimeMillis();
+ return element;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public long getLastUpdateTime() {
+ return lastAccessTime;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8810352..8e87919 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -63,7 +63,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
private String color;
private List<ViewerPlugin> viewerPlugins;
-
+
/**
* To retrieve the application for the servlet
*
@@ -92,7 +92,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
viewerPlugins = server.getViewerPlugins();
}
}
-
+
/**
* Checks for the existance of the parameter in the request
*
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 891e896..8ce912b 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,7 +16,11 @@
package azkaban.webapp.servlet;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +33,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import azkaban.project.Project;
@@ -51,6 +56,18 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
+ private static HashMap<String, String> contextType = new HashMap<String,String>();
+ static {
+ contextType.put(".js", "application/javascript");
+ contextType.put(".css", "text/css");
+ contextType.put(".png", "image/png");
+ contextType.put(".jpeg", "image/jpeg");
+ contextType.put(".jpg", "image/jpeg");
+ }
+
+
+ private File webResourceDirectory = null;
+
private MultipartParser multipartParser;
@Override
@@ -60,6 +77,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
}
+ public void setResourceDirectory(File file) {
+ this.webResourceDirectory = file;
+ }
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// Set session id
@@ -74,6 +95,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
if (session != null) {
logger.info("Found session " + session.getUser());
+ if (handleFileGet(req, resp)) {
+ return;
+ }
+
handleGet(req, resp, session);
} else {
if (hasParam(req, "ajax")) {
@@ -87,6 +112,46 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
}
}
+ private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ if (webResourceDirectory == null) {
+ return false;
+ }
+
+ // Check if it's a resource
+ String prefix = req.getContextPath() + req.getServletPath();
+ String path = req.getRequestURI().substring(prefix.length());
+ int index = path.lastIndexOf('.');
+ if (index == -1 ) {
+ return false;
+ }
+
+ String extension = path.substring(index);
+ if (contextType.containsKey(extension)) {
+ File file = new File(webResourceDirectory, path);
+ if (!file.exists() || !file.isFile()) {
+ return false;
+ }
+
+ resp.setContentType(contextType.get(extension));
+
+ OutputStream output = resp.getOutputStream();
+ BufferedInputStream input = null;
+ try {
+ input = new BufferedInputStream(new FileInputStream(file));
+ IOUtils.copy(input, output);
+ }
+ finally {
+ if (input != null) {
+ input.close();
+ }
+ }
+ output.flush();
+ return true;
+ }
+
+ return false;
+ }
+
private Session getSessionFromRequest(HttpServletRequest req) throws ServletException {
String remoteIp = req.getRemoteAddr();
Cookie cookie = getCookieByName(req, SESSION_ID_NAME);
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index e110be2..be76c55 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -17,11 +17,11 @@
package azkaban.webapp.session;
import azkaban.utils.Props;
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import azkaban.utils.cache.Cache;
+import azkaban.utils.cache.CacheManager;
+import azkaban.utils.cache.Cache.EjectionPolicy;
+import azkaban.utils.cache.Element;
+
/**
* Cache for web session.
@@ -33,8 +33,8 @@ import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
*/
public class SessionCache {
private static final int MAX_NUM_SESSIONS = 10000;
- private static final int SESSION_TIME_TO_LIVE = 86400;
- private CacheManager manager = CacheManager.create();
+ private static final int SESSION_TIME_TO_LIVE = 10000;
+// private CacheManager manager = CacheManager.create();
private Cache cache;
/**
@@ -43,18 +43,12 @@ public class SessionCache {
* @param props
*/
public SessionCache(Props props) {
- // disable ehcache auto update
- System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
- CacheConfiguration config = new CacheConfiguration();
- config.setName("sessionCache");
- config.setMaxEntriesLocalHeap(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
- config.setTimeToLiveSeconds(props.getInt("session.time.to.live", SESSION_TIME_TO_LIVE));
- config.eternal(false);
- config.diskPersistent(false);
- config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
-
- cache = new Cache(config);
- manager.addCache(cache);
+ CacheManager manager = CacheManager.getInstance();
+
+ cache = manager.createCache();
+ cache.setEjectionPolicy(EjectionPolicy.LRU);
+ cache.setMaxCacheSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
+ cache.setExpiryTimeToLiveMs(props.getInt("session.time.to.live", SESSION_TIME_TO_LIVE));
}
/**
@@ -64,12 +58,9 @@ public class SessionCache {
* @return
*/
public Session getSession(String sessionId) {
- Element elem = cache.get(sessionId);
- if (elem == null) {
- return null;
- }
+ Session elem = cache.<Session>get(sessionId);
- return (Session) elem.getObjectValue();
+ return elem;
}
/**
@@ -79,8 +70,7 @@ public class SessionCache {
* @param session
*/
public void addSession(Session session) {
- Element elem = new Element(session.getSessionId(), session);
- cache.put(elem);
+ cache.put(session.getSessionId(), session);
}
/**
src/sql/update.schedules.2.2.sql 2(+1 -1)
diff --git a/src/sql/update.schedules.2.2.sql b/src/sql/update.schedules.2.2.sql
index 98849db..d719119 100644
--- a/src/sql/update.schedules.2.2.sql
+++ b/src/sql/update.schedules.2.2.sql
@@ -1,3 +1,3 @@
ALTER TABLE schedules DROP PRIMARY KEY;
ALTER TABLE schedules ADD COLUMN schedule_id INT PRIMARY KEY NOT NULL AUTO_INCREMENT;
-ALTER TABLE schedules ADD INDEX project_id (project_id, flow_name);
+ALTER TABLE schedules ADD INDEX schedule_project_id (project_id, flow_name);