azkaban-developers
Changes
.travis.yml 6(+6 -0)
azkaban-common/src/main/c/execute-as-user.c 125(+125 -0)
azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java 25(+24 -1)
azkaban-test/src/test/resources/azkaban/test/executions/embedded2/pipelineEmbeddedFlow3.job 0(+0 -0)
azkaban-test/src/test/resources/azkaban/test/executions/execpropstest/subdir/shared.properties 0(+0 -0)
build.gradle 53(+47 -6)
gradle.properties 2(+1 -1)
gradle/wrapper/gradle-wrapper.jar 0(+0 -0)
gradlew 6(+1 -5)
Details
.travis.yml 6(+6 -0)
diff --git a/.travis.yml b/.travis.yml
index 09aad73..d7354f9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,8 @@
languages: java
+# TODO(dzc): Add openjdk8 once it is available in Travis.
+jdk:
+ - openjdk7
+ - oraclejdk7
+ - oraclejdk8
+sudo: false
script: ./gradlew distTar
azkaban-common/src/main/c/execute-as-user.c 125(+125 -0)
diff --git a/azkaban-common/src/main/c/execute-as-user.c b/azkaban-common/src/main/c/execute-as-user.c
new file mode 100644
index 0000000..f27d5a1
--- /dev/null
+++ b/azkaban-common/src/main/c/execute-as-user.c
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <errno.h>
+#include <grp.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/mount.h>
+#include <sys/types.h>
+#include <pwd.h>
+
+FILE *LOGFILE = NULL;
+FILE *ERRORFILE = NULL;
+int SETUID_OPER_FAILED = 10;
+int USER_NOT_FOUND = 20;
+int INVALID_INPUT = 30;
+
+/*
+ * Change the real and effective user and group from super user to the specified user
+ *
+ * Adopted from:
+ * ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+ *
+ */
+
+int change_user(uid_t user, gid_t group) {
+ if (user == getuid() && user == geteuid() &&
+ group == getgid() && group == getegid()) {
+ return 0;
+ }
+
+ if (seteuid(0) != 0) {
+ fprintf(LOGFILE, "unable to reacquire root - %s\n", strerror(errno));
+ fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+ getuid(), getgid(), geteuid(), getegid());
+ return SETUID_OPER_FAILED;
+ }
+ if (setgid(group) != 0) {
+ fprintf(LOGFILE, "unable to set group to %d - %s\n", group,
+ strerror(errno));
+ fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+ getuid(), getgid(), geteuid(), getegid());
+ return SETUID_OPER_FAILED;
+ }
+ if (setuid(user) != 0) {
+ fprintf(LOGFILE, "unable to set user to %d - %s\n", user, strerror(errno));
+ fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+ getuid(), getgid(), geteuid(), getegid());
+ return SETUID_OPER_FAILED;
+ }
+
+ return 0;
+}
+
+int main(int argc, char **argv){
+
+ // set up the logging stream
+ if (!LOGFILE){
+ LOGFILE=stdout;
+ }
+ if (!ERRORFILE){
+ ERRORFILE=stderr;
+ }
+
+ if (argc < 3) {
+ fprintf(ERRORFILE, "Requires at least 3 variables: ./execute-as-user uid command [args]");
+ return INVALID_INPUT;
+ }
+
+ char *uid = argv[1];
+
+ // gather information about user
+ struct passwd *user_info = getpwnam(uid);
+ if (user_info == NULL){
+ fprintf(LOGFILE, "user does not exist: %s", uid);
+ return USER_NOT_FOUND;
+ }
+
+ // try to change user
+ fprintf(LOGFILE, "Changing user: user: %s, uid: %d, gid: %d\n", uid, user_info->pw_uid, user_info->pw_gid);
+ int retval = change_user(user_info->pw_uid, user_info->pw_gid);
+ if (retval != 0){
+ fprintf(LOGFILE, "Error changing user to %s\n", uid);
+ return SETUID_OPER_FAILED;
+ }
+
+ // execute the command
+ char **user_argv = &argv[2];
+ fprintf(LOGFILE, "user command starting from: %s\n", user_argv[0]);
+ fflush(LOGFILE);
+ retval = execvp(*user_argv, user_argv);
+ fprintf(LOGFILE, "system call return value: %d\n", retval);
+
+ // sometimes system(cmd) returns 256, which is interpreted to 0, making a failed job a successful job
+ // hence this goofy piece of if statement.
+ if (retval != 0){
+ return 1;
+ }
+ else{
+ return 0;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6120002..80e8167 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -171,6 +171,20 @@ public interface ExecutorLoader {
public void removeActiveExecutableReference(int execId)
throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Unset executor Id for an execution
+ * Note:-
+ * throws an Exception in case of a SQL issue
+ * </pre>
+ *
+ * @param executorId
+ * @param execId
+ * @throws ExecutorManagerException
+ */
+ public void unassignExecutor(int executionId) throws ExecutorManagerException;
+
/**
* <pre>
* Set an executor Id to an execution
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 39dd831..cffabe7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -74,12 +74,14 @@ public class ExecutorManager extends EventHandler implements
"azkaban.use.multiple.executors";
private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
"azkaban.webserver.queue.size";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
"azkaban.activeexecutor.refresh.milisecinterval";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
+ private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
+ "azkaban.maxDispatchingErrors";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
@@ -99,7 +101,7 @@ public class ExecutorManager extends EventHandler implements
private ExecutingManagerUpdaterThread executingManager;
// 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
- * 24 * 60 * 60 * 1000l;
+ * 24 * 60 * 60 * 1000L;
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
@@ -115,20 +117,19 @@ public class ExecutorManager extends EventHandler implements
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
- public ExecutorManager(Props props, ExecutorLoader loader,
- Map<String, Alerter> alters) throws ExecutorManagerException {
- azkProps = props;
+ public ExecutorManager(Props azkProps, ExecutorLoader loader,
+ Map<String, Alerter> alerters) throws ExecutorManagerException {
+ this.alerters = alerters;
+ this.azkProps = azkProps;
this.executorLoader = loader;
this.setupExecutors();
this.loadRunningFlows();
queuedFlows =
- new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+ new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
- alerters = alters;
-
- cacheDir = new File(props.getString("cache.directory", "cache"));
+ cacheDir = new File(azkProps.getString("cache.directory", "cache"));
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
@@ -138,7 +139,7 @@ public class ExecutorManager extends EventHandler implements
}
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
+ azkProps.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -172,8 +173,9 @@ public class ExecutorManager extends EventHandler implements
queueProcessor =
new QueueProcessorThread(azkProps.getBoolean(
AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
+ AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
queueProcessor.start();
}
@@ -241,7 +243,7 @@ public class ExecutorManager extends EventHandler implements
@Override
public String call() throws Exception {
return callExecutorForJsonString(executor.getHost(),
- executor.getPort(), "/serverstastics", null);
+ executor.getPort(), "/serverStatistics", null);
}
});
futures.add(new Pair<Executor, Future<String>>(executor,
@@ -251,12 +253,14 @@ public class ExecutorManager extends EventHandler implements
boolean wasSuccess = true;
for (Pair<Executor, Future<String>> refreshPair : futures) {
Executor executor = refreshPair.getFirst();
+ executor.setExecutorInfo(null); // invalidate cached EXecutorInfo
try {
// max 5 secs
String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
- logger.info("Successfully refreshed ExecutorInfo for executor: "
- + executor);
+ logger.info(String.format(
+ "Successfully refreshed executor: %s with executor info : %s",
+ executor, jsonString));
} catch (TimeoutException e) {
wasSuccess = false;
logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -425,13 +429,6 @@ public class ExecutorManager extends EventHandler implements
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
- // Finalize all flows which were running on an executor which is now
- // inactive
- for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
- if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
- finalizeFlows(pair.getSecond());
- }
- }
}
/*
@@ -450,7 +447,8 @@ public class ExecutorManager extends EventHandler implements
/**
* Gets a list of all the active (running flows and non-dispatched flows)
- * executions for a given project and flow {@inheritDoc}
+ * executions for a given project and flow {@inheritDoc}. Results should
+ * be sorted as we assume this while setting up pipelined execution Id.
*
* @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
* java.lang.String)
@@ -462,6 +460,7 @@ public class ExecutorManager extends EventHandler implements
queuedFlows.getAllEntries()));
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
runningFlows.values()));
+ Collections.sort(executionIds);
return executionIds;
}
@@ -1035,14 +1034,11 @@ public class ExecutorManager extends EventHandler implements
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
- Executor executor = activeExecutors.iterator().next();
// assign only local executor we have
- reference.setExecutor(executor);
+ Executor choosenExecutor = activeExecutors.iterator().next();
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
executorLoader.removeActiveExecutableReference(reference
.getExecId());
@@ -1729,13 +1725,43 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /**
+ * Calls executor to dispatch the flow, update db to assign the executor and
+ * in-memory state of executableFlow
+ */
+ private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+ Executor choosenExecutor) throws ExecutorManagerException {
+ exflow.setUpdateTime(System.currentTimeMillis());
+
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ try {
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ } catch (ExecutorManagerException ex) {
+ logger.error("Rolling back executor assignment for execution id:"
+ + exflow.getExecutionId(), ex);
+ executorLoader.unassignExecutor(exflow.getExecutionId());
+ throw new ExecutorManagerException(ex);
+ }
+ reference.setExecutor(choosenExecutor);
+
+ // move from flow to running flows
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+
/*
* This thread is responsible for processing queued flows using dispatcher and
* making rest api calls to executor server
*/
private class QueueProcessorThread extends Thread {
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
- private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+ private final int maxDispatchingErrors;
private final long activeExecutorRefreshWindowInMilisec;
private final int activeExecutorRefreshWindowInFlows;
@@ -1744,8 +1770,10 @@ public class ExecutorManager extends EventHandler implements
public QueueProcessorThread(boolean isActive,
long activeExecutorRefreshWindowInTime,
- int activeExecutorRefreshWindowInFlows) {
+ int activeExecutorRefreshWindowInFlows,
+ int maxDispatchingErrors) {
setActive(isActive);
+ this.maxDispatchingErrors = maxDispatchingErrors;
this.activeExecutorRefreshWindowInFlows =
activeExecutorRefreshWindowInFlows;
this.activeExecutorRefreshWindowInMilisec =
@@ -1790,7 +1818,7 @@ public class ExecutorManager extends EventHandler implements
private void processQueuedFlows(long activeExecutorsRefreshWindow,
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
- long lastExecutorRefreshTime = System.currentTimeMillis();
+ long lastExecutorRefreshTime = 0;
Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
@@ -1811,15 +1839,42 @@ public class ExecutorManager extends EventHandler implements
currentContinuousFlowProcessed = 0;
}
- exflow.setUpdateTime(currentTime);
- // process flow with current snapshot of activeExecutors
- processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
- currentContinuousFlowProcessed++;
+ /**
+ * <pre>
+ * TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
+ * Currently we try each queued flow once to infer a global busy state
+ * Possible improvements:-
+ * 1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
+ * 2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
+ * taking out all the filters which do not depend on the flow but are still being part of Selector.
+ * Assumptions:-
+ * 1. no one else except QueueProcessor is updating ExecutableFlow update time
+ * 2. re-attempting a flow (which has been tried before) is considered as all executors are busy
+ * </pre>
+ */
+ if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+ // put back in the queue
+ queuedFlows.enqueue(exflow, reference);
+ long sleepInterval =
+ activeExecutorsRefreshWindow
+ - (currentTime - lastExecutorRefreshTime);
+ // wait till next executor refresh
+ sleep(sleepInterval);
+ } else {
+ exflow.setUpdateTime(currentTime);
+ // process flow with current snapshot of activeExecutors
+ selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ }
+
+ // do not count failed flow processsing (flows still in queue)
+ if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
+ currentContinuousFlowProcessed++;
+ }
}
}
/* process flow with a snapshot of available Executors */
- private void processFlow(ExecutionReference reference,
+ private void selectExecutorAndDispatchFlow(ExecutionReference reference,
ExecutableFlow exflow, Set<Executor> availableExecutors)
throws ExecutorManagerException {
synchronized (exflow) {
@@ -1889,7 +1944,7 @@ public class ExecutorManager extends EventHandler implements
logger.info("Using dispatcher for execution id :"
+ exflow.getExecutionId());
ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
- choosenExecutor = selector.getBest(activeExecutors, exflow);
+ choosenExecutor = selector.getBest(availableExecutors, exflow);
}
return choosenExecutor;
}
@@ -1903,14 +1958,14 @@ public class ExecutorManager extends EventHandler implements
"Reached handleDispatchExceptionCase stage for exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
- if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+ if (reference.getNumErrors() > this.maxDispatchingErrors
|| remainingExecutors.size() <= 1) {
logger.error("Failed to process queued flow");
finalizeFlows(exflow);
} else {
remainingExecutors.remove(lastSelectedExecutor);
// try other executors except chosenExecutor
- processFlow(reference, exflow, remainingExecutors);
+ selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
}
}
@@ -1925,23 +1980,5 @@ public class ExecutorManager extends EventHandler implements
// schedule can starve all others
queuedFlows.enqueue(exflow, reference);
}
-
- private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
- Executor choosenExecutor) throws ExecutorManagerException {
- exflow.setUpdateTime(System.currentTimeMillis());
- callExecutorServer(exflow, choosenExecutor,
- ConnectorParams.EXECUTE_ACTION);
- executorLoader.assignExecutor(choosenExecutor.getId(),
- exflow.getExecutionId());
- reference.setExecutor(choosenExecutor);
-
- // move from flow to running flows
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
- logger.info(String.format(
- "Successfully dispatched exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- }
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 7740163..a588cca 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -104,7 +104,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
runner.query(connection, LastInsertID.LAST_INSERT_ID,
new LastInsertID());
- if (id == -1l) {
+ if (id == -1L) {
throw new ExecutorManagerException(
"Execution id is not properly created.");
}
@@ -1061,7 +1061,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Long handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
- return -1l;
+ return -1L;
}
long id = rs.getLong(1);
return id;
@@ -1559,4 +1559,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return events;
}
}
+
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+ */
+ @Override
+ public void unassignExecutor(int executionId) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ int rows = runner.update(UPDATE, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to unassign executor for execution : %d ", executionId));
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error updating execution id "
+ + executionId, e);
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
index 2f89129..f83788f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -66,7 +66,7 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
// add or replace the Comparator.
this.factorComparatorList.put(comparator.getFactorName(),comparator);
- logger.info(String.format("Factor comparator added for '%s'. Weight = '%s'",
+ logger.debug(String.format("Factor comparator added for '%s'. Weight = '%s'",
comparator.getFactorName(), comparator.getWeight()));
}
@@ -104,7 +104,7 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
* @return a pair structure contains the score for both sides.
* */
public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
- logger.info(String.format("start comparing '%s' with '%s', total weight = %s ",
+ logger.debug(String.format("start comparing '%s' with '%s', total weight = %s ",
object1 == null ? "(null)" : object1.toString(),
object2 == null ? "(null)" : object2.toString(),
this.getTotalWeight()));
@@ -114,16 +114,16 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
// short cut if object equals.
if (object1 == object2){
- logger.info("[Comparator] same object.");
+ logger.debug("[Comparator] same object.");
} else
// left side is null.
if (object1 == null){
- logger.info("[Comparator] left side is null, right side gets total weight.");
+ logger.debug("[Comparator] left side is null, right side gets total weight.");
result2 = this.getTotalWeight();
} else
// right side is null.
if (object2 == null){
- logger.info("[Comparator] right side is null, left side gets total weight.");
+ logger.debug("[Comparator] right side is null, left side gets total weight.");
result1 = this.getTotalWeight();
} else
// both side is not null,put them thru the full loop
@@ -133,20 +133,20 @@ public abstract class CandidateComparator<T> implements Comparator<T> {
int result = comparator.compare(object1, object2);
result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
- logger.info(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+ logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
comparator.getFactorName(), result, result1, result2));
}
}
// in case of same score, use tie-breaker to stabilize the result.
if (result1 == result2){
boolean result = this.tieBreak(object1, object2);
- logger.info("[TieBreaker] TieBreaker chose " +
+ logger.debug("[TieBreaker] TieBreaker chose " +
(result? String.format("left side (%s)", null== object1 ? "null": object1.toString()) :
String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
if (result) result1++; else result2++;
}
- logger.info(String.format("Result : %s vs %s ",result1,result2));
+ logger.debug(String.format("Result : %s vs %s ",result1,result2));
return new Pair<Integer,Integer>(result1,result2);
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
index 94b8dfa..f927a2a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -51,7 +51,7 @@ public abstract class CandidateFilter<T,V> {
// add or replace the filter.
this.factorFilterList.put(filter.getFactorName(),filter);
- logger.info(String.format("Factor filter added for '%s'.",
+ logger.debug(String.format("Factor filter added for '%s'.",
filter.getFactorName()));
}
@@ -62,7 +62,7 @@ public abstract class CandidateFilter<T,V> {
* @return true if the check passed, false if check failed, which means the item need to be filtered.
* */
public boolean filterTarget(T filteringTarget, V referencingObject){
- logger.info(String.format("start filtering '%s' with factor filter for '%s'",
+ logger.debug(String.format("start filtering '%s' with factor filter for '%s'",
filteringTarget == null ? "(null)" : filteringTarget.toString(),
this.getName()));
@@ -70,13 +70,13 @@ public abstract class CandidateFilter<T,V> {
boolean result = true;
for (FactorFilter<T,V> filter : filterList){
result &= filter.filterTarget(filteringTarget,referencingObject);
- logger.info(String.format("[Factor: %s] filter result : %s ",
+ logger.debug(String.format("[Factor: %s] filter result : %s ",
filter.getFactorName(), result));
if (!result){
break;
}
}
- logger.info(String.format("Final filtering result : %s ",result));
+ logger.debug(String.format("Final filtering result : %s ",result));
return result;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 5cae9ef..8fa91d0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -50,8 +50,8 @@ public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K
return null;
}
- logger.info("start candidate selection logic.");
- logger.info(String.format("candidate count before filtering: %s", candidateList.size()));
+ logger.debug("start candidate selection logic.");
+ logger.debug(String.format("candidate count before filtering: %s", candidateList.size()));
// to keep the input untouched, we will form up a new list based off the filtering result.
Collection<K> filteredList = new ArrayList<K>();
@@ -64,22 +64,22 @@ public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K
}
} else{
filteredList = candidateList;
- logger.info("skipping the candidate filtering as the filter object is not specifed.");
+ logger.debug("skipping the candidate filtering as the filter object is not specifed.");
}
- logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
+ logger.debug(String.format("candidate count after filtering: %s", filteredList.size()));
if (filteredList.size() == 0){
- logger.info("failed to select candidate as the filtered candidate list is empty.");
+ logger.debug("failed to select candidate as the filtered candidate list is empty.");
return null;
}
if (null == comparator){
- logger.info("candidate comparator is not specified, default hash code comparator class will be used.");
+ logger.debug("candidate comparator is not specified, default hash code comparator class will be used.");
}
// final work - find the best candidate from the filtered list.
K executor = Collections.max(filteredList, comparator);
- logger.info(String.format("candidate selected %s",
+ logger.debug(String.format("candidate selected %s",
null == executor ? "(null)" : executor.toString()));
return executor;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index eafab67..978bcb9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -122,14 +122,14 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
result = 0 ;
// both doesn't expose the info
if (null == statisticsObj1 && null == statisticsObj2){
- logger.info(String.format("%s : neither of the executors exposed statistics info.",
+ logger.debug(String.format("%s : neither of the executors exposed statistics info.",
caller));
return true;
}
//right side doesn't expose the info.
if (null == statisticsObj2 ){
- logger.info(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
+ logger.debug(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
caller));
result = 1;
return true;
@@ -137,7 +137,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
//left side doesn't expose the info.
if (null == statisticsObj1 ){
- logger.info(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
+ logger.debug(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
caller));
result = -1;
return true;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 19baa10..a47bc16 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -42,7 +42,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
// factor filter names.
private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
- private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimunFreeMemory";
+ private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
/**<pre>
@@ -98,13 +98,13 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
if (null == filteringTarget){
- logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+ logger.debug(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
return false;
}
ExecutorInfo stats = filteringTarget.getExecutorInfo();
if (null == stats) {
- logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
STATICREMAININGFLOWSIZE_FILTER_NAME,
filteringTarget.toString()));
return false;
@@ -126,13 +126,13 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
if (null == filteringTarget){
- logger.info(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+ logger.debug(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
return false;
}
ExecutorInfo stats = filteringTarget.getExecutorInfo();
if (null == stats) {
- logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
MINIMUMFREEMEMORY_FILTER_NAME,
filteringTarget.toString()));
return false;
@@ -156,13 +156,13 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
private static final int MAX_CPU_CURRENT_USAGE = 95;
public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
if (null == filteringTarget){
- logger.info(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+ logger.debug(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
return false;
}
ExecutorInfo stats = filteringTarget.getExecutorInfo();
if (null == stats) {
- logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
MINIMUMFREEMEMORY_FILTER_NAME,
filteringTarget.toString()));
return false;
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 0c98ecb..8c208a5 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -133,6 +133,11 @@ public class CommonJobProperties {
* hotspot occurs.
*/
public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+
+ /**
+ * Find out who is the submit user, in addition to the user.to.proxy (they may be different)
+ */
+ public static final String SUBMIT_USER = "azkaban.flow.submituser";
/**
* A uuid assigned to every execution
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 7b1462f..a85d1a3 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -54,8 +54,10 @@ public class JobCallbackValidator {
maxPostBodyLength);
}
- logger.info("Found " + totalCallbackCount + " job callbacks for job "
- + jobName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found " + totalCallbackCount + " job callbacks for job "
+ + jobName);
+ }
return totalCallbackCount;
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index e8e2dac..d3ab3f1 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,8 +22,8 @@ import java.util.List;
import org.apache.log4j.Logger;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.server.AzkabanServer;
-import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.Utils;
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4e268a0..a6e4f30 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.utils.process.AzkabanProcess;
import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
import azkaban.utils.Pair;
@@ -36,15 +37,32 @@ import azkaban.utils.SystemMemoryInfo;
public class ProcessJob extends AbstractProcessJob {
public static final String COMMAND = "command";
+
private static final long KILL_TIME_MS = 5000;
+
private volatile AzkabanProcess process;
+
private static final String MEMCHECK_ENABLED = "memCheck.enabled";
- private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
+ private static final String MEMCHECK_FREEMEMDECRAMT =
+ "memCheck.freeMemDecrAmt";
+
public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
+ public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
+ public static final String EXECUTE_AS_USER = "execute.as.user";
+ public static final String EXECUTE_AS_USER_OVERRIDE =
+ "execute.as.user.override";
+ public static final String USER_TO_PROXY = "user.to.proxy";
+ public static final String KRB5CCNAME = "KRB5CCNAME";
+
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
super(jobId, sysProps, jobProps, log);
+
+ // this is in line with what other job types (hadoopJava, spark, pig, hive)
+ // is doing
+ jobProps.put(CommonJobProperties.JOB_ID, jobId);
}
@Override
@@ -55,13 +73,19 @@ public class ProcessJob extends AbstractProcessJob {
handleError("Bad property definition! " + e.getMessage(), e);
}
- if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+ if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+ && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+ boolean isMemGranted =
+ SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+ memPair.getSecond(), freeMemDecrAmt);
if (!isMemGranted) {
- throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
- memPair.getFirst(), memPair.getSecond(), getId()));
+ throw new Exception(
+ String
+ .format(
+ "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+ memPair.getFirst(), memPair.getSecond(), getId()));
}
}
@@ -80,13 +104,45 @@ public class ProcessJob extends AbstractProcessJob {
info(commands.size() + " commands to execute.");
File[] propFiles = initPropsFiles();
+
+ // change krb5ccname env var so that each job execution gets its own cache
Map<String, String> envVars = getEnvironmentVariables();
+ envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
+ // determine whether to run as Azkaban or run as effectiveUser
+ String executeAsUserBinaryPath = null;
+ String effectiveUser = null;
+ boolean isExecuteAsUser = determineExecuteAsUser(sysProps, jobProps);
+
+ if (isExecuteAsUser) {
+ String nativeLibFolder = sysProps.getString(NATIVE_LIB_FOLDER);
+ executeAsUserBinaryPath =
+ String.format("%s/%s", nativeLibFolder, "execute-as-user");
+ effectiveUser = getEffectiveUser(jobProps);
+ if ("root".equals(effectiveUser)) {
+ throw new RuntimeException(
+ "Not permitted to proxy as root through Azkaban");
+ }
+ }
for (String command : commands) {
- info("Command: " + command);
- AzkabanProcessBuilder builder =
- new AzkabanProcessBuilder(partitionCommandLine(command))
- .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+ AzkabanProcessBuilder builder = null;
+ if (isExecuteAsUser) {
+ command =
+ String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser,
+ command);
+ info("Command: " + command);
+ builder =
+ new AzkabanProcessBuilder(partitionCommandLine(command))
+ .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
+ .enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath)
+ .setEffectiveUser(effectiveUser);
+ } else {
+ info("Command: " + command);
+ builder =
+ new AzkabanProcessBuilder(partitionCommandLine(command))
+ .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+ }
if (builder.getEnv().size() > 0) {
info("Environment variables: " + builder.getEnv());
@@ -119,10 +175,71 @@ public class ProcessJob extends AbstractProcessJob {
generateProperties(propFiles[1]);
}
+ private boolean determineExecuteAsUser(Props sysProps, Props jobProps) {
+ boolean isExecuteAsUser = sysProps.getBoolean(EXECUTE_AS_USER, false);
+ // putting an override in case user needs to override. A temporary opening
+ if (jobProps.containsKey(EXECUTE_AS_USER_OVERRIDE))
+ isExecuteAsUser = jobProps.getBoolean(EXECUTE_AS_USER_OVERRIDE, false);
+
+ return isExecuteAsUser;
+ }
+
+ /**
+ * <pre>
+ * This method extracts the kerberos ticket cache file name from the jobprops.
+ * This method will ensure that each job execution will have its own kerberos ticket cache file
+ * Given that the code only sets an environmental variable, the number of files created corresponds
+ * to the number of processes that are doing kinit in their flow, which should not be an inordinately
+ * high number.
+ * </pre>
+ *
+ * @return file name: the kerberos ticket cache file to use
+ */
+ private String getKrb5ccname(Props jobProps) {
+ String effectiveUser = getEffectiveUser(jobProps);
+ String projectName =
+ jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+ String flowId =
+ jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+ String jobId =
+ jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+ // execId should be an int and should not have space in it, ever
+ String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+ String krb5ccname =
+ String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+ jobId, execId, effectiveUser);
+
+ return krb5ccname;
+ }
+
+ /**
+ * <pre>
+ * Determines what user id should the process job run as, in the following order of precedence:
+ * 1. USER_TO_PROXY
+ * 2. SUBMIT_USER
+ * </pre>
+ *
+ * @param jobProps
+ * @return the user that Azkaban is going to execute as
+ */
+ private String getEffectiveUser(Props jobProps) {
+ String effectiveUser = null;
+ if (jobProps.containsKey(USER_TO_PROXY)) {
+ effectiveUser = jobProps.getString(USER_TO_PROXY);
+ } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
+ effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
+ } else {
+ throw new RuntimeException(
+ "Internal Error: No user.to.proxy or submit.user in the jobProps");
+ }
+ info("effective user is: " + effectiveUser);
+ return effectiveUser;
+ }
+
/**
* This is used to get the min/max memory size requirement by processes.
- * SystemMemoryInfo can use the info to determine if the memory request
- * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+ * SystemMemoryInfo can use the info to determine if the memory request can be
+ * fulfilled. For Java process, this should be Xms/Xmx setting.
*
* @return pair of min/max memory size
*/
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 9c3f092..9ad88d3 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -40,6 +40,9 @@ import com.google.common.base.Joiner;
* loggers.
*/
public class AzkabanProcess {
+
+ public static String KILL_COMMAND = "kill";
+
private final String workingDir;
private final List<String> cmd;
private final Map<String, String> env;
@@ -50,6 +53,10 @@ public class AzkabanProcess {
private volatile int processId;
private volatile Process process;
+ private boolean isExecuteAsUser = false;
+ private String executeAsUserBinary = null;
+ private String effectiveUser = null;
+
public AzkabanProcess(final List<String> cmd, final Map<String, String> env,
final String workingDir, final Logger logger) {
this.cmd = cmd;
@@ -61,6 +68,15 @@ public class AzkabanProcess {
this.logger = logger;
}
+ public AzkabanProcess(List<String> cmd, Map<String, String> env,
+ String workingDir, Logger logger, String executeAsUserBinary,
+ String effectiveUser) {
+ this(cmd, env, workingDir, logger);
+ this.isExecuteAsUser = true;
+ this.executeAsUserBinary = executeAsUserBinary;
+ this.effectiveUser = effectiveUser;
+ }
+
/**
* Execute this process, blocking until it has completed.
*/
@@ -101,13 +117,20 @@ public class AzkabanProcess {
}
completeLatch.countDown();
- if (exitCode != 0) {
- throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
- }
// try to wait for everything to get logged out before exiting
outputGobbler.awaitCompletion(5000);
errorGobbler.awaitCompletion(5000);
+
+ if (exitCode != 0) {
+ String output =
+ new StringBuilder().append("Stdout:\n")
+ .append(outputGobbler.getRecentLog()).append("\n\n")
+ .append("Stderr:\n").append(errorGobbler.getRecentLog())
+ .append("\n").toString();
+ throw new ProcessFailureException(exitCode, output);
+ }
+
} finally {
IOUtils.closeQuietly(process.getInputStream());
IOUtils.closeQuietly(process.getOutputStream());
@@ -155,7 +178,15 @@ public class AzkabanProcess {
checkStarted();
if (processId != 0 && isStarted()) {
try {
- Runtime.getRuntime().exec("kill " + processId);
+ if (isExecuteAsUser) {
+ String cmd =
+ String.format("%s %s %s %d", executeAsUserBinary,
+ effectiveUser, KILL_COMMAND, processId);
+ Runtime.getRuntime().exec(cmd);
+ } else {
+ String cmd = String.format("%s %d", KILL_COMMAND, processId);
+ Runtime.getRuntime().exec(cmd);
+ }
return completeLatch.await(time, unit);
} catch (IOException e) {
logger.error("Kill attempt failed.", e);
@@ -173,7 +204,15 @@ public class AzkabanProcess {
if (isRunning()) {
if (processId != 0) {
try {
- Runtime.getRuntime().exec("kill -9 " + processId);
+ if (isExecuteAsUser) {
+ String cmd =
+ String.format("%s %s %s -9 %d", executeAsUserBinary,
+ effectiveUser, KILL_COMMAND, processId);
+ Runtime.getRuntime().exec(cmd);
+ } else {
+ String cmd = String.format("%s -9 %d", KILL_COMMAND, processId);
+ Runtime.getRuntime().exec(cmd);
+ }
} catch (IOException e) {
logger.error("Kill attempt failed.", e);
}
@@ -234,4 +273,12 @@ public class AzkabanProcess {
return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
+ ", cwd = " + workingDir + ")";
}
+
+ public boolean isExecuteAsUser() {
+ return isExecuteAsUser;
+ }
+
+ public String getEffectiveUser() {
+ return effectiveUser;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
index 8832195..9e2c2f7 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
@@ -35,6 +35,9 @@ public class AzkabanProcessBuilder {
private Map<String, String> env = new HashMap<String, String>();
private String workingDir = System.getProperty("user.dir");
private Logger logger = Logger.getLogger(AzkabanProcess.class);
+ private boolean isExecuteAsUser = false;
+ private String executeAsUserBinaryPath = null;
+ private String effectiveUser = null;
private int stdErrSnippetSize = 30;
private int stdOutSnippetSize = 30;
@@ -100,7 +103,12 @@ public class AzkabanProcessBuilder {
}
public AzkabanProcess build() {
- return new AzkabanProcess(cmd, env, workingDir, logger);
+ if (isExecuteAsUser) {
+ return new AzkabanProcess(cmd, env, workingDir, logger,
+ executeAsUserBinaryPath, effectiveUser);
+ } else {
+ return new AzkabanProcess(cmd, env, workingDir, logger);
+ }
}
public List<String> getCommand() {
@@ -116,4 +124,19 @@ public class AzkabanProcessBuilder {
return "ProcessBuilder(cmd = " + Joiner.on(" ").join(cmd) + ", env = "
+ env + ", cwd = " + workingDir + ")";
}
+
+ public AzkabanProcessBuilder enableExecuteAsUser() {
+ this.isExecuteAsUser = true;
+ return this;
+ }
+
+ public AzkabanProcessBuilder setExecuteAsUserBinaryPath(String executeAsUserBinaryPath) {
+ this.executeAsUserBinaryPath = executeAsUserBinaryPath;
+ return this;
+ }
+
+ public AzkabanProcessBuilder setEffectiveUser(String effectiveUser) {
+ this.effectiveUser = effectiveUser;
+ return this;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
index 212f1a9..f1cf351 100644
--- a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -340,6 +340,7 @@ public class JobTypeManager {
jobProps, jobType));
}
+ // TODO: should the logic below mirror the logic for PluginLoadProps?
Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
if (pluginJobProps != null) {
for (String k : pluginJobProps.getKeySet()) {
@@ -354,7 +355,11 @@ public class JobTypeManager {
if (pluginLoadProps != null) {
pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
} else {
- pluginLoadProps = new Props();
+ // pluginSet.getCommonPluginLoadProps() will return null if there is no plugins directory.
+ // hence assigning default Props() if that's the case
+ pluginLoadProps = pluginSet.getCommonPluginLoadProps();
+ if(pluginJobProps == null)
+ pluginJobProps = new Props();
}
job =
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 12b72dd..d726216 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -845,7 +845,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
byte[] stringData = json.getBytes("UTF-8");
byte[] data = stringData;
- logger.info("UTF-8 size:" + data.length);
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
@@ -888,7 +887,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
byte[] stringData = json.getBytes("UTF-8");
byte[] data = stringData;
- logger.info("UTF-8 size:" + data.length);
if (encType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
@@ -1009,7 +1007,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
String propertyJSON = PropsUtils.toJSONString(props, true);
byte[] data = propertyJSON.getBytes("UTF-8");
- logger.info("UTF-8 size:" + data.length);
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(data);
}
@@ -1032,7 +1029,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
String propertyJSON = PropsUtils.toJSONString(props, true);
byte[] data = propertyJSON.getBytes("UTF-8");
- logger.info("UTF-8 size:" + data.length);
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(data);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 0f09b9c..52024f1 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import azkaban.flow.Flow;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.project.validator.ValidationReport;
@@ -42,7 +43,6 @@ import azkaban.project.validator.XmlValidatorManager;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
-import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
import azkaban.utils.Utils;
@@ -269,7 +269,7 @@ public class ProjectManager {
"Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
}
- if (projectsByName.contains(projectName)) {
+ if (projectsByName.containsKey(projectName)) {
throw new ProjectManagerException("Project already exists.");
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index a6b1a39..039ab3a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -22,14 +22,14 @@ import azkaban.utils.Props;
/**
* @author wkang
- *
+ *
* This class manages project whitelist defined in xml config file.
* An single xml config file contains different types of whitelisted
* projects. For additional type of whitelist, modify WhitelistType enum.
- *
+ *
* The xml config file should in the following format. Please note
* the tag <MemoryCheck> is same as the defined enum MemoryCheck
- *
+ *
* <ProjectWhitelist>
* <MemoryCheck>
* <project projectname="project1" />
@@ -84,7 +84,7 @@ public class ProjectWhitelist {
Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
NodeList tagList = doc.getChildNodes();
if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
- throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
+ throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
}
NodeList whitelist = tagList.item(0).getChildNodes();
@@ -114,7 +114,7 @@ public class ProjectWhitelist {
NamedNodeMap projectAttrMap = node.getAttributes();
Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
if (projectIdAttr == null) {
- throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ "' attribute doesn't exist");
}
@@ -127,7 +127,7 @@ public class ProjectWhitelist {
if (projsWhitelisted != null) {
Set<Integer> projs = projsWhitelisted.get(whitelistType);
if (projs != null) {
- return projs.contains(project);
+ return projs.contains(project);
}
}
return false;
@@ -138,6 +138,7 @@ public class ProjectWhitelist {
* the defined enums.
*/
public static enum WhitelistType {
- MemoryCheck
+ MemoryCheck,
+ NumJobPerFlow
}
}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index 0f240da..cc6d005 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -24,7 +24,7 @@ import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import azkaban.project.Project;
-import azkaban.utils.DirectoryFlowLoader;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.utils.Props;
/**
diff --git a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
index fc4d64f..20f4496 100644
--- a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
+++ b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
@@ -16,10 +16,12 @@
package azkaban.server.session;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Cache;
+
+import java.util.concurrent.TimeUnit;
+
import azkaban.utils.Props;
-import azkaban.utils.cache.Cache;
-import azkaban.utils.cache.CacheManager;
-import azkaban.utils.cache.Cache.EjectionPolicy;
/**
* Cache for web session.
@@ -34,7 +36,7 @@ public class SessionCache {
private static final long SESSION_TIME_TO_LIVE = 24 * 60 * 60 * 1000L;
// private CacheManager manager = CacheManager.create();
- private Cache cache;
+ private Cache<String, Session> cache;
/**
* Constructor taking global props.
@@ -42,13 +44,12 @@ public class SessionCache {
* @param props
*/
public SessionCache(Props props) {
- CacheManager manager = CacheManager.getInstance();
-
- cache = manager.createCache();
- cache.setEjectionPolicy(EjectionPolicy.LRU);
- cache.setMaxCacheSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
- cache.setExpiryTimeToLiveMs(props.getLong("session.time.to.live",
- SESSION_TIME_TO_LIVE));
+ cache = CacheBuilder.newBuilder()
+ .maximumSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS))
+ .expireAfterAccess(
+ props.getLong("session.time.to.live", SESSION_TIME_TO_LIVE),
+ TimeUnit.MILLISECONDS)
+ .build();
}
/**
@@ -58,8 +59,7 @@ public class SessionCache {
* @return
*/
public Session getSession(String sessionId) {
- Session elem = cache.<Session> get(sessionId);
-
+ Session elem = cache.getIfPresent(sessionId);
return elem;
}
@@ -79,7 +79,7 @@ public class SessionCache {
* @param id
* @return
*/
- public boolean removeSession(String id) {
- return cache.remove(id);
+ public void removeSession(String id) {
+ cache.invalidate(id);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
index 1141176..27b9ba4 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/SlaChecker.java
@@ -20,7 +20,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
@@ -57,7 +56,6 @@ public class SlaChecker implements ConditionChecker {
private Boolean isSlaMissed(ExecutableFlow flow) {
String type = slaOption.getType();
- logger.info("flow is " + flow.getStatus());
if (flow.getStartTime() < 0) {
return Boolean.FALSE;
}
@@ -136,7 +134,6 @@ public class SlaChecker implements ConditionChecker {
private Boolean isSlaGood(ExecutableFlow flow) {
String type = slaOption.getType();
- logger.info("flow is " + flow.getStatus());
if (flow.getStartTime() < 0) {
return Boolean.FALSE;
}
@@ -218,13 +215,11 @@ public class SlaChecker implements ConditionChecker {
}
public Object isSlaFailed() {
- logger.info("Testing if sla failed for execution " + execId);
ExecutableFlow flow;
try {
flow = executorManager.getExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
- e.printStackTrace();
// something wrong, send out alerts
return Boolean.TRUE;
}
@@ -232,13 +227,11 @@ public class SlaChecker implements ConditionChecker {
}
public Object isSlaPassed() {
- logger.info("Testing if sla is good for execution " + execId);
ExecutableFlow flow;
try {
flow = executorManager.getExecutableFlow(execId);
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
- e.printStackTrace();
// something wrong, send out alerts
return Boolean.TRUE;
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 7bb275b..5e8e7b6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -25,7 +25,6 @@ import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
public class Condition {
@@ -119,7 +118,9 @@ public class Condition {
}
public boolean isMet() {
- logger.info("Testing condition " + expression);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Testing condition " + expression);
+ }
return expression.evaluate(context).equals(Boolean.TRUE);
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
index 68c0508..007d872 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -178,7 +178,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
runner.query(connection, LastInsertID.LAST_INSERT_ID,
new LastInsertID());
- if (id == -1l) {
+ if (id == -1L) {
logger.error("trigger id is not properly created.");
throw new TriggerLoaderException("trigger id is not properly created.");
}
@@ -194,7 +194,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
@Override
public void updateTrigger(Trigger t) throws TriggerLoaderException {
- logger.info("Updating trigger " + t.getTriggerId() + " into db.");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Updating trigger " + t.getTriggerId() + " into db.");
+ }
t.setLastModifyTime(System.currentTimeMillis());
Connection connection = getConnection();
try {
@@ -238,7 +240,9 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
if (updates == 0) {
throw new TriggerLoaderException("No trigger has been updated.");
} else {
- logger.info("Updated " + updates + " records.");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Updated " + updates + " records.");
+ }
}
} catch (SQLException e) {
logger.error(UPDATE_TRIGGER + " failed.");
@@ -253,7 +257,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements
@Override
public Long handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
- return -1l;
+ return -1L;
}
long id = rs.getLong(1);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index fa8d130..97e858c 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -281,7 +281,9 @@ public class TriggerManager extends EventHandler implements
logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
}
- logger.info("Checking trigger " + t.getTriggerId());
+ if (logger.isDebugEnabled()) {
+ logger.info("Checking trigger " + t.getTriggerId());
+ }
if (t.getStatus().equals(TriggerStatus.READY)) {
if (t.triggerConditionMet()) {
onTriggerTrigger(t);
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 596e8b3..3412425 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -292,6 +292,7 @@ public class PropsUtils {
props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
props.put(CommonJobProperties.PROJECT_LAST_CHANGED_BY, flow.getLastModifiedByUser());
props.put(CommonJobProperties.PROJECT_LAST_CHANGED_DATE, flow.getLastModifiedTimestamp());
+ props.put(CommonJobProperties.SUBMIT_USER, flow.getExecutableFlow().getSubmitUser());
DateTime loadTime = new DateTime();
diff --git a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
index 17792a0..924afd2 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
import java.util.Collection;
import java.util.List;
+import java.util.regex.Pattern;
public class StringUtils {
public static final char SINGLE_QUOTE = '\'';
@@ -88,4 +89,19 @@ public class StringUtils {
return buffer.toString();
}
+
+ private static final Pattern BROWSWER_PATTERN = Pattern
+ .compile(".*Gecko.*|.*AppleWebKit.*|.*Trident.*|.*Chrome.*");
+
+ public static boolean isFromBrowser(String userAgent) {
+ if (userAgent == null) {
+ return false;
+ }
+
+ if (BROWSWER_PATTERN.matcher(userAgent).matches()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
index 7118e8c..62208f3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
@@ -71,7 +71,7 @@ public class TypedMapWrapper<K, V> {
}
public Long getLong(K key) {
- return getLong(key, -1l);
+ return getLong(key, -1L);
}
public Long getLong(K key, Long defaultVal) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 91a4290..c6400f8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -28,13 +28,13 @@ import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.flow.Flow;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
-import azkaban.utils.DirectoryFlowLoader;
+import azkaban.test.executions.TestExecutions;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
@@ -47,7 +47,8 @@ public class ExecutableFlowTest {
Logger logger = Logger.getLogger(this.getClass());
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(project, new File("unit/executions/embedded"));
+
+ loader.loadProjectFlow(project, TestExecutions.getFlowDir("embedded"));
Assert.assertEquals(0, loader.getErrors().size());
project.setFlows(loader.getFlowMap());
@@ -58,7 +59,7 @@ public class ExecutableFlowTest {
public void tearDown() throws Exception {
}
- @Ignore @Test
+ @Test
public void testExecutorFlowCreation() throws Exception {
Flow flow = project.getFlow("jobe");
Assert.assertNotNull(flow);
@@ -96,7 +97,7 @@ public class ExecutableFlowTest {
Assert.assertEquals(4, jobdFlow.getExecutableNodes().size());
}
- @Ignore @Test
+ @Test
public void testExecutorFlowJson() throws Exception {
Flow flow = project.getFlow("jobe");
Assert.assertNotNull(flow);
@@ -114,7 +115,7 @@ public class ExecutableFlowTest {
testEquals(exFlow, parsedExFlow);
}
- @Ignore @Test
+ @Test
public void testExecutorFlowJson2() throws Exception {
Flow flow = project.getFlow("jobe");
Assert.assertNotNull(flow);
@@ -154,7 +155,7 @@ public class ExecutableFlowTest {
}
@SuppressWarnings("rawtypes")
- @Ignore @Test
+ @Test
public void testExecutorFlowUpdates() throws Exception {
Flow flow = project.getFlow("jobe");
ExecutableFlow exFlow = new ExecutableFlow(project, flow);
@@ -250,7 +251,7 @@ public class ExecutableFlowTest {
}
}
- public static void testEquals(ExecutableNode a, ExecutableNode b) {
+ private static void testEquals(ExecutableNode a, ExecutableNode b) {
if (a instanceof ExecutableFlow) {
if (b instanceof ExecutableFlow) {
ExecutableFlow exA = (ExecutableFlow) a;
@@ -304,7 +305,7 @@ public class ExecutableFlowTest {
Assert.assertEquals(a.getOutNodes(), a.getOutNodes());
}
- public static void testEquals(ExecutionOptions optionsA,
+ private static void testEquals(ExecutionOptions optionsA,
ExecutionOptions optionsB) {
Assert.assertEquals(optionsA.getConcurrentOption(),
optionsB.getConcurrentOption());
@@ -329,7 +330,7 @@ public class ExecutableFlowTest {
testEquals(optionsA.getFlowParameters(), optionsB.getFlowParameters());
}
- public static void testEquals(Set<String> a, Set<String> b) {
+ private static void testEquals(Set<String> a, Set<String> b) {
if (a == b) {
return;
}
@@ -348,7 +349,7 @@ public class ExecutableFlowTest {
}
}
- public static void testEquals(List<String> a, List<String> b) {
+ private static void testEquals(List<String> a, List<String> b) {
if (a == b) {
return;
}
@@ -370,7 +371,7 @@ public class ExecutableFlowTest {
}
@SuppressWarnings("unchecked")
- public static void testDisabledEquals(List<Object> a, List<Object> b) {
+ private static void testDisabledEquals(List<Object> a, List<Object> b) {
if (a == b) {
return;
}
@@ -401,7 +402,7 @@ public class ExecutableFlowTest {
}
}
- public static void testEquals(Map<String, String> a, Map<String, String> b) {
+ private static void testEquals(Map<String, String> a, Map<String, String> b) {
if (a == b) {
return;
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java b/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java
index 67d0284..cf46637 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JavaJobRunnerMain.java
@@ -196,7 +196,7 @@ public class JavaJobRunnerMain {
}
writer.write("}".getBytes());
} catch (Exception e) {
- new RuntimeException("Unable to store output properties to: "
+ throw new RuntimeException("Unable to store output properties to: "
+ outputFileStr);
} finally {
try {
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index a7e2b5f..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -335,6 +335,43 @@ public class JdbcExecutorLoaderTest {
}
+ /* Test exception when unassigning an missing execution */
+ @Test
+ public void testUnassignExecutorException() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ loader.unassignExecutor(2);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test happy case when unassigning executor for a flow execution */
+ @Test
+ public void testUnassignExecutor() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ Assert.assertEquals(
+ loader.fetchExecutorByExecutionId(flow.getExecutionId()), executor);
+ loader.unassignExecutor(flow.getExecutionId());
+ Assert.assertEquals(
+ loader.fetchExecutorByExecutionId(flow.getExecutionId()), null);
+ }
+
/* Test exception when assigning a non-existent executor to a flow */
@Test
public void testAssignExecutorInvalidExecutor()
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 833e0c6..0db2de5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -368,4 +368,9 @@ public class MockExecutorLoader implements ExecutorLoader {
}
return queuedFlows;
}
+
+ @Override
+ public void unassignExecutor(int executionId) throws ExecutorManagerException {
+ executionExecutorMapping.remove(executionId);
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
index 94bbd7e..b938b56 100644
--- a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -13,14 +13,12 @@ import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
+import azkaban.utils.TestUtils;
public class QueuedExecutionsTest {
- /* Directory with serialized description of test flows */
- private static final String UNIT_BASE_DIR =
- "../azkaban-test/src/test/resources/executions/exectest1/";
private File getFlowDir(String flow) {
- return new File(UNIT_BASE_DIR + flow + ".flow");
+ return TestUtils.getFlowDir("exectest1", flow);
}
/*
@@ -205,4 +203,4 @@ public class QueuedExecutionsTest {
queue.getReference(pair.getFirst().getExecId()));
}
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
index 05a0a28..068935f 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
@@ -22,7 +22,6 @@ import java.util.Date;
import java.util.Properties;
import org.apache.log4j.Logger;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -33,6 +32,7 @@ import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
public class JavaProcessJobTest {
@@ -108,6 +108,14 @@ public class JavaProcessJobTest {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
props.put("type", "java");
props.put("fullPath", ".");
+
+ props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+ props.put(CommonJobProperties.FLOW_ID, "test_flow");
+ props.put(CommonJobProperties.JOB_ID, "test_job");
+ props.put(CommonJobProperties.EXEC_ID, "123");
+ props.put(CommonJobProperties.SUBMIT_USER, "test_user");
+
+
job = new JavaProcessJob("testJavaProcess", props, props, log);
}
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 974895b..57f4505 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -20,7 +20,6 @@ import java.io.File;
import java.io.IOException;
import org.apache.log4j.Logger;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -28,6 +27,7 @@ import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
public class ProcessJobTest {
@@ -46,6 +46,12 @@ public class ProcessJobTest {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
props.put("type", "command");
props.put("fullPath", ".");
+
+ props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+ props.put(CommonJobProperties.FLOW_ID, "test_flow");
+ props.put(CommonJobProperties.JOB_ID, "test_job");
+ props.put(CommonJobProperties.EXEC_ID, "123");
+ props.put(CommonJobProperties.SUBMIT_USER, "test_user");
job = new ProcessJob("TestProcess", props, props, log);
}
@@ -62,6 +68,37 @@ public class ProcessJobTest {
job.run();
}
+
+ /**
+ * this job should run fine if the props contain user.to.proxy
+ * @throws Exception
+ */
+ @Test
+ public void testOneUnixCommandWithProxyUserInsteadOfSubmitUser() throws Exception {
+
+ // Initialize the Props
+ props.removeLocal(CommonJobProperties.SUBMIT_USER);
+ props.put("user.to.proxy", "test_user");
+ props.put(ProcessJob.COMMAND, "ls -al");
+
+ job.run();
+
+ }
+
+ /**
+ * this job should fail because there is no user.to.proxy and no CommonJobProperties.SUBMIT_USER
+ * @throws Exception
+ */
+ @Test (expected=RuntimeException.class)
+ public void testOneUnixCommandWithNoUser() throws Exception {
+
+ // Initialize the Props
+ props.removeLocal(CommonJobProperties.SUBMIT_USER);
+ props.put(ProcessJob.COMMAND, "ls -al");
+
+ job.run();
+
+ }
@Test
public void testFailedUnixCommand() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java
index ee767de..7320c15 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/PythonJobTest.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import azkaban.utils.Props;
@@ -82,6 +83,7 @@ public class PythonJobTest {
Utils.removeFile(scriptFile);
}
+ @Ignore("Test appears to hang.")
@Test
public void testPythonJob() {
diff --git a/azkaban-common/src/test/java/azkaban/project/ProjectTest.java b/azkaban-common/src/test/java/azkaban/project/ProjectTest.java
index da36d46..63c19f0 100644
--- a/azkaban-common/src/test/java/azkaban/project/ProjectTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/ProjectTest.java
@@ -28,8 +28,8 @@ public class ProjectTest {
@Test
public void testToAndFromObject() throws Exception {
Project project = new Project(1, "tesTing");
- project.setCreateTimestamp(1l);
- project.setLastModifiedTimestamp(2l);
+ project.setCreateTimestamp(1L);
+ project.setLastModifiedTimestamp(2L);
project.setDescription("I am a test");
project.setUserPermission("user1", new Permission(new Type[] { Type.ADMIN,
Type.EXECUTE }));
diff --git a/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
new file mode 100644
index 0000000..71c3d21
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/StringUtilsTest.java
@@ -0,0 +1,80 @@
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StringUtilsTest {
+
+ private static final String chromeOnMac =
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36";
+ private static final String fireFoxOnMac =
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:40.0) Gecko/20100101 Firefox/40.0";
+ private static final String safariOnMac =
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2";
+ private static final String chromeOnLinux =
+ "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36";
+ private static final String fireFoxOnLinux =
+ "Mozilla/5.0 (X11; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0";
+
+ private static final String[] browserVariants = { chromeOnMac, fireFoxOnMac,
+ safariOnMac, chromeOnLinux, fireFoxOnLinux };
+
+ private static final String[] BROWSER_NAMES = { "AppleWebKit", "Gecko",
+ "Chrome" };
+
+ @Test
+ public void isBrowser() throws Exception {
+
+ for (String browser : browserVariants) {
+ Assert.assertTrue(browser, StringUtils.isFromBrowser(browser));
+ }
+ }
+
+ @Test
+ public void notBrowserWithLowercase() throws Exception {
+
+ for (String browser : browserVariants) {
+ Assert.assertFalse(browser.toLowerCase(),
+ StringUtils.isFromBrowser(browser.toLowerCase()));
+ }
+ }
+
+ @Test
+ public void notBrowser() throws Exception {
+ String testStr = "curl";
+ Assert.assertFalse(testStr, StringUtils.isFromBrowser(testStr));
+ }
+
+ @Test
+ public void emptyBrowserString() throws Exception {
+
+ Assert.assertFalse("empty string", StringUtils.isFromBrowser(""));
+ }
+
+ @Test
+ public void nullBrowserString() throws Exception {
+
+ Assert.assertFalse("null string", StringUtils.isFromBrowser(null));
+ }
+
+ @Test
+ public void startsWithBrowserName() {
+ for (String name : BROWSER_NAMES) {
+ Assert.assertTrue(StringUtils.isFromBrowser(name + " is awesome"));
+ }
+ }
+
+ @Test
+ public void endsWithBrowserName() {
+ for (String name : BROWSER_NAMES) {
+ Assert.assertTrue(StringUtils.isFromBrowser("awesome is" + name));
+ }
+ }
+
+ @Test
+ public void containsBrowserName() {
+ for (String name : BROWSER_NAMES) {
+ Assert.assertTrue(StringUtils.isFromBrowser("awesome " + name + " is"));
+ }
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index 68b10ee..3eff490 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -33,7 +33,7 @@ import azkaban.user.XmlUserManager;
public class TestUtils {
/* Base resource direcotyr for unit tests */
private static final String UNIT_RESOURCE_DIR =
- "../azkaban-test/src/test/resources";
+ "../azkaban-test/src/test/resources/azkaban/test";
/* Directory with serialized description of test flows */
private static final String UNIT_EXECUTION_DIR =
UNIT_RESOURCE_DIR + "/executions";
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 8f5df1a..47fbfad 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -132,7 +132,7 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
- root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverstastics");
+ root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 361db00..01ab373 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -52,6 +52,8 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectWhitelist;
+import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
@@ -481,7 +483,9 @@ public class FlowRunnerManager implements EventListener,
int numJobs =
Integer.valueOf(options.getFlowParameters().get(
FLOW_NUM_JOB_THREADS));
- if (numJobs > 0 && numJobs <= numJobThreads) {
+ if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
+ .isProjectWhitelisted(flow.getProjectId(),
+ WhitelistType.NumJobPerFlow))) {
numJobThreads = numJobs;
}
} catch (Exception e) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
index d5cb550..17ba6e9 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.List;
-
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -33,11 +31,17 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutorInfo;
import azkaban.utils.JSONUtils;
-public class ServerStatisticsServlet extends HttpServlet {
+
+public class ServerStatisticsServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- private static final int cacheTimeInMilliseconds = 1000;
+ private static final int cacheTimeInMilliseconds = 1000;
private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
private static final String noCacheParamName = "nocache";
+ private static final boolean exists_Bash = new File("/bin/bash").exists();
+ private static final boolean exists_Cat = new File("/bin/cat").exists();
+ private static final boolean exists_Grep = new File("/bin/grep").exists();
+ private static final boolean exists_Meminfo = new File("/proc/meminfo").exists();
+ private static final boolean exists_LoadAvg = new File("/proc/loadavg").exists();
protected static long lastRefreshedTime = 0;
protected static ExecutorInfo cachedstats = null;
@@ -48,12 +52,11 @@ public class ServerStatisticsServlet extends HttpServlet {
* @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
* javax.servlet.http.HttpServletResponse)
*/
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- boolean noCache = null!= req && Boolean.valueOf(req.getParameter(noCacheParamName));
+ boolean noCache = null != req && Boolean.valueOf(req.getParameter(noCacheParamName));
- if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
+ if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
this.populateStatistics(noCache);
}
@@ -69,82 +72,116 @@ public class ServerStatisticsServlet extends HttpServlet {
* a double value will be used to present the remaining memory,
* a returning value of '55.6' means 55.6%
*/
- protected void fillRemainingMemoryPercent(ExecutorInfo stats){
- if (new File("/bin/bash").exists()
- && new File("/bin/cat").exists()
- && new File("/bin/grep").exists()
- && new File("/proc/meminfo").exists()) {
- java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/meminfo | grep -E \"MemTotal|MemFree\"");
- try {
- ArrayList<String> output = new ArrayList<String>();
- Process process = processBuilder.start();
- process.waitFor();
- InputStream inputStream = process.getInputStream();
+ protected void fillRemainingMemoryPercent(ExecutorInfo stats) {
+ if (exists_Bash && exists_Cat && exists_Grep && exists_Meminfo) {
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c",
+ "/bin/cat /proc/meminfo | grep -E \"^MemTotal:|^MemFree:|^Buffers:|^Cached:|^SwapCached:\"");
try {
- java.io.BufferedReader reader =
- new java.io.BufferedReader(new InputStreamReader(inputStream));
- String line = null;
- while ((line = reader.readLine()) != null) {
- output.add(line);
+ ArrayList<String> output = new ArrayList<String>();
+ Process process = processBuilder.start();
+ process.waitFor();
+ InputStream inputStream = process.getInputStream();
+ try {
+ java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
+ }
+ } finally {
+ inputStream.close();
}
- }finally {
- inputStream.close();
- }
- long totalMemory = 0;
- long freeMemory = 0;
-
- // process the output from bash call.
- // we expect the result from the bash call to be something like following -
- // MemTotal: 65894264 kB
- // MemFree: 61104076 kB
- if (output.size() == 2) {
- for (String result : output){
- // find the total memory and value the variable.
- if (result.contains("MemTotal") && result.split("\\s+").length > 2){
- try {
- totalMemory = Long.parseLong(result.split("\\s+")[1]);
- logger.info("Total memory : " + totalMemory);
- }catch(NumberFormatException e){
- logger.error("yielding 0 for total memory as output is invalid -" + result);
+ long totalMemory = 0;
+ long totalFreeMemory = 0;
+ Long parsedResult = (long) 0;
+
+ // process the output from bash call.
+ // we expect the result from the bash call to be something like following -
+ // MemTotal: 65894264 kB
+ // MemFree: 57753844 kB
+ // Buffers: 305552 kB
+ // Cached: 3802432 kB
+ // SwapCached: 0 kB
+ // Note : total free memory = freeMemory + cached + buffers + swapCached
+ // TODO : think about merging the logic in systemMemoryInfo as the logic is similar
+ if (output.size() == 5) {
+ for (String result : output) {
+ // find the total memory and value the variable.
+ parsedResult = extractMemoryInfo("MemTotal", result);
+ if (null != parsedResult) {
+ totalMemory = parsedResult;
+ continue;
}
- }
- // find the free memory and value the variable.
- if (result.contains("MemFree") && result.split("\\s+").length > 2){
- try {
- freeMemory = Long.parseLong(result.split("\\s+")[1]);
- logger.info("Free memory : " + freeMemory);
- }catch(NumberFormatException e){
- logger.error("yielding 0 for total memory as output is invalid -" + result);
+
+ // find the free memory.
+ parsedResult = extractMemoryInfo("MemFree", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+
+ // find the Buffers.
+ parsedResult = extractMemoryInfo("Buffers", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+
+ // find the Cached.
+ parsedResult = extractMemoryInfo("SwapCached", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+
+ // find the Cached.
+ parsedResult = extractMemoryInfo("Cached", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
}
}
+ } else {
+ logger.error("failed to get total/free memory info as the bash call returned invalid result."
+ + String.format(" Output from the bash call - %s ", output.toString()));
}
- }else {
- logger.error("failed to get total/free memory info as the bash call returned invalid result.");
- }
- // the number got from the proc file is in KBs we want to see the number in MBs so we are deviding it by 1024.
- stats.setRemainingMemoryInMB(freeMemory/1024);
- stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double)freeMemory / (double)totalMemory)*100);
- }
- catch (Exception ex){
- logger.error("failed fetch system memory info " +
- "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ // the number got from the proc file is in KBs we want to see the number in MBs so we are dividing it by 1024.
+ stats.setRemainingMemoryInMB(totalFreeMemory / 1024);
+ stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double) totalFreeMemory / (double) totalMemory) * 100);
+ } catch (Exception ex) {
+ logger.error("failed fetch system memory info "
+ + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ }
+ } else {
+ logger.error("failed fetch system memory info, one or more files from the following list are missing - "
+ + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
}
- } else {
- logger.error("failed fetch system memory info, one or more files from the following list are missing - " +
- "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
}
+
+ private Long extractMemoryInfo(String field, String result) {
+ Long returnResult = null;
+ if (null != result && null != field && result.matches(String.format("^%s:.*", field))
+ && result.split("\\s+").length > 2) {
+ try {
+ returnResult = Long.parseLong(result.split("\\s+")[1]);
+ logger.debug(field + ":" + returnResult);
+ } catch (NumberFormatException e) {
+ returnResult = 0L;
+ logger.error(String.format("yielding 0 for %s as output is invalid - %s", field, result));
+ }
+ }
+ return returnResult;
}
/**
* call the data providers to fill the returning data container for statistics data.
* This function refreshes the static cached copy of data in case if necessary.
* */
- protected synchronized void populateStatistics(boolean noCache){
+ protected synchronized void populateStatistics(boolean noCache) {
//check again before starting the work.
- if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
+ if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
final ExecutorInfo stats = new ExecutorInfo();
fillRemainingMemoryPercent(stats);
@@ -152,7 +189,7 @@ public class ServerStatisticsServlet extends HttpServlet {
fillCpuUsage(stats);
cachedstats = stats;
- lastRefreshedTime = System.currentTimeMillis();
+ lastRefreshedTime = System.currentTimeMillis();
}
}
@@ -161,33 +198,32 @@ public class ServerStatisticsServlet extends HttpServlet {
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "remainingFlowCapacity".
*/
- protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats){
+ protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats) {
AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
- if (server != null){
- FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
+ if (server != null) {
+ FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
stats.setNumberOfAssignedFlows(assignedFlows);
stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
- }else {
- logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
- " as the AzkabanExecutorServer has yet been initialized.");
+ } else {
+ logger.error("failed to get data for remaining flow capacity or LastDispatchedTime"
+ + " as the AzkabanExecutorServer has yet been initialized.");
}
}
-
/**<pre>
- * fill the result set with the Remaining temp Storage .
- * Note : As the Top bash call doesn't yield accurate result for the system load,
+ * fill the result set with the CPU usage .
+ * Note : As the 'Top' bash call doesn't yield accurate result for the system load,
* the implementation has been changed to load from the "proc/loadavg" which keeps
* the moving average of the system load, we are pulling the average for the recent 1 min.
*</pre>
* @param stats reference to the result container which contains all the results, this specific method
- * will only work on the property "cpuUdage".
+ * will only work on the property "cpuUsage".
*/
- protected void fillCpuUsage(ExecutorInfo stats){
- if (new File("/bin/bash").exists() && new File("/bin/cat").exists() && new File("/proc/loadavg").exists()) {
+ protected void fillCpuUsage(ExecutorInfo stats) {
+ if (exists_Bash && exists_Cat && exists_LoadAvg) {
java.lang.ProcessBuilder processBuilder =
new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/loadavg");
try {
@@ -196,13 +232,12 @@ public class ServerStatisticsServlet extends HttpServlet {
process.waitFor();
InputStream inputStream = process.getInputStream();
try {
- java.io.BufferedReader reader =
- new java.io.BufferedReader(new InputStreamReader(inputStream));
+ java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
String line = null;
while ((line = reader.readLine()) != null) {
output.add(line);
}
- }finally {
+ } finally {
inputStream.close();
}
@@ -213,20 +248,19 @@ public class ServerStatisticsServlet extends HttpServlet {
try {
cpuUsage = Double.parseDouble(splitedresult[0]);
- }catch(NumberFormatException e){
+ } catch (NumberFormatException e) {
logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
}
logger.info("System load : " + cpuUsage);
stats.setCpuUpsage(cpuUsage);
}
- }
- catch (Exception ex){
- logger.error("failed fetch system load info " +
- "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ } catch (Exception ex) {
+ logger.error("failed fetch system load info "
+ + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
}
} else {
- logger.error("failed fetch system load info, one or more files from the following list are missing - " +
- "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+ logger.error("failed fetch system load info, one or more files from the following list are missing - "
+ + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
}
}
}
diff --git a/azkaban-execserver/src/main/resources/log4j.properties b/azkaban-execserver/src/main/resources/log4j.properties
index e304ac7..464f36a 100644
--- a/azkaban-execserver/src/main/resources/log4j.properties
+++ b/azkaban-execserver/src/main/resources/log4j.properties
@@ -1,12 +1,14 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, ExecServer
log4j.logger.azkaban.execapp=INFO, ExecServer
-log4j.appender.ExecServer=org.apache.log4j.RollingFileAppender
+log4j.appender.ExecServer=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ExecServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.ExecServer.File=azkaban-execserver.log
+log4j.appender.ExecServer.File=${log_dir}/azkaban-execserver.log
log4j.appender.ExecServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.ExecServer.MaxFileSize=102400MB
log4j.appender.ExecServer.MaxBackupIndex=2
+log4j.appender.ExecServer.DatePattern='.'yyyy-MM-dd
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
diff --git a/azkaban-execserver/src/package/bin/azkaban-executor-start.sh b/azkaban-execserver/src/package/bin/azkaban-executor-start.sh
index c281890..446571b 100755
--- a/azkaban-execserver/src/package/bin/azkaban-executor-start.sh
+++ b/azkaban-execserver/src/package/bin/azkaban-executor-start.sh
@@ -44,7 +44,7 @@ serverpath=`pwd`
if [ -z $AZKABAN_OPTS ]; then
AZKABAN_OPTS="-Xmx3G"
fi
-AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath -Dlog4j.log.dir=$azkaban_dir/logs"
java $AZKABAN_OPTS $JAVA_LIB_PATH -cp $CLASSPATH azkaban.execapp.AzkabanExecutorServer -conf $azkaban_dir/conf $@ &
diff --git a/azkaban-execserver/src/package/bin/start-exec.sh b/azkaban-execserver/src/package/bin/start-exec.sh
index fbb7124..d8f0f73 100755
--- a/azkaban-execserver/src/package/bin/start-exec.sh
+++ b/azkaban-execserver/src/package/bin/start-exec.sh
@@ -1,6 +1,5 @@
#!/bin/bash
-base_dir=$(dirname $0)/..
-
-bin/azkaban-executor-start.sh $base_dir 2>&1>logs/executorServerLog__`date +%F+%T`.out &
+# pass along command line arguments to azkaban-executor-start.sh script
+bin/azkaban-executor-start.sh "$@" 2>&1>logs/executorServerLog__`date +%F+%T`.out &
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 360d4f4..e29e973 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -43,11 +43,11 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.project.MockProjectLoader;
-import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
/**
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index d5988bb..422f622 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -39,11 +39,11 @@ import azkaban.executor.JavaJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.project.MockProjectLoader;
-import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
/**
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index c78abaa..84416de 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -42,11 +42,11 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
+import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.project.MockProjectLoader;
-import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
/**
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index 55078b5..15078b4 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -1,37 +1,41 @@
package azkaban.execapp;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import azkaban.executor.ExecutorInfo;
+@Ignore
public class StatisticsServletTest {
- private class MockStatisticsServlet extends ServerStatisticsServlet{
+ private class MockStatisticsServlet extends ServerStatisticsServlet {
/** */
private static final long serialVersionUID = 1L;
- public ExecutorInfo getStastics(){
+ public ExecutorInfo getStastics() {
return cachedstats;
}
- public long getUpdatedTime(){
+ public long getUpdatedTime() {
return lastRefreshedTime;
}
- public void callPopulateStatistics(){
- this.populateStatistics(false);
+ public void callPopulateStatistics() {
+ this.populateStatistics(false);
}
- public void callFillCpuUsage(ExecutorInfo stats){
- this.fillCpuUsage(stats);}
+ public void callFillCpuUsage(ExecutorInfo stats) {
+ this.fillCpuUsage(stats);
+ }
- public void callFillRemainingMemoryPercent(ExecutorInfo stats){
- this.fillRemainingMemoryPercent(stats);}
+ public void callFillRemainingMemoryPercent(ExecutorInfo stats) {
+ this.fillRemainingMemoryPercent(stats);
+ }
}
private MockStatisticsServlet statServlet = new MockStatisticsServlet();
@Test
- public void testFillMemory() {
+ public void testFillMemory() {
ExecutorInfo stats = new ExecutorInfo();
this.statServlet.callFillRemainingMemoryPercent(stats);
// assume any machine that runs this test should
@@ -41,14 +45,14 @@ public class StatisticsServletTest {
}
@Test
- public void testFillCpu() {
+ public void testFillCpu() {
ExecutorInfo stats = new ExecutorInfo();
this.statServlet.callFillCpuUsage(stats);
Assert.assertTrue(stats.getCpuUsage() > 0);
}
@Test
- public void testPopulateStatistics() {
+ public void testPopulateStatistics() {
this.statServlet.callPopulateStatistics();
Assert.assertNotNull(this.statServlet.getStastics());
Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
@@ -57,17 +61,18 @@ public class StatisticsServletTest {
}
@Test
- public void testPopulateStatisticsCache() {
+ public void testPopulateStatisticsCache() {
this.statServlet.callPopulateStatistics();
final long updatedTime = this.statServlet.getUpdatedTime();
- while (System.currentTimeMillis() - updatedTime < 1000){
+ while (System.currentTimeMillis() - updatedTime < 1000) {
this.statServlet.callPopulateStatistics();
Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
}
try {
Thread.sleep(1000);
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ }
// make sure cache expires after timeout.
this.statServlet.callPopulateStatistics();
diff --git a/azkaban-test/src/test/java/azkaban/test/executions/TestExecutions.java b/azkaban-test/src/test/java/azkaban/test/executions/TestExecutions.java
new file mode 100644
index 0000000..56cf0c6
--- /dev/null
+++ b/azkaban-test/src/test/java/azkaban/test/executions/TestExecutions.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2015 Azkaban Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.executions;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URISyntaxException;
+
+import org.junit.Assert;
+
+public class TestExecutions {
+ public static File getFlowDir(final String path) throws URISyntaxException {
+ URL url = TestExecutions.class.getResource(path);
+ Assert.assertNotNull(url);
+ return new File(url.toURI());
+ }
+}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 3171e9a..89dea03 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
+import org.apache.log4j.jmx.HierarchyDynamicMBean;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
@@ -53,8 +54,6 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
-import com.linkedin.restli.server.RestliServlet;
-
import azkaban.alert.Alerter;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
@@ -88,19 +87,21 @@ import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
import azkaban.webapp.servlet.AbstractAzkabanServlet;
import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.IndexRedirectServlet;
import azkaban.webapp.servlet.JMXHttpServlet;
-import azkaban.webapp.servlet.ScheduleServlet;
-import azkaban.webapp.servlet.HistoryServlet;
-import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.ProjectServlet;
+import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.StatsServlet;
import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.plugin.TriggerPlugin;
-import azkaban.webapp.plugin.ViewerPlugin;
-import azkaban.webapp.plugin.PluginRegistry;
+
+import com.linkedin.restli.server.RestliServlet;
/**
* The Azkaban Jetty server class
@@ -123,6 +124,9 @@ import azkaban.webapp.plugin.PluginRegistry;
* Jetty truststore password
*/
public class AzkabanWebServer extends AzkabanServer {
+ private static final String AZKABAN_ACCESS_LOGGER_NAME =
+ "azkaban.webapp.servlet.LoginAbstractAzkabanServlet";
+
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
public static final String AZKABAN_HOME = "AZKABAN_HOME";
@@ -705,6 +709,13 @@ public class AzkabanWebServer extends AzkabanServer {
.getString("jetty.trustpassword"));
secureConnector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+ // set up vulnerable cipher suites to exclude
+ List<String> cipherSuitesToExclude = azkabanSettings.getStringList("jetty.excludeCipherSuites");
+ logger.info("Excluded Cipher Suites: " + String.valueOf(cipherSuitesToExclude));
+ if (cipherSuitesToExclude != null && !cipherSuitesToExclude.isEmpty()) {
+ secureConnector.setExcludeCipherSuites(cipherSuitesToExclude.toArray(new String[0]));
+ }
+
server.addConnector(secureConnector);
} else {
ssl = false;
@@ -823,23 +834,25 @@ public class AzkabanWebServer extends AzkabanServer {
public void logTopMemoryConsumers() throws Exception, IOException {
if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
- && new File("/usr/bin/head").exists()) {
+ && new File("/usr/bin/head").exists()) {
logger.info("logging top memeory consumer");
java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+ new java.lang.ProcessBuilder("/bin/bash", "-c",
+ "/bin/ps aux --sort -rss | /usr/bin/head");
Process p = processBuilder.start();
p.waitFor();
-
+
InputStream is = p.getInputStream();
- java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(is));
String line = null;
while ((line = reader.readLine()) != null) {
logger.info(line);
}
is.close();
}
- }
+ }
});
logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
+ ".");
@@ -1233,6 +1246,22 @@ public class AzkabanWebServer extends AzkabanServer {
registerMbean("executorManager", new JmxExecutorManager(
(ExecutorManager) executorManager));
}
+
+ // Register Log4J loggers as JMX beans so the log level can be
+ // updated via JConsole or Java VisualVM
+ HierarchyDynamicMBean log4jMBean = new HierarchyDynamicMBean();
+ registerMbean("log4jmxbean", log4jMBean);
+ ObjectName accessLogLoggerObjName =
+ log4jMBean.addLoggerMBean(AZKABAN_ACCESS_LOGGER_NAME);
+
+ if (accessLogLoggerObjName == null) {
+ System.out
+ .println("************* loginLoggerObjName is null, make sure there is a logger with name "
+ + AZKABAN_ACCESS_LOGGER_NAME);
+ } else {
+ System.out.println("******** loginLoggerObjName: "
+ + accessLogLoggerObjName.getCanonicalName());
+ }
}
public void close() {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4970fce..ef588f7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -503,7 +503,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
} else {
ret.put("length", data.getLength());
ret.put("offset", data.getOffset());
- ret.put("data", data.getData());
+ ret.put("data", StringEscapeUtils.escapeHtml(data.getData()));
}
} catch (ExecutorManagerException e) {
throw new ServletException(e);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index e0b9e38..1d9f315 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -43,6 +43,7 @@ import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.user.UserManagerException;
+import azkaban.utils.StringUtils;
/**
* Abstract Servlet that handles auto login when the session hasn't been
@@ -77,11 +78,17 @@ public abstract class LoginAbstractAzkabanServlet extends
private MultipartParser multipartParser;
+ private boolean shouldLogRawUserAgent = false;
+
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+
+ shouldLogRawUserAgent =
+ getApplication().getServerProps().getBoolean("accesslog.raw.useragent",
+ false);
}
public void setResourceDirectory(File file) {
@@ -93,6 +100,7 @@ public abstract class LoginAbstractAzkabanServlet extends
throws ServletException, IOException {
// Set session id
Session session = getSessionFromRequest(req);
+ logRequest(req, session);
if (hasParam(req, "logout")) {
resp.sendRedirect(req.getContextPath());
if (session != null) {
@@ -103,7 +111,9 @@ public abstract class LoginAbstractAzkabanServlet extends
}
if (session != null) {
- logger.info("Found session " + session.getUser());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found session " + session.getUser());
+ }
if (handleFileGet(req, resp)) {
return;
}
@@ -120,6 +130,46 @@ public abstract class LoginAbstractAzkabanServlet extends
}
}
+ /**
+ * Log out request - the format should be close to Apache access log format
+ *
+ * @param req
+ * @param session
+ */
+ private void logRequest(HttpServletRequest req, Session session) {
+ StringBuilder buf = new StringBuilder();
+ buf.append(req.getRemoteAddr()).append(" ");
+ if (session != null && session.getUser() != null) {
+ buf.append(session.getUser().getUserId()).append(" ");
+ } else {
+ buf.append(" - ").append(" ");
+ }
+
+ buf.append("\"");
+ buf.append(req.getMethod()).append(" ");
+ buf.append(req.getRequestURI()).append(" ");
+ if (req.getQueryString() != null) {
+ buf.append(req.getQueryString()).append(" ");
+ } else {
+ buf.append("-").append(" ");
+ }
+ buf.append(req.getProtocol()).append("\" ");
+
+ String userAgent = req.getHeader("User-Agent");
+ if (shouldLogRawUserAgent) {
+ buf.append(userAgent);
+ } else {
+ // simply log a short string to indicate browser or not
+ if (StringUtils.isFromBrowser(userAgent)) {
+ buf.append("browser");
+ } else {
+ buf.append("not-browser");
+ }
+ }
+
+ logger.info(buf.toString());
+ }
+
private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
if (webResourceDirectory == null) {
@@ -168,7 +218,6 @@ public abstract class LoginAbstractAzkabanServlet extends
if (cookie != null) {
sessionId = cookie.getValue();
- logger.info("Session id " + sessionId);
}
if (sessionId == null && hasParam(req, "session.id")) {
@@ -211,6 +260,8 @@ public abstract class LoginAbstractAzkabanServlet extends
throws ServletException, IOException {
Session session = getSessionFromRequest(req);
+ logRequest(req, session);
+
// Handle Multipart differently from other post messages
if (ServletFileUpload.isMultipartContent(req)) {
Map<String, Object> params = multipartParser.parseMultipart(req);
diff --git a/azkaban-webserver/src/main/resources/log4j.properties b/azkaban-webserver/src/main/resources/log4j.properties
index b9fe746..4001d2c 100644
--- a/azkaban-webserver/src/main/resources/log4j.properties
+++ b/azkaban-webserver/src/main/resources/log4j.properties
@@ -1,21 +1,22 @@
-log4j.rootLogger=INFO, Console
+log_dir=${log4j.log.dir}
+
+log4j.rootLogger=INFO, WebServer
log4j.logger.azkaban.webapp=INFO, WebServer
-log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, R
-log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, R
+log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, Access
+log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, Access
+log4j.additivity.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=false
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.File=azkaban-access.log
-log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.R.MaxFileSize=102400MB
-log4j.appender.R.MaxBackupIndex=2
+log4j.appender.Access=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.Access.layout=org.apache.log4j.PatternLayout
+log4j.appender.Access.File=${log_dir}/azkaban-access.log
+log4j.appender.Access.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.Access.DatePattern='.'yyyy-MM-dd
-log4j.appender.WebServer=org.apache.log4j.RollingFileAppender
+log4j.appender.WebServer=org.apache.log4j.DailyRollingFileAppender
log4j.appender.WebServer.layout=org.apache.log4j.PatternLayout
-log4j.appender.WebServer.File=azkaban-webserver.log
+log4j.appender.WebServer.File=${log_dir}/azkaban-webserver.log
log4j.appender.WebServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
-log4j.appender.WebServer.MaxFileSize=102400MB
-log4j.appender.WebServer.MaxBackupIndex=2
+log4j.appender.WebServer.DatePattern='.'yyyy-MM-dd
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
diff --git a/azkaban-webserver/src/package/bin/azkaban-web-start.sh b/azkaban-webserver/src/package/bin/azkaban-web-start.sh
index 744320a..f0cc3c8 100755
--- a/azkaban-webserver/src/package/bin/azkaban-web-start.sh
+++ b/azkaban-webserver/src/package/bin/azkaban-web-start.sh
@@ -43,7 +43,7 @@ serverpath=`pwd`
if [ -z $AZKABAN_OPTS ]; then
AZKABAN_OPTS="-Xmx4G"
fi
-AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath -Dlog4j.log.dir=$azkaban_dir/logs"
java $AZKABAN_OPTS $JAVA_LIB_PATH -cp $CLASSPATH azkaban.webapp.AzkabanWebServer -conf $azkaban_dir/conf $@ &
diff --git a/azkaban-webserver/src/package/conf/azkaban.properties b/azkaban-webserver/src/package/conf/azkaban.properties
index 609a917..a89a46a 100644
--- a/azkaban-webserver/src/package/conf/azkaban.properties
+++ b/azkaban-webserver/src/package/conf/azkaban.properties
@@ -34,6 +34,7 @@ jetty.password=password
jetty.keypassword=password
jetty.truststore=keystore
jetty.trustpassword=password
+jetty.excludeCipherSuites=SSL_RSA_WITH_DES_CBC_SHA,SSL_DHE_RSA_WITH_DES_CBC_SHA,SSL_DHE_DSS_WITH_DES_CBC_SHA,SSL_RSA_EXPORT_WITH_RC4_40_MD5,SSL_RSA_EXPORT_WITH_DES40_CBC_SHA,SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA,SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA,SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_DSS_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_DSS_WITH_AES_128_CBC_SHA
# Azkaban Executor settings
executor.port=12321
build.gradle 53(+47 -6)
diff --git a/build.gradle b/build.gradle
index d8afcca..51e4a15 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,10 +1,14 @@
buildscript {
repositories {
mavenCentral()
+ maven {
+ url 'https://plugins.gradle.org/m2/'
+ }
}
dependencies {
classpath 'com.linkedin:gradle-dustjs-plugin:1.0.0'
classpath 'de.obqo.gradle:gradle-lesscss-plugin:1.0-1.3.3'
+ classpath 'net.ltgt.gradle:gradle-errorprone-plugin:0.0.8'
}
}
@@ -32,6 +36,11 @@ def cmdCaller = { commandln ->
subprojects {
apply plugin: 'java'
apply plugin: 'eclipse'
+ apply plugin: 'net.ltgt.errorprone'
+
+ configurations.errorprone {
+ resolutionStrategy.force 'com.google.errorprone:error_prone_core:2.0.5'
+ }
/**
* Gets the version name from the latest Git tag
@@ -78,6 +87,25 @@ project(':azkaban-common') {
all {
transitive = false
}
+ errorprone {
+ transitive = true
+ }
+ }
+
+ apply plugin: 'c'
+ model {
+ components {
+ main(NativeExecutableSpec) {
+ sources {
+ c {
+ source {
+ srcDir "src/main"
+ include "**/*.c"
+ }
+ }
+ }
+ }
+ }
}
dependencies {
@@ -111,14 +139,15 @@ project(':azkaban-common') {
compile('org.mortbay.jetty:jetty-util:6.1.26')
compile('org.slf4j:slf4j-api:1.6.1')
+ testCompile(project(':azkaban-test').sourceSets.test.output)
testCompile('junit:junit:4.11')
testCompile('org.hamcrest:hamcrest-all:1.3')
}
-
+
tasks.withType(JavaCompile) {
options.encoding = "UTF-8"
}
-
+
}
project(':azkaban-migration') {
@@ -126,6 +155,9 @@ project(':azkaban-migration') {
all {
transitive = false
}
+ errorprone {
+ transitive = true
+ }
}
dependencies {
@@ -174,6 +206,9 @@ project(':azkaban-webserver') {
generateRestli {
transitive = true
}
+ errorprone {
+ transitive = true
+ }
}
dependencies {
@@ -297,6 +332,9 @@ project(':azkaban-execserver') {
all {
transitive = false
}
+ errorprone {
+ transitive = true
+ }
}
dependencies {
@@ -312,7 +350,6 @@ project(':azkaban-execserver') {
compile('org.mortbay.jetty:jetty-util:6.1.26')
compile('org.codehaus.jackson:jackson-core-asl:1.9.5')
compile('org.codehaus.jackson:jackson-mapper-asl:1.9.5')
-
testCompile('junit:junit:4.11')
testCompile('org.hamcrest:hamcrest-all:1.3')
@@ -413,6 +450,10 @@ project(':azkaban-sql') {
project(':azkaban-test') {
apply plugin: 'distribution'
+ dependencies {
+ testCompile('junit:junit:4.11')
+ }
+
distributions {
animal {
baseName = 'test-animal'
@@ -478,11 +519,11 @@ project(':azkaban-test') {
}
}
- distZip.dependsOn build, animalDistZip, embeddedDistZip, embedded2DistZip,
+ distZip.dependsOn animalDistZip, embeddedDistZip, embedded2DistZip,
embedded3DistZip, embeddedBadDistZip, execpropstestDistZip,
exectest1DistZip, exectest2DistZip, logtestDistZip
- distTar.dependsOn build, animalDistTar, embeddedDistTar, embedded2DistTar,
+ distTar.dependsOn animalDistTar, embeddedDistTar, embedded2DistTar,
embedded3DistTar, embeddedBadDistTar, execpropstestDistTar,
exectest1DistTar, exectest2DistTar, logtestDistTar
}
@@ -570,5 +611,5 @@ distZip.dependsOn migrationDistZip, webserverDistZip, execserverDistZip, soloser
* Gradle wrapper task.
*/
task wrapper(type: Wrapper) {
- gradleVersion = '1.12'
+ gradleVersion = '2.7'
}
gradle.properties 2(+1 -1)
diff --git a/gradle.properties b/gradle.properties
index 6f7f687..ef3a7de 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
org.gradle.daemon=true
group=com.linkedin
-version=2.6.4
+version=2.7.0
gradle/wrapper/gradle-wrapper.jar 0(+0 -0)
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 0087cd3..e8c6bf7 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 506745b..bbc82a1 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Wed Jun 11 01:55:01 PDT 2014
+#Sat Sep 26 15:48:43 PDT 2015
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-1.12-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.7-bin.zip
gradlew 6(+1 -5)
diff --git a/gradlew b/gradlew
index 91a7e26..97fac78 100755
--- a/gradlew
+++ b/gradlew
@@ -42,11 +42,6 @@ case "`uname`" in
;;
esac
-# For Cygwin, ensure paths are in UNIX format before anything is touched.
-if $cygwin ; then
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-fi
-
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
@@ -114,6 +109,7 @@ fi
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+ JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`