azkaban-uncached
Changes
eclipse-styles.xml 283(+283 -0)
README.md 4(+2 -2)
src/java/azkaban/executor/ExecutableFlow.java 164(+83 -81)
src/java/azkaban/executor/ExecutorMailer.java 200(+76 -124)
src/java/azkaban/flow/Flow.java 37(+37 -0)
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
src/java/azkaban/project/Project.java 28(+28 -0)
src/java/azkaban/user/User.java 75(+64 -11)
src/java/azkaban/user/XmlUserManager.java 11(+11 -0)
src/java/azkaban/utils/EmailMessage.java 25(+19 -6)
src/java/azkaban/utils/Utils.java 20(+12 -8)
src/java/azkaban/utils/WebUtils.java 2(+0 -2)
src/web/js/azkaban.schedule.svg.js 2(+0 -2)
Details
eclipse-styles.xml 283(+283 -0)
diff --git a/eclipse-styles.xml b/eclipse-styles.xml
new file mode 100644
index 0000000..07e74f7
--- /dev/null
+++ b/eclipse-styles.xml
@@ -0,0 +1,283 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="12">
+<profile kind="CodeFormatterProfile" name="Azkaban" version="12">
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="2000"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="tab"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="2000"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+</profile>
+</profiles>
README.md 4(+2 -2)
diff --git a/README.md b/README.md
index 7375cbe..d0a972f 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Azkaban2
+## Azkaban2
-For all Azkaban Plugins documentation, please go to
+For Azkaban documentation, please go to
[Azkaban Project Site](http://azkaban.github.io/azkaban2/)
There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 2bf471b..8c98086 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -389,12 +389,24 @@ public class FlowRunnerManager implements EventListener {
}
}
+ int numJobThreads = numJobThreadPerFlow;
+ if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+ try{
+ int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
+ if(numJobs > 0 && numJobs <= numJobThreads) {
+ numJobThreads = numJobs;
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
+ }
+ }
+
FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
.setGlobalProps(globalProps)
- .setNumJobThreads(numJobThreadPerFlow)
+ .setNumJobThreads(numJobThreads)
.addListener(this);
// Check again.
@@ -634,6 +646,12 @@ public class FlowRunnerManager implements EventListener {
public int getNumExecutingFlows() {
return runningFlows.size();
}
+
+ public String getRunningFlowIds() {
+ List<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+ Collections.sort(ids);
+ return ids.toString();
+ }
public int getNumExecutingJobs() {
int jobCount = 0;
@@ -643,5 +661,7 @@ public class FlowRunnerManager implements EventListener {
return jobCount;
}
+
+
}
src/java/azkaban/executor/ExecutableFlow.java 164(+83 -81)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index a0b0626..5bae020 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -39,7 +39,7 @@ public class ExecutableFlow {
private int version;
private String executionPath;
-
+
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
private ArrayList<String> startNodes;
@@ -52,10 +52,10 @@ public class ExecutableFlow {
private Status flowStatus = Status.READY;
private String submitUser;
-
+
private HashSet<String> proxyUsers = new HashSet<String>();
private ExecutionOptions executionOptions;
-
+
public ExecutableFlow(Flow flow) {
this.projectId = flow.getProjectId();
this.scheduleId = -1;
@@ -63,79 +63,81 @@ public class ExecutableFlow {
this.version = flow.getVersion();
this.setFlow(flow);
}
-
+
public ExecutableFlow(int executionId, Flow flow) {
this.projectId = flow.getProjectId();
this.scheduleId = -1;
this.flowId = flow.getId();
this.version = flow.getVersion();
this.executionId = executionId;
-
+
this.setFlow(flow);
}
-
+
public ExecutableFlow() {
}
-
+
public long getUpdateTime() {
return updateTime;
}
-
+
public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}
-
+
public List<ExecutableNode> getExecutableNodes() {
return new ArrayList<ExecutableNode>(executableNodes.values());
}
-
+
public ExecutableNode getExecutableNode(String id) {
return executableNodes.get(id);
}
-
+
public Collection<FlowProps> getFlowProps() {
return flowProps.values();
}
-
+
public void addAllProxyUsers(Collection<String> proxyUsers) {
this.proxyUsers.addAll(proxyUsers);
}
-
+
public Set<String> getProxyUsers() {
return new HashSet<String>(this.proxyUsers);
}
-
+
public void setExecutionOptions(ExecutionOptions options) {
executionOptions = options;
}
-
+
public ExecutionOptions getExecutionOptions() {
return executionOptions;
}
-
+
private void setFlow(Flow flow) {
executionOptions = new ExecutionOptions();
-
+
for (Node node: flow.getNodes()) {
String id = node.getId();
ExecutableNode exNode = new ExecutableNode(node, this);
executableNodes.put(id, exNode);
}
-
+
for (Edge edge: flow.getEdges()) {
ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
-
+
sourceNode.addOutNode(edge.getTargetId());
targetNode.addInNode(edge.getSourceId());
}
-
+
if (flow.getSuccessEmails() != null) {
executionOptions.setSuccessEmails(flow.getSuccessEmails());
}
if (flow.getFailureEmails() != null) {
executionOptions.setFailureEmails(flow.getFailureEmails());
}
+ executionOptions.setMailCreator(flow.getMailCreator());
+
flowProps.putAll(flow.getAllFlowProps());
}
@@ -148,10 +150,10 @@ public class ExecutableFlow {
}
}
}
-
+
return startNodes;
}
-
+
public List<String> getEndNodes() {
if (endNodes == null) {
endNodes = new ArrayList<String>();
@@ -161,10 +163,10 @@ public class ExecutableFlow {
}
}
}
-
+
return endNodes;
}
-
+
public boolean setNodeStatus(String nodeId, Status status) {
ExecutableNode exNode = executableNodes.get(nodeId);
if (exNode == null) {
@@ -179,18 +181,18 @@ public class ExecutableFlow {
if (exNode == null) {
return;
}
-
+
exNode.setExternalExecutionId(externalExecutionId);
}
-
+
public int getExecutionId() {
return executionId;
}
public void setExecutionId(int executionId) {
this.executionId = executionId;
-
- for(ExecutableNode node: executableNodes.values()) {
+
+ for (ExecutableNode node: executableNodes.values()) {
node.setExecutionId(executionId);
}
}
@@ -226,31 +228,31 @@ public class ExecutableFlow {
public void setExecutionPath(String executionPath) {
this.executionPath = executionPath;
}
-
+
public long getStartTime() {
return startTime;
}
-
+
public void setStartTime(long time) {
this.startTime = time;
}
-
+
public long getEndTime() {
return endTime;
}
-
+
public void setEndTime(long time) {
this.endTime = time;
}
-
+
public long getSubmitTime() {
return submitTime;
}
-
+
public void setSubmitTime(long time) {
this.submitTime = time;
}
-
+
public Status getStatus() {
return flowStatus;
}
@@ -258,16 +260,16 @@ public class ExecutableFlow {
public void setStatus(Status flowStatus) {
this.flowStatus = flowStatus;
}
-
- public Map<String,Object> toObject() {
+
+ public Map<String, Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
flowObj.put("executionId", executionId);
flowObj.put("executionPath", executionPath);
flowObj.put("flowId", flowId);
flowObj.put("projectId", projectId);
-
- if(scheduleId >= 0) {
+
+ if (scheduleId >= 0) {
flowObj.put("scheduleId", scheduleId);
}
flowObj.put("submitTime", submitTime);
@@ -276,16 +278,16 @@ public class ExecutableFlow {
flowObj.put("status", flowStatus.toString());
flowObj.put("submitUser", submitUser);
flowObj.put("version", version);
-
+
flowObj.put("executionOptions", this.executionOptions.toObject());
flowObj.put("version", version);
-
+
ArrayList<Object> props = new ArrayList<Object>();
for (FlowProps fprop: flowProps.values()) {
HashMap<String, Object> propObj = new HashMap<String, Object>();
String source = fprop.getSource();
String inheritedSource = fprop.getInheritedSource();
-
+
propObj.put("source", source);
if (inheritedSource != null) {
propObj.put("inherited", inheritedSource);
@@ -293,13 +295,13 @@ public class ExecutableFlow {
props.add(propObj);
}
flowObj.put("properties", props);
-
+
ArrayList<Object> nodes = new ArrayList<Object>();
for (ExecutableNode node: executableNodes.values()) {
nodes.add(node.toObject());
}
flowObj.put("nodes", nodes);
-
+
ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
flowObj.put("proxyUsers", proxyUserList);
@@ -307,57 +309,57 @@ public class ExecutableFlow {
}
public Object toUpdateObject(long lastUpdateTime) {
- Map<String, Object> updateData = new HashMap<String,Object>();
+ Map<String, Object> updateData = new HashMap<String, Object>();
updateData.put("execId", this.executionId);
updateData.put("status", this.flowStatus.getNumVal());
updateData.put("startTime", this.startTime);
updateData.put("endTime", this.endTime);
updateData.put("updateTime", this.updateTime);
-
- List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
+
+ List<Map<String, Object>> updatedNodes = new ArrayList<Map<String, Object>>();
for (ExecutableNode node: executableNodes.values()) {
-
+
if (node.getUpdateTime() > lastUpdateTime) {
- Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
+ Map<String, Object> updatedNodeMap = new HashMap<String, Object>();
updatedNodeMap.put("jobId", node.getJobId());
updatedNodeMap.put("status", node.getStatus().getNumVal());
updatedNodeMap.put("startTime", node.getStartTime());
updatedNodeMap.put("endTime", node.getEndTime());
updatedNodeMap.put("updateTime", node.getUpdateTime());
updatedNodeMap.put("attempt", node.getAttempt());
-
+
if (node.getAttempt() > 0) {
- ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+ ArrayList<Map<String, Object>> pastAttempts = new ArrayList<Map<String, Object>>();
for (Attempt attempt: node.getPastAttemptList()) {
pastAttempts.add(attempt.toObject());
}
updatedNodeMap.put("pastAttempts", pastAttempts);
}
-
+
updatedNodes.add(updatedNodeMap);
}
}
-
+
updateData.put("nodes", updatedNodes);
return updateData;
}
-
+
@SuppressWarnings("unchecked")
public void applyUpdateObject(Map<String, Object> updateData) {
- List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
- for (Map<String,Object> node: updatedNodes) {
+ List<Map<String, Object>> updatedNodes = (List<Map<String, Object>>)updateData.get("nodes");
+ for (Map<String, Object> node: updatedNodes) {
String jobId = (String)node.get("jobId");
Status status = Status.fromInteger((Integer)node.get("status"));
long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-
+
ExecutableNode exNode = executableNodes.get(jobId);
exNode.setEndTime(endTime);
exNode.setStartTime(startTime);
exNode.setUpdateTime(updateTime);
exNode.setStatus(status);
-
+
int attempt = 0;
if (node.containsKey("attempt")) {
attempt = (Integer)node.get("attempt");
@@ -365,22 +367,22 @@ public class ExecutableFlow {
exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
}
}
-
+
exNode.setAttempt(attempt);
}
-
+
this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
-
+
this.startTime = JSONUtils.getLongFromObject(updateData.get("startTime"));
this.endTime = JSONUtils.getLongFromObject(updateData.get("endTime"));
this.updateTime = JSONUtils.getLongFromObject(updateData.get("updateTime"));
}
-
+
@SuppressWarnings("unchecked")
public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
ExecutableFlow exFlow = new ExecutableFlow();
-
- HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+
+ HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
exFlow.executionId = (Integer)flowObj.get("executionId");
exFlow.executionPath = (String)flowObj.get("executionPath");
exFlow.flowId = (String)flowObj.get("flowId");
@@ -394,7 +396,7 @@ public class ExecutableFlow {
exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
exFlow.submitUser = (String)flowObj.get("submitUser");
exFlow.version = (Integer)flowObj.get("version");
-
+
if (flowObj.containsKey("executionOptions")) {
exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
}
@@ -402,7 +404,7 @@ public class ExecutableFlow {
// for backawards compatibility should remove in a few versions.
exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
}
-
+
// Copy nodes
List<Object> nodes = (List<Object>)flowObj.get("nodes");
for (Object nodeObj: nodes) {
@@ -411,57 +413,57 @@ public class ExecutableFlow {
}
List<Object> properties = (List<Object>)flowObj.get("properties");
- for (Object propNode : properties) {
+ for (Object propNode: properties) {
HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
String source = (String)fprop.get("source");
String inheritedSource = (String)fprop.get("inherited");
-
+
FlowProps flowProps = new FlowProps(inheritedSource, source);
exFlow.flowProps.put(source, flowProps);
}
-
- if(flowObj.containsKey("proxyUsers")) {
- ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+
+ if (flowObj.containsKey("proxyUsers")) {
+ ArrayList<String> proxyUserList = (ArrayList<String>)flowObj.get("proxyUsers");
exFlow.addAllProxyUsers(proxyUserList);
}
-
+
return exFlow;
}
-
+
@SuppressWarnings("unchecked")
public void updateExecutableFlowFromObject(Object obj) {
- HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+ HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
flowStatus = Status.valueOf((String)flowObj.get("status"));
-
+
List<Object> nodes = (List<Object>)flowObj.get("nodes");
for (Object nodeObj: nodes) {
- HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
+ HashMap<String, Object> nodeHash = (HashMap<String, Object>)nodeObj;
String nodeId = (String)nodeHash.get("id");
ExecutableNode node = executableNodes.get(nodeId);
if (nodeId == null) {
throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
}
-
+
node.updateNodeFromObject(nodeObj);
}
}
-
+
public Set<String> getSources() {
HashSet<String> set = new HashSet<String>();
for (ExecutableNode exNode: executableNodes.values()) {
set.add(exNode.getJobPropsSource());
}
-
+
for (FlowProps props: flowProps.values()) {
set.add(props.getSource());
}
return set;
}
-
+
public String getSubmitUser() {
return submitUser;
}
@@ -469,7 +471,7 @@ public class ExecutableFlow {
public void setSubmitUser(String submitUser) {
this.submitUser = submitUser;
}
-
+
public int getVersion() {
return version;
}
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 9f656e1..1461dcc 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import azkaban.executor.mail.DefaultMailCreator;
+
/**
* Execution options for submitted flows and scheduled flows
*/
@@ -43,6 +45,7 @@ public class ExecutionOptions {
private Integer pipelineExecId = null;
private Integer queueLevel = 0;
private String concurrentOption = CONCURRENT_OPTION_IGNORE;
+ private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
private Map<String, String> flowParameters = new HashMap<String, String>();
public enum FailureAction {
@@ -56,7 +59,7 @@ public class ExecutionOptions {
private Set<String> initiallyDisabledJobs = new HashSet<String>();
public void setFlowParameters(Map<String,String> flowParam) {
- flowParameters.get(flowParam);
+ flowParameters.putAll(flowParam);
}
public Map<String,String> getFlowParameters() {
@@ -123,10 +126,18 @@ public class ExecutionOptions {
this.concurrentOption = concurrentOption;
}
+ public void setMailCreator(String mailCreator) {
+ this.mailCreator = mailCreator;
+ }
+
public String getConcurrentOption() {
return concurrentOption;
}
+ public String getMailCreator() {
+ return mailCreator;
+ }
+
public Integer getPipelineLevel() {
return pipelineLevel;
}
@@ -168,6 +179,7 @@ public class ExecutionOptions {
flowOptionObj.put("pipelineExecId", pipelineExecId);
flowOptionObj.put("queueLevel", queueLevel);
flowOptionObj.put("concurrentOption", concurrentOption);
+ flowOptionObj.put("mailCreator", mailCreator);
flowOptionObj.put("disabled", initiallyDisabledJobs);
flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
flowOptionObj.put("successEmailsOverride", successEmailsOverride);
@@ -196,6 +208,9 @@ public class ExecutionOptions {
if (optionsMap.containsKey("concurrentOption")) {
options.concurrentOption = (String)optionsMap.get("concurrentOption");
}
+ if (optionsMap.containsKey("mailCreator")) {
+ options.mailCreator = (String)optionsMap.get("mailCreator");
+ }
if (optionsMap.containsKey("disabled")) {
options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
}
src/java/azkaban/executor/ExecutorMailer.java 200(+76 -124)
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 2d9d85b..1be17ca 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -23,161 +23,113 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.utils.AbstractMailer;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
import azkaban.utils.Utils;
-public class ExecutorMailer extends AbstractMailer {
+public class ExecutorMailer {
private static Logger logger = Logger.getLogger(ExecutorMailer.class);
-
+
private boolean testMode = false;
-
+
+ private String clientHostname;
+ private String clientPortNumber;
+
+ private String mailHost;
+ private String mailUser;
+ private String mailPassword;
+ private String mailSender;
+ private String azkabanName;
+
public ExecutorMailer(Props props) {
- super(props);
+ this.azkabanName = props.getString("azkaban.name", "azkaban");
+ this.mailHost = props.getString("mail.host", "localhost");
+ this.mailUser = props.getString("mail.user", "");
+ this.mailPassword = props.getString("mail.password", "");
+ this.mailSender = props.getString("mail.sender", "");
+ int mailTimeout = props.getInt("mail.timeout.millis", 10000);
+ EmailMessage.setTimeout(mailTimeout);
+ int connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
+ EmailMessage.setConnectionTimeout(connectionTimeout);
+
+ this.clientHostname = props.getString("jetty.hostname", "localhost");
+ this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+
testMode = props.getBoolean("test.mode", false);
}
-
+
public void sendFirstErrorMessage(ExecutableFlow flow) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+
ExecutionOptions option = flow.getExecutionOptions();
- List<String> emailList = option.getDisabledJobs();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = super.createEmailMessage(
- "Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(),
- "text/html",
- emailList);
-
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + getAzkabanName() + "</h2>");
-
- if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
- message.println("This flow is set to cancel all currently running jobs.");
- }
- else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
- message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
- }
- else {
- message.println("This flow is set to complete all currently running jobs before stopping.");
- }
-
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
- message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- message.println("");
- message.println("<h3>Reason</h3>");
- List<String> failedJobs = findFailedJobs(flow);
- message.println("<ul>");
- for (String jobId : failedJobs) {
- message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
- }
-
- message.println("</ul>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
+
+ MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+ boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+ if (mailCreated && !testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed", e);
}
}
}
-
- public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+
+ public void sendErrorEmail(ExecutableFlow flow, String... extraReasons) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+
ExecutionOptions option = flow.getExecutionOptions();
-
- List<String> emailList = option.getFailureEmails();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = super.createEmailMessage(
- "Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(),
- "text/html",
- emailList);
-
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName() + "</h2>");
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
-
- String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
- message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- message.println("");
- message.println("<h3>Reason</h3>");
- List<String> failedJobs = findFailedJobs(flow);
- message.println("<ul>");
- for (String jobId : failedJobs) {
- message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
- }
- for (String reasons: extraReasons) {
- message.println("<li>" + reasons + "</li>");
- }
-
- message.println("</ul>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
+
+ MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+ boolean mailCreated = mailCreator.createErrorEmail(flow, message, azkabanName, clientHostname, clientPortNumber, extraReasons);
+
+ if (mailCreated && !testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed", e);
}
}
}
public void sendSuccessEmail(ExecutableFlow flow) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+
ExecutionOptions option = flow.getExecutionOptions();
- List<String> emailList = option.getSuccessEmails();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = super.createEmailMessage(
- "Flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName(),
- "text/html",
- emailList);
-
- message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName() + "</h2>");
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
- message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
+ MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+ boolean mailCreated = mailCreator.createSuccessEmail(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+ if (mailCreated && !testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed", e);
}
}
}
-
- private List<String> findFailedJobs(ExecutableFlow flow) {
+
+ public static List<String> findFailedJobs(ExecutableFlow flow) {
ArrayList<String> failedJobs = new ArrayList<String>();
- for (ExecutableNode node: flow.getExecutableNodes()) {
+ for (ExecutableNode node : flow.getExecutableNodes()) {
if (node.getStatus() == Status.FAILED) {
failedJobs.add(node.getJobId());
}
}
-
+
return failedJobs;
}
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 73fcf07..f512b4d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
@@ -69,6 +70,9 @@ public class ExecutorManager {
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
+ private String updaterStage = "not started";
+
+ File cacheDir;
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
this.executorLoader = loader;
@@ -76,6 +80,7 @@ public class ExecutorManager {
executorHost = props.getString("executor.host", "localhost");
executorPort = props.getInt("executor.port");
+ cacheDir = new File(props.getString("cache.directory", "cache"));
mailer = new ExecutorMailer(props);
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
@@ -97,6 +102,10 @@ public class ExecutorManager {
return executingManager.getState();
}
+ public String getExecutorThreadStage() {
+ return updaterStage;
+ }
+
public boolean isThreadActive() {
return executingManager.isAlive();
}
@@ -172,6 +181,15 @@ public class ExecutorManager {
return flows;
}
+ public String getRunningFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ Collections.sort(allIds);
+ return allIds.toString();
+ }
+
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@@ -593,6 +611,8 @@ public class ExecutorManager {
try {
lastThreadCheckTime = System.currentTimeMillis();
+ updaterStage = "Starting update all flows.";
+
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
@@ -602,6 +622,10 @@ public class ExecutorManager {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
+ ConnectionInfo connection = entry.getKey();
+
+ updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+
// We pack the parameters of the same host together before we query.
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
@@ -612,7 +636,7 @@ public class ExecutorManager {
ConnectorParams.EXEC_ID_LIST_PARAM,
JSONUtils.toJSON(executionIdsList));
- ConnectionInfo connection = entry.getKey();
+
Map<String, Object> results = null;
try {
results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -620,6 +644,9 @@ public class ExecutorManager {
logger.error(e);
for (ExecutableFlow flow: entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+
+ updaterStage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+
if (pair != null) {
ExecutionReference ref = pair.getFirst();
int numErrors = ref.getNumErrors();
@@ -642,6 +669,9 @@ public class ExecutorManager {
for (Map<String,Object> updateMap: executionUpdates) {
try {
ExecutableFlow flow = updateExecution(updateMap);
+
+ updaterStage = "Updated flow " + flow.getExecutionId();
+
if (isFinished(flow)) {
finishedFlows.add(flow);
finalizeFlows.add(flow);
@@ -659,21 +689,27 @@ public class ExecutorManager {
}
}
+ updaterStage = "Evicting old recently finished flows.";
+
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
- ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId(), cacheDir);
}
recentlyFinished.put(flow.getExecutionId(), flow);
}
+ updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
+
// Kill error flows
for (ExecutableFlow flow: finalizeFlows) {
finalizeFlows(flow);
}
}
+ updaterStage = "Updated all active flows. Waiting for next round.";
+
synchronized(this) {
try {
if (runningFlows.size() > 0) {
@@ -688,7 +724,7 @@ public class ExecutorManager {
}
catch (Exception e) {
logger.error(e);
- }
+ }
}
}
}
@@ -696,6 +732,7 @@ public class ExecutorManager {
private void finalizeFlows(ExecutableFlow flow) {
int execId = flow.getExecutionId();
+ updaterStage = "finalizing flow " + execId;
// First we check if the execution in the datastore is complete
try {
ExecutableFlow dsFlow;
@@ -703,15 +740,18 @@ public class ExecutorManager {
dsFlow = flow;
}
else {
+ updaterStage = "finalizing flow " + execId + " loading from db";
dsFlow = executorLoader.fetchExecutableFlow(execId);
// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
if (!isFinished(dsFlow)) {
+ updaterStage = "finalizing flow " + execId + " failing the flow";
failEverything(dsFlow);
executorLoader.updateExecutableFlow(dsFlow);
}
}
+ updaterStage = "finalizing flow " + execId + " deleting active reference";
// Delete the executing reference.
if (flow.getEndTime() == -1) {
flow.setEndTime(System.currentTimeMillis());
@@ -719,6 +759,7 @@ public class ExecutorManager {
}
executorLoader.removeActiveExecutableReference(execId);
+ updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
@@ -728,6 +769,7 @@ public class ExecutorManager {
// TODO append to the flow log that we forced killed this flow because the target no longer had
// the reference.
+ updaterStage = "finalizing flow " + execId + " alerting and emailing";
ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
@@ -1006,4 +1048,8 @@ public class ExecutorManager {
cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
}
}
+
+
+
+
}
diff --git a/src/java/azkaban/executor/mail/DefaultMailCreator.java b/src/java/azkaban/executor/mail/DefaultMailCreator.java
new file mode 100644
index 0000000..fcebcac
--- /dev/null
+++ b/src/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.mail;
+
+import java.util.HashMap;
+import java.util.List;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorMailer;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Utils;
+
+public class DefaultMailCreator implements MailCreator {
+ public static final String DEFAULT_MAIL_CREATOR = "default";
+ private static HashMap<String, MailCreator> registeredCreators = new HashMap<String, MailCreator>();
+ private static MailCreator defaultCreator;
+
+ public static void registerCreator(String name, MailCreator creator) {
+ registeredCreators.put(name, creator);
+ }
+
+ public static MailCreator getCreator(String name) {
+ MailCreator creator = registeredCreators.get(name);
+ if (creator == null) {
+ creator = defaultCreator;
+ }
+ return creator;
+ }
+
+ static {
+ defaultCreator = new DefaultMailCreator();
+ registerCreator(DEFAULT_MAIL_CREATOR, defaultCreator);
+ }
+
+ @Override
+ public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ List<String> emailList = option.getDisabledJobs();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+
+ if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
+ message.println("This flow is set to cancel all currently running jobs.");
+ }
+ else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE) {
+ message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+ }
+ else {
+ message.println("This flow is set to complete all currently running jobs before stopping.");
+ }
+
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = ExecutorMailer.findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+ }
+
+ message.println("</ul>");
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+
+ List<String> emailList = option.getFailureEmails();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = ExecutorMailer.findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+ }
+ for (String reasons : vars) {
+ message.println("<li>" + reasons + "</li>");
+ }
+
+ message.println("</ul>");
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ List<String> emailList = option.getSuccessEmails();
+
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+
+ message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/src/java/azkaban/executor/mail/MailCreator.java b/src/java/azkaban/executor/mail/MailCreator.java
new file mode 100644
index 0000000..19bed3b
--- /dev/null
+++ b/src/java/azkaban/executor/mail/MailCreator.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.mail;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.utils.EmailMessage;
+
+public interface MailCreator {
+ public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+ public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+ public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+}
src/java/azkaban/flow/Flow.java 37(+37 -0)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 31ac3cb..3abb05b 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import azkaban.executor.mail.DefaultMailCreator;
+
public class Flow {
private final String id;
private int projectId;
@@ -40,8 +42,10 @@ public class Flow {
private List<String> failureEmail = new ArrayList<String>();
private List<String> successEmail = new ArrayList<String>();
+ private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
private ArrayList<String> errors;
private int version = -1;
+ private Map<String, Object> metadata = new HashMap<String, Object>();
private boolean isLayedOut = false;
@@ -106,10 +110,18 @@ public class Flow {
return successEmail;
}
+ public String getMailCreator() {
+ return mailCreator;
+ }
+
public List<String> getFailureEmails() {
return failureEmail;
}
+ public void setMailCreator(String mailCreator) {
+ this.mailCreator = mailCreator;
+ }
+
public void addSuccessEmails(Collection<String> emails) {
successEmail.addAll(emails);
}
@@ -226,10 +238,15 @@ public class Flow {
flowObj.put("edges", objectizeEdges());
flowObj.put("failure.email", failureEmail);
flowObj.put("success.email", successEmail);
+ flowObj.put("mailCreator", mailCreator);
flowObj.put("layedout", isLayedOut);
if (errors != null) {
flowObj.put("errors", errors);
}
+
+ if (metadata != null) {
+ flowObj.put("metadata", metadata);
+ }
return flowObj;
}
@@ -294,9 +311,18 @@ public class Flow {
List<Object> edgeList = (List<Object>)flowObject.get("edges");
List<Edge> edges = loadEdgeFromObjects(edgeList, nodes);
flow.addAllEdges(edges);
+
+ Map<String, Object> metadata = (Map<String, Object>)flowObject.get("metadata");
+
+ if (metadata != null) {
+ flow.setMetadata(metadata);
+ }
flow.failureEmail = (List<String>)flowObject.get("failure.email");
flow.successEmail = (List<String>)flowObject.get("success.email");
+ if (flowObject.containsKey("mailCreator")) {
+ flow.mailCreator = flowObject.get("mailCreator").toString();
+ }
return flow;
}
@@ -337,6 +363,17 @@ public class Flow {
return isLayedOut;
}
+ public Map<String, Object> getMetadata() {
+ if(metadata == null){
+ metadata = new HashMap<String, Object>();
+ }
+ return metadata;
+ }
+
+ public void setMetadata(Map<String, Object> metadata) {
+ this.metadata = metadata;
+ }
+
public void setLayedOut(boolean layedOut) {
this.isLayedOut = layedOut;
}
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public String getExecutorThreadState() {
return manager.getExecutorThreadState().toString();
}
+
+ @Override
+ public String getExecutorThreadStage() {
+ return manager.getExecutorThreadStage();
+ }
@Override
public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public List<String> getPrimaryExecutorHostPorts() {
return new ArrayList<String>(manager.getPrimaryServerHosts());
}
+
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
+
}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getNumRunningFlows")
public int getNumRunningFlows();
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getExecutorThreadState")
public String getExecutorThreadState();
+
+ @DisplayName("OPERATION: getExecutorThreadStage")
+ public String getExecutorThreadStage();
@DisplayName("OPERATION: isThreadActive")
public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
return manager.getNumExecutingJobs();
}
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
@DisplayName("OPERATION: getNumExecutingFlows")
public int getNumExecutingFlows();
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getTotalNumRunningJobs")
public int countTotalNumRunningJobs();
}
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
public Boolean isThreadActive() {
return manager.isThreadActive();
}
+
+ @Override
+ public String getScheduleThreadStage() {
+ return manager.getThreadStage();
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
@DisplayName("OPERATION: getScheduleThreadState")
String getScheduleThreadState();
+ @DisplayName("OPERATION: getScheduleThreadStage")
+ String getScheduleThreadStage();
+
@DisplayName("OPERATION: getNextScheduleTime")
Long getNextScheduleTime();
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 88f50c5..3d6362a 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -269,7 +269,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
}
}
- @SuppressWarnings("resource")
private void uploadProjectFile(Connection connection, Project project, int version, String filetype, String filename, File localFile, String uploader) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
long updateTime = System.currentTimeMillis();
@@ -359,7 +358,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
return handler;
}
- @SuppressWarnings("resource")
private ProjectFileHandler getUploadedFile(Connection connection, int projectId, int version) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
@@ -697,6 +695,41 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
}
}
+ @Override
+ public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+ logger.info("Uploading flows");
+ Connection connection = getConnection();
+
+ try {
+ QueryRunner runner = new QueryRunner();
+ String json = JSONUtils.toJSON(flow.toObject());
+ byte[] stringData = json.getBytes("UTF-8");
+ byte[] data = stringData;
+
+ logger.info("UTF-8 size:" + data.length);
+ if (defaultEncodingType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+
+ logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+ final String UPDATE_FLOW = "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
+ try {
+ runner.update(connection, UPDATE_FLOW, defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+ }
+ connection.commit();
+ } catch (IOException e) {
+ throw new ProjectManagerException("Flow Upload failed.", e);
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Flow Upload failed commit.", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
public EncodingType getDefaultEncodingType() {
return defaultEncodingType;
}
src/java/azkaban/project/Project.java 28(+28 -0)
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index b1dda6a..76bf207 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -45,6 +45,7 @@ public class Project {
private LinkedHashMap<String, Permission> groupPermissionMap = new LinkedHashMap<String, Permission>();
private Map<String, Flow> flows = null;
private HashSet<String> proxyUsers = new HashSet<String>();
+ private Map<String, Object> metadata = new HashMap<String, Object>();
public Project(int id, String name) {
this.id = id;
@@ -67,6 +68,10 @@ public class Project {
return flows.get(flowId);
}
+ public Map<String, Flow> getFlowMap() {
+ return flows;
+ }
+
public List<Flow> getFlows() {
List<Flow> retFlow = null;
if (flows != null) {
@@ -218,6 +223,10 @@ public class Project {
userPermissionMap.remove(userId);
}
+ public void clearUserPermission() {
+ userPermissionMap.clear();
+ }
+
public long getCreateTimestamp() {
return createTimestamp;
}
@@ -251,6 +260,10 @@ public class Project {
if (source != null) {
projectObject.put("source", source);
}
+
+ if (metadata != null) {
+ projectObject.put("metadata", metadata);
+ }
ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
for (Map.Entry<String, Permission> entry : userPermissionMap.entrySet()) {
@@ -282,6 +295,7 @@ public class Project {
Boolean active = (Boolean)projectObject.get("active");
active = active == null ? true : active;
int version = (Integer)projectObject.get("version");
+ Map<String, Object> metadata = (Map<String, Object>)projectObject.get("metadata");
Project project = new Project(id, name);
project.setVersion(version);
@@ -294,6 +308,9 @@ public class Project {
if (source != null) {
project.setSource(source);
}
+ if (metadata != null) {
+ project.setMetadata(metadata);
+ }
List<Map<String, Object>> users = (List<Map<String, Object>>) projectObject
.get("users");
@@ -403,6 +420,17 @@ public class Project {
this.source = source;
}
+ public Map<String, Object> getMetadata() {
+ if(metadata == null){
+ metadata = new HashMap<String, Object>();
+ }
+ return metadata;
+ }
+
+ protected void setMetadata(Map<String, Object> metadata) {
+ this.metadata = metadata;
+ }
+
public int getId() {
return id;
}
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 9c50051..986ba55 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -138,6 +138,9 @@ public interface ProjectLoader {
* @throws ProjectManagerException
*/
public void changeProjectVersion(Project project, int version, String user) throws ProjectManagerException;
+
+
+ public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException;
/**
* Uploads all computed flows
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index febdf3b..8a83fcb 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -368,6 +368,10 @@ public class ProjectManager {
projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
}
+ public void updateFlow(Project project, Flow flow) throws ProjectManagerException {
+ projectLoader.updateFlow(project, flow.getVersion(), flow);
+ }
+
private File unzipFile(File archiveFile) throws IOException {
ZipFile zipfile = new ZipFile(archiveFile);
File unzipped = Utils.createTempDir(tempDir);
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index f66ae0f..fededda 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -70,6 +70,7 @@ public class ScheduleManager {
// Used for mbeans to query Scheduler status
private long lastCheckTime = -1;
private long nextWakupTime = -1;
+ private String runnerStage = "not started";
/**
* Give the schedule manager a loader class that will properly load the
@@ -159,7 +160,6 @@ public class ScheduleManager {
* @param id
*/
public synchronized void removeSchedule(Schedule sched) {
-
Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
if(schedules != null) {
@@ -372,6 +372,8 @@ public class ScheduleManager {
synchronized (this) {
try {
lastCheckTime = System.currentTimeMillis();
+
+ runnerStage = "Starting schedule scan.";
// TODO clear up the exception handling
Schedule s = schedules.peek();
@@ -380,6 +382,7 @@ public class ScheduleManager {
// there's something to do. Most likely there will not be.
try {
logger.info("Nothing scheduled to run. Checking again soon.");
+ runnerStage = "Waiting for next round scan.";
nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
@@ -391,6 +394,8 @@ public class ScheduleManager {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
+ runnerStage = "Ready to run schedule " + runningSched.toString();
+
logger.info("Scheduler ready to run " + runningSched.toString());
// Execute the flow here
try {
@@ -406,7 +411,7 @@ public class ScheduleManager {
logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
}
-
+
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
@@ -428,6 +433,9 @@ public class ScheduleManager {
flowOptions.setSuccessEmails(flow.getSuccessEmails());
}
+ runnerStage = "Submitting flow " + exflow.getFlowId();
+ flowOptions.setMailCreator(flow.getMailCreator());
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -443,6 +451,7 @@ public class ScheduleManager {
SlaOptions slaOptions = runningSched.getSlaOptions();
if(slaOptions != null) {
logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+ runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
for(SlaSetting set : slaOptions.getSettings()) {
@@ -472,6 +481,7 @@ public class ScheduleManager {
logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
+ runnerStage = "Done running schedule for " + runningSched.toString();
removeRunnerSchedule(runningSched);
// Immediately reschedule if it's possible. Let
@@ -485,6 +495,7 @@ public class ScheduleManager {
removeSchedule(runningSched);
}
} else {
+ runnerStage = "Waiting for next round scan.";
// wait until flow run
long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
try {
@@ -542,4 +553,9 @@ public class ScheduleManager {
public boolean isThreadActive() {
return runner.isAlive();
}
+
+ public String getThreadStage() {
+ return runnerStage;
+ }
+
}
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 6f324ef..beabbbf 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -32,10 +32,13 @@ import azkaban.webapp.AzkabanWebServer;
public class ScheduleStatisticManager {
private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
- private static File cacheDirectory = new File("cache/schedule-statistics");
+ private static File cacheDirectory;
private static final int STAT_NUMBERS = 10;
public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+ if (cacheDirectory == null) {
+ setCacheFolder(new File(server.getServerProps().getString("cache.directory", "cache")));
+ }
Map<String, Object> data = loadCache(scheduleId);
if (data != null) {
return data;
@@ -90,7 +93,8 @@ public class ScheduleStatisticManager {
return data;
}
- public static void invalidateCache(int scheduleId) {
+ public static void invalidateCache(int scheduleId, File cacheDir) {
+ setCacheFolder(cacheDir);
// This should be silent and not fail
try {
Object lock = getLock(scheduleId);
@@ -163,4 +167,10 @@ public class ScheduleStatisticManager {
cacheLock.remove(scheduleId);
}
}
+
+ private static void setCacheFolder(File cacheDir) {
+ if (cacheDirectory == null) {
+ cacheDirectory = new File(cacheDir, "schedule-statistics");
+ }
+ }
}
src/java/azkaban/user/User.java 75(+64 -11)
diff --git a/src/java/azkaban/user/User.java b/src/java/azkaban/user/User.java
index 51cf4a7..2262d92 100644
--- a/src/java/azkaban/user/User.java
+++ b/src/java/azkaban/user/User.java
@@ -23,17 +23,42 @@ import java.util.Set;
public class User {
private final String userid;
+ private String email = "";
private Set<String> roles = new HashSet<String>();
private Set<String> groups = new HashSet<String>();
-
+ private UserPermissions userPermissions;
+
public User(String userid) {
this.userid = userid;
}
-
+
public String getUserId() {
return userid;
}
+ public void setEmail(String email) {
+ this.email = email;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public void setPermissions(UserPermissions checker) {
+ this.userPermissions = checker;
+ }
+
+ public UserPermissions getPermissions() {
+ return userPermissions;
+ }
+
+ public boolean hasPermission(String permission) {
+ if (userPermissions == null) {
+ return false;
+ }
+ return this.userPermissions.hasPermission(permission);
+ }
+
public List<String> getGroups() {
return new ArrayList<String>(groups);
}
@@ -41,27 +66,27 @@ public class User {
public void clearGroup() {
groups.clear();
}
-
+
public void addGroup(String name) {
groups.add(name);
}
-
+
public boolean isInGroup(String group) {
return this.groups.contains(group);
}
-
+
public List<String> getRoles() {
return new ArrayList<String>(roles);
}
-
+
public void addRole(String role) {
this.roles.add(role);
}
-
+
public boolean hasRole(String role) {
return roles.contains(role);
}
-
+
public String toString() {
String groupStr = "[";
for (String group: groups) {
@@ -70,7 +95,7 @@ public class User {
groupStr += "]";
return userid + ": " + groupStr;
}
-
+
@Override
public int hashCode() {
final int prime = 31;
@@ -87,12 +112,40 @@ public class User {
return false;
if (getClass() != obj.getClass())
return false;
- User other = (User) obj;
+ User other = (User)obj;
if (userid == null) {
if (other.userid != null)
return false;
- } else if (!userid.equals(other.userid))
+ }
+ else if (!userid.equals(other.userid))
return false;
return true;
}
+
+ public static interface UserPermissions {
+ public boolean hasPermission(String permission);
+ public void addPermission(String permission);
+ }
+
+ public static class DefaultUserPermission implements UserPermissions {
+ Set<String> permissions;
+
+ public DefaultUserPermission() {
+ this(new HashSet<String>());
+ }
+
+ public DefaultUserPermission(Set<String> permissions) {
+ this.permissions = permissions;
+ }
+
+ @Override
+ public boolean hasPermission(String permission) {
+ return permissions.contains(permission);
+ }
+
+ @Override
+ public void addPermission(String permission) {
+ permissions.add(permission);
+ }
+ }
}
src/java/azkaban/user/XmlUserManager.java 11(+11 -0)
diff --git a/src/java/azkaban/user/XmlUserManager.java b/src/java/azkaban/user/XmlUserManager.java
index 7e93b88..5b94cdd 100644
--- a/src/java/azkaban/user/XmlUserManager.java
+++ b/src/java/azkaban/user/XmlUserManager.java
@@ -33,6 +33,7 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
+import azkaban.user.User.UserPermissions;
import azkaban.utils.Props;
/**
@@ -262,6 +263,16 @@ public class XmlUserManager implements UserManager {
// Add all the roles the group has to the user
resolveGroupRoles(user);
+ user.setPermissions(new UserPermissions() {
+ @Override
+ public boolean hasPermission(String permission) {
+ return true;
+ }
+
+ @Override
+ public void addPermission(String permission) {
+ }
+ });
return user;
}
src/java/azkaban/utils/EmailMessage.java 25(+19 -6)
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 6593fe4..bbcfdd1 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -48,7 +48,9 @@ public class EmailMessage {
private String _fromAddress;
private String _mimeType = "text/plain";
private StringBuffer _body = new StringBuffer();
-
+ private static int _mailTimeout = 10000;
+ private static int _connectionTimeout = 10000;
+
private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
public EmailMessage() {
@@ -60,7 +62,15 @@ public class EmailMessage {
_mailHost = host;
_mailPassword = password;
}
-
+
+ public static void setTimeout(int timeoutMillis) {
+ _mailTimeout = timeoutMillis;
+ }
+
+ public static void setConnectionTimeout(int timeoutMillis) {
+ _connectionTimeout = timeoutMillis;
+ }
+
public EmailMessage setMailHost(String host) {
_mailHost = host;
return this;
@@ -152,6 +162,8 @@ public class EmailMessage {
props.put("mail."+protocol+".auth", "true");
props.put("mail.user", _mailUser);
props.put("mail.password", _mailPassword);
+ props.put("mail."+protocol+".timeout", _mailTimeout);
+ props.put("mail."+protocol+".connectiontimeout", _connectionTimeout);
Session session = Session.getInstance(props, null);
Message message = new MimeMessage(session);
@@ -165,15 +177,16 @@ public class EmailMessage {
if (_attachments.size() > 0) {
MimeMultipart multipart = new MimeMultipart("related");
+
+ BodyPart messageBodyPart = new MimeBodyPart();
+ messageBodyPart.setContent(_body.toString(), _mimeType);
+ multipart.addBodyPart(messageBodyPart);
+
// Add attachments
for (BodyPart part : _attachments) {
multipart.addBodyPart(part);
}
- BodyPart messageBodyPart = new MimeBodyPart();
- messageBodyPart.setContent(_body.toString(), _mimeType);
- multipart.addBodyPart(messageBodyPart);
-
message.setContent(multipart);
} else {
message.setContent(_body.toString(), _mimeType);
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 2071386..54993ee 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.StringTokenizer;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
index 799e2d7..896d900 100644
--- a/src/java/azkaban/utils/SplitterOutputStream.java
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -54,15 +54,31 @@ public class SplitterOutputStream extends OutputStream {
@Override
public void flush() throws IOException {
+ IOException exception = null;
for (OutputStream output : outputs) {
- output.flush();
+ try {
+ output.flush();
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw exception;
}
}
@Override
public void close() throws IOException {
+ IOException exception = null;
for (OutputStream output : outputs) {
- output.close();
+ try {
+ output.close();
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw exception;
}
}
diff --git a/src/java/azkaban/utils/StringUtils.java b/src/java/azkaban/utils/StringUtils.java
index 127db0a..02bad64 100644
--- a/src/java/azkaban/utils/StringUtils.java
+++ b/src/java/azkaban/utils/StringUtils.java
@@ -17,7 +17,6 @@
package azkaban.utils;
import java.util.Collection;
-import java.util.List;
public class StringUtils {
public static final char SINGLE_QUOTE = '\'';
src/java/azkaban/utils/Utils.java 20(+12 -8)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index 48d1d74..8e13ffe 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -116,6 +116,18 @@ public class Utils {
zOut.close();
}
+ public static void zipFolderContent(File folder, File output) throws IOException {
+ FileOutputStream out = new FileOutputStream(output);
+ ZipOutputStream zOut = new ZipOutputStream(out);
+ File[] files = folder.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ zipFile("", f, zOut);
+ }
+ }
+ zOut.close();
+ }
+
private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
if (input.isDirectory()) {
File[] files = input.listFiles();
@@ -277,12 +289,4 @@ public class Utils {
Method method = clazz.getDeclaredMethod(methodName, argTypes);
return method.invoke(null, args);
}
-
- public static void copyStream(InputStream input, OutputStream output) throws IOException {
- byte[] buffer = new byte[1024];
- int bytesRead;
- while ((bytesRead = input.read(buffer)) != -1) {
- output.write(buffer, 0, bytesRead);
- }
- }
}
src/java/azkaban/utils/WebUtils.java 2(+0 -2)
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index 64a4f65..95fc6aa 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -170,6 +170,4 @@ public class WebUtils {
else
return sizeBytes + " B";
}
-
-
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 7e6355b..02a81aa 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -74,10 +74,11 @@ import azkaban.webapp.servlet.AzkabanServletContextListener;
import azkaban.webapp.servlet.AbstractAzkabanServlet;
import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.IndexRedirectServlet;
import azkaban.webapp.servlet.JMXHttpServlet;
import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.HistoryServlet;
-import azkaban.webapp.servlet.IndexServlet;
+import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
import azkaban.webapp.servlet.ViewerPlugin;
import azkaban.webapp.session.SessionCache;
@@ -420,11 +421,13 @@ public class AzkabanWebServer extends AzkabanServer {
logger.info("Setting up web resource dir " + staticDir);
Context root = new Context(server, "/", Context.SESSIONS);
root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
-
+
+ String defaultServletPath = azkabanSettings.getString("azkaban.default.servlet.path", "/index");
root.setResourceBase(staticDir);
- ServletHolder index = new ServletHolder(new IndexServlet());
+ ServletHolder indexRedirect = new ServletHolder(new IndexRedirectServlet(defaultServletPath));
+ root.addServlet(indexRedirect, "/");
+ ServletHolder index = new ServletHolder(new ProjectServlet());
root.addServlet(index, "/index");
- root.addServlet(index, "/");
ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
root.addServlet(staticServlet, "/css/*");
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 67923ed..4b64bc4 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -760,6 +760,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
+ options.setMailCreator(flow.getMailCreator());
try {
String message = executorManager.submitExecutableFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index 5c8006c..896d93b 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletRequest;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.mail.DefaultMailCreator;
public class HttpRequestUtils {
public static ExecutionOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
@@ -52,7 +53,7 @@ public class HttpRequestUtils {
boolean override = getBooleanParam(req, "successEmailsOverride", false);
execOptions.setSuccessEmailsOverridden(override);
}
-
+
if (hasParam(req, "failureEmails")) {
String emails = getParam(req, "failureEmails");
if (!emails.isEmpty()) {
@@ -88,6 +89,11 @@ public class HttpRequestUtils {
execOptions.setPipelineLevel(queueLevel);
}
}
+ String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
+ if (hasParam(req, "mailCreator")) {
+ mailCreator = getParam(req, "mailCreator");
+ execOptions.setMailCreator(mailCreator);
+ }
Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
execOptions.setFlowParameters(flowParamGroup);
diff --git a/src/java/azkaban/webapp/servlet/IndexRedirectServlet.java b/src/java/azkaban/webapp/servlet/IndexRedirectServlet.java
new file mode 100644
index 0000000..f788b40
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/IndexRedirectServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import azkaban.webapp.session.Session;
+
+/**
+ * The main page
+ */
+public class IndexRedirectServlet extends LoginAbstractAzkabanServlet {
+ private static final long serialVersionUID = -1;
+ private String defaultServletPath;
+
+ public IndexRedirectServlet(String defaultServletPath) {
+ this.defaultServletPath = defaultServletPath;
+ if (this.defaultServletPath.isEmpty() || this.defaultServletPath.equals("/")) {
+ this.defaultServletPath = "/index";
+ }
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ resp.sendRedirect(defaultServletPath);
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ resp.sendRedirect(defaultServletPath);
+ }
+}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 2b2aee9..b1d78c9 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -62,6 +62,7 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
contextType.put(".css", "text/css");
contextType.put(".png", "image/png");
contextType.put(".jpeg", "image/jpeg");
+ contextType.put(".gif", "image/gif");
contextType.put(".jpg", "image/jpeg");
}
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index e6b74ea..7e881dc 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,8 @@
package azkaban.webapp.servlet;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -33,6 +35,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -64,7 +67,6 @@ import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.JSONUtils;
import azkaban.utils.SplitterOutputStream;
-import azkaban.utils.Utils;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
@@ -416,7 +418,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
int loadAll = getIntParam(req, "loadAll");
// Cache file
- File cache = new File("cache/schedule-history/" + startTime + ".cache");
+ String cacheDir = getApplication().getServerProps().getString("cache.directory", "cache");
+ File cacheDirFile = new File(cacheDir, "schedule-history");
+ File cache = new File(cacheDirFile, startTime + ".cache");
cache.getParentFile().mkdirs();
if (useCache) {
@@ -427,8 +431,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
if (cacheExists) {
// Send the cache instead
- InputStream cacheInput = new FileInputStream(cache);
- Utils.copyStream(cacheInput, resp.getOutputStream());
+ InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
+ IOUtils.copy(cacheInput, resp.getOutputStream());
// System.out.println("Using cache copy for " + start);
return;
}
@@ -441,8 +445,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ExecutorManager executorManager = server.getExecutorManager();
history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
} catch (ExecutorManagerException e) {
- // Return empty should suffice
+ logger.error(e);
}
+
HashMap<String, Object> ret = new HashMap<String, Object>();
List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
ret.put("items", output);
@@ -462,14 +467,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
//Create cache file
- File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
+ File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
cacheTemp.createNewFile();
- OutputStream cacheOutput = new FileOutputStream(cacheTemp);
-
+ OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
+ OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
// Write to both the cache file and web output
- JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
- // System.out.println("Writing cache file for " + start);
- // JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+ JSONUtils.toJSON(ret, outputStream, false);
+ cacheOutput.close();
//Move cache file
synchronized (this) {
diff --git a/src/java/azkaban/webapp/servlet/velocity/index.vm b/src/java/azkaban/webapp/servlet/velocity/index.vm
index 4b6f32f..3ca2c05 100644
--- a/src/java/azkaban/webapp/servlet/velocity/index.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/index.vm
@@ -50,12 +50,12 @@
#if ($allProjects)
<h2>All Projects</h2>
<div class="section-sub-hd">
- <h4><a href="${context}/">My Projects</a></h4>
+ <h4><a href="${context}/index">My Projects</a></h4>
</div>
#else
<h2>My Projects</h2>
<div class="section-sub-hd">
- <h4><a href="${context}/?all">All Projects</a></h4>
+ <h4><a href="${context}/index?all">All Projects</a></h4>
</div>
#end
<form id="search-form" method="get">
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 67404f9..0ac1eb7 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -23,7 +23,7 @@
</script>
<ul id="nav" class="nav">
- <li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/')"><a href="$!context/">Projects</a></li>
+ <li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/index')"><a href="$!context/index">Projects</a></li>
<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index c9aed8d..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -1,7 +1,8 @@
#Azkaban Personalization Settings
-azkaban.name=Local
+azkaban.name=Test
azkaban.label=My Local Azkaban
azkaban.color=#FF3601
+azkaban.default.servlet.path=/index
web.resource.dir=web/
default.timezone.id=America/Los_Angeles
@@ -43,4 +44,6 @@ mail.host=
job.failure.email=
job.success.email=
-lockdown.create.projects=false
\ No newline at end of file
+lockdown.create.projects=false
+
+cache.directory=cache
src/web/js/azkaban.schedule.svg.js 2(+0 -2)
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index 05f434e..a49a8a9 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -448,8 +448,6 @@ $(function() {
{
var items = data.items;
- console.log(data);
-
//Sort items by day
for(var i = 0; i < items.length; i++)
{
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index b10ea29..ef76de7 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -224,4 +224,10 @@ public class MockProjectLoader implements ProjectLoader {
// TODO Auto-generated method stub
}
+
+ @Override
+ public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file