azkaban-uncached

Changes

eclipse-styles.xml 283(+283 -0)

Details

eclipse-styles.xml 283(+283 -0)

diff --git a/eclipse-styles.xml b/eclipse-styles.xml
new file mode 100644
index 0000000..07e74f7
--- /dev/null
+++ b/eclipse-styles.xml
@@ -0,0 +1,283 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="12">
+<profile kind="CodeFormatterProfile" name="Azkaban" version="12">
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="2000"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="0"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="tab"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="2000"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.5"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+</profile>
+</profiles>
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 16dc8d0..fb07641 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -39,7 +39,7 @@ public class ExecutableFlow {
 	private int version;
 
 	private String executionPath;
-	
+
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
 	private ArrayList<String> startNodes;
@@ -52,10 +52,10 @@ public class ExecutableFlow {
 
 	private Status flowStatus = Status.READY;
 	private String submitUser;
-	
+
 	private HashSet<String> proxyUsers = new HashSet<String>();
 	private ExecutionOptions executionOptions;
-	
+
 	public ExecutableFlow(Flow flow) {
 		this.projectId = flow.getProjectId();
 		this.scheduleId = -1;
@@ -63,79 +63,81 @@ public class ExecutableFlow {
 		this.version = flow.getVersion();
 		this.setFlow(flow);
 	}
-	
+
 	public ExecutableFlow(int executionId, Flow flow) {
 		this.projectId = flow.getProjectId();
 		this.scheduleId = -1;
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
 		this.executionId = executionId;
-		
+
 		this.setFlow(flow);
 	}
-	
+
 	public ExecutableFlow() {
 	}
-	
+
 	public long getUpdateTime() {
 		return updateTime;
 	}
-	
+
 	public void setUpdateTime(long updateTime) {
 		this.updateTime = updateTime;
 	}
-	
+
 	public List<ExecutableNode> getExecutableNodes() {
 		return new ArrayList<ExecutableNode>(executableNodes.values());
 	}
-	
+
 	public ExecutableNode getExecutableNode(String id) {
 		return executableNodes.get(id);
 	}
-	
+
 	public Collection<FlowProps> getFlowProps() {
 		return flowProps.values();
 	}
-	
+
 	public void addAllProxyUsers(Collection<String> proxyUsers) {
 		this.proxyUsers.addAll(proxyUsers);
 	}
-	
+
 	public Set<String> getProxyUsers() {
 		return new HashSet<String>(this.proxyUsers);
 	}
-	
+
 	public void setExecutionOptions(ExecutionOptions options) {
 		executionOptions = options;
 	}
-	
+
 	public ExecutionOptions getExecutionOptions() {
 		return executionOptions;
 	}
-	
+
 	private void setFlow(Flow flow) {
 		executionOptions = new ExecutionOptions();
-		
+
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
 			ExecutableNode exNode = new ExecutableNode(node, this);
 			executableNodes.put(id, exNode);
 		}
-		
+
 		for (Edge edge: flow.getEdges()) {
 			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
 			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
-			
+
 			sourceNode.addOutNode(edge.getTargetId());
 			targetNode.addInNode(edge.getSourceId());
 		}
-		
+
 		if (flow.getSuccessEmails() != null) {
 			executionOptions.setSuccessEmails(flow.getSuccessEmails());
 		}
 		if (flow.getFailureEmails() != null) {
 			executionOptions.setFailureEmails(flow.getFailureEmails());
 		}
+		executionOptions.setMailCreator(flow.getMailCreator());
+
 		flowProps.putAll(flow.getAllFlowProps());
 	}
 
@@ -148,10 +150,10 @@ public class ExecutableFlow {
 				}
 			}
 		}
-		
+
 		return startNodes;
 	}
-	
+
 	public List<String> getEndNodes() {
 		if (endNodes == null) {
 			endNodes = new ArrayList<String>();
@@ -161,10 +163,10 @@ public class ExecutableFlow {
 				}
 			}
 		}
-		
+
 		return endNodes;
 	}
-	
+
 	public boolean setNodeStatus(String nodeId, Status status) {
 		ExecutableNode exNode = executableNodes.get(nodeId);
 		if (exNode == null) {
@@ -179,18 +181,18 @@ public class ExecutableFlow {
 		if (exNode == null) {
 			return;
 		}
-		
+
 		exNode.setExternalExecutionId(externalExecutionId);
 	}
-	
+
 	public int getExecutionId() {
 		return executionId;
 	}
 
 	public void setExecutionId(int executionId) {
 		this.executionId = executionId;
-		
-		for(ExecutableNode node: executableNodes.values()) {
+
+		for (ExecutableNode node: executableNodes.values()) {
 			node.setExecutionId(executionId);
 		}
 	}
@@ -226,31 +228,31 @@ public class ExecutableFlow {
 	public void setExecutionPath(String executionPath) {
 		this.executionPath = executionPath;
 	}
-	
+
 	public long getStartTime() {
 		return startTime;
 	}
-	
+
 	public void setStartTime(long time) {
 		this.startTime = time;
 	}
-	
+
 	public long getEndTime() {
 		return endTime;
 	}
-	
+
 	public void setEndTime(long time) {
 		this.endTime = time;
 	}
-	
+
 	public long getSubmitTime() {
 		return submitTime;
 	}
-	
+
 	public void setSubmitTime(long time) {
 		this.submitTime = time;
 	}
-	
+
 	public Status getStatus() {
 		return flowStatus;
 	}
@@ -258,16 +260,16 @@ public class ExecutableFlow {
 	public void setStatus(Status flowStatus) {
 		this.flowStatus = flowStatus;
 	}
-	
-	public Map<String,Object> toObject() {
+
+	public Map<String, Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
 		flowObj.put("type", "executableflow");
 		flowObj.put("executionId", executionId);
 		flowObj.put("executionPath", executionPath);
 		flowObj.put("flowId", flowId);
 		flowObj.put("projectId", projectId);
-		
-		if(scheduleId >= 0) {
+
+		if (scheduleId >= 0) {
 			flowObj.put("scheduleId", scheduleId);
 		}
 		flowObj.put("submitTime", submitTime);
@@ -276,16 +278,16 @@ public class ExecutableFlow {
 		flowObj.put("status", flowStatus.toString());
 		flowObj.put("submitUser", submitUser);
 		flowObj.put("version", version);
-		
+
 		flowObj.put("executionOptions", this.executionOptions.toObject());
 		flowObj.put("version", version);
-		
+
 		ArrayList<Object> props = new ArrayList<Object>();
 		for (FlowProps fprop: flowProps.values()) {
 			HashMap<String, Object> propObj = new HashMap<String, Object>();
 			String source = fprop.getSource();
 			String inheritedSource = fprop.getInheritedSource();
-			
+
 			propObj.put("source", source);
 			if (inheritedSource != null) {
 				propObj.put("inherited", inheritedSource);
@@ -293,13 +295,13 @@ public class ExecutableFlow {
 			props.add(propObj);
 		}
 		flowObj.put("properties", props);
-		
+
 		ArrayList<Object> nodes = new ArrayList<Object>();
 		for (ExecutableNode node: executableNodes.values()) {
 			nodes.add(node.toObject());
 		}
 		flowObj.put("nodes", nodes);
-		
+
 		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
 		flowObj.put("proxyUsers", proxyUserList);
 
@@ -307,57 +309,57 @@ public class ExecutableFlow {
 	}
 
 	public Object toUpdateObject(long lastUpdateTime) {
-		Map<String, Object> updateData = new HashMap<String,Object>();
+		Map<String, Object> updateData = new HashMap<String, Object>();
 		updateData.put("execId", this.executionId);
 		updateData.put("status", this.flowStatus.getNumVal());
 		updateData.put("startTime", this.startTime);
 		updateData.put("endTime", this.endTime);
 		updateData.put("updateTime", this.updateTime);
-		
-		List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
+
+		List<Map<String, Object>> updatedNodes = new ArrayList<Map<String, Object>>();
 		for (ExecutableNode node: executableNodes.values()) {
-			
+
 			if (node.getUpdateTime() > lastUpdateTime) {
-				Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
+				Map<String, Object> updatedNodeMap = new HashMap<String, Object>();
 				updatedNodeMap.put("jobId", node.getJobId());
 				updatedNodeMap.put("status", node.getStatus().getNumVal());
 				updatedNodeMap.put("startTime", node.getStartTime());
 				updatedNodeMap.put("endTime", node.getEndTime());
 				updatedNodeMap.put("updateTime", node.getUpdateTime());
 				updatedNodeMap.put("attempt", node.getAttempt());
-				
+
 				if (node.getAttempt() > 0) {
-					ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+					ArrayList<Map<String, Object>> pastAttempts = new ArrayList<Map<String, Object>>();
 					for (Attempt attempt: node.getPastAttemptList()) {
 						pastAttempts.add(attempt.toObject());
 					}
 					updatedNodeMap.put("pastAttempts", pastAttempts);
 				}
-				
+
 				updatedNodes.add(updatedNodeMap);
 			}
 		}
-		
+
 		updateData.put("nodes", updatedNodes);
 		return updateData;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void applyUpdateObject(Map<String, Object> updateData) {
-		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
-		for (Map<String,Object> node: updatedNodes) {
+		List<Map<String, Object>> updatedNodes = (List<Map<String, Object>>)updateData.get("nodes");
+		for (Map<String, Object> node: updatedNodes) {
 			String jobId = (String)node.get("jobId");
 			Status status = Status.fromInteger((Integer)node.get("status"));
 			long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
 			long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
 			long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-			
+
 			ExecutableNode exNode = executableNodes.get(jobId);
 			exNode.setEndTime(endTime);
 			exNode.setStartTime(startTime);
 			exNode.setUpdateTime(updateTime);
 			exNode.setStatus(status);
-			
+
 			int attempt = 0;
 			if (node.containsKey("attempt")) {
 				attempt = (Integer)node.get("attempt");
@@ -365,22 +367,22 @@ public class ExecutableFlow {
 					exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
 				}
 			}
-			
+
 			exNode.setAttempt(attempt);
 		}
-		
+
 		this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
-		
+
 		this.startTime = JSONUtils.getLongFromObject(updateData.get("startTime"));
 		this.endTime = JSONUtils.getLongFromObject(updateData.get("endTime"));
 		this.updateTime = JSONUtils.getLongFromObject(updateData.get("updateTime"));
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
 		ExecutableFlow exFlow = new ExecutableFlow();
-		
-		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+
+		HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
 		exFlow.executionId = (Integer)flowObj.get("executionId");
 		exFlow.executionPath = (String)flowObj.get("executionPath");
 		exFlow.flowId = (String)flowObj.get("flowId");
@@ -394,7 +396,7 @@ public class ExecutableFlow {
 		exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
 		exFlow.submitUser = (String)flowObj.get("submitUser");
 		exFlow.version = (Integer)flowObj.get("version");
-		
+
 		if (flowObj.containsKey("executionOptions")) {
 			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
 		}
@@ -402,7 +404,7 @@ public class ExecutableFlow {
 			// for backawards compatibility should remove in a few versions.
 			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
 		}
-		
+
 		// Copy nodes
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
 		for (Object nodeObj: nodes) {
@@ -411,57 +413,57 @@ public class ExecutableFlow {
 		}
 
 		List<Object> properties = (List<Object>)flowObj.get("properties");
-		for (Object propNode : properties) {
+		for (Object propNode: properties) {
 			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
 			String source = (String)fprop.get("source");
 			String inheritedSource = (String)fprop.get("inherited");
-			
+
 			FlowProps flowProps = new FlowProps(inheritedSource, source);
 			exFlow.flowProps.put(source, flowProps);
 		}
-		
-		if(flowObj.containsKey("proxyUsers")) {
-			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+
+		if (flowObj.containsKey("proxyUsers")) {
+			ArrayList<String> proxyUserList = (ArrayList<String>)flowObj.get("proxyUsers");
 			exFlow.addAllProxyUsers(proxyUserList);
 		}
-		
+
 		return exFlow;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void updateExecutableFlowFromObject(Object obj) {
-		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+		HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
 
 		submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
 		startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
 		endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
 		flowStatus = Status.valueOf((String)flowObj.get("status"));
-		
+
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
 		for (Object nodeObj: nodes) {
-			HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
+			HashMap<String, Object> nodeHash = (HashMap<String, Object>)nodeObj;
 			String nodeId = (String)nodeHash.get("id");
 			ExecutableNode node = executableNodes.get(nodeId);
 			if (nodeId == null) {
 				throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
 			}
-			
+
 			node.updateNodeFromObject(nodeObj);
 		}
 	}
-	
+
 	public Set<String> getSources() {
 		HashSet<String> set = new HashSet<String>();
 		for (ExecutableNode exNode: executableNodes.values()) {
 			set.add(exNode.getJobPropsSource());
 		}
-		
+
 		for (FlowProps props: flowProps.values()) {
 			set.add(props.getSource());
 		}
 		return set;
 	}
-	
+
 	public String getSubmitUser() {
 		return submitUser;
 	}
@@ -469,7 +471,7 @@ public class ExecutableFlow {
 	public void setSubmitUser(String submitUser) {
 		this.submitUser = submitUser;
 	}
-	
+
 	public int getVersion() {
 		return version;
 	}
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 687dd71..8818120 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -8,6 +8,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import azkaban.executor.mail.DefaultMailCreator;
+
 /**
  * Execution options for submitted flows and scheduled flows
  */
@@ -27,6 +29,7 @@ public class ExecutionOptions {
 	private Integer pipelineExecId = null;
 	private Integer queueLevel = 0;
 	private String concurrentOption = CONCURRENT_OPTION_IGNORE;
+	private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
 	
 	public enum FailureAction {
@@ -107,10 +110,18 @@ public class ExecutionOptions {
 		this.concurrentOption = concurrentOption;
 	}
 	
+	public void setMailCreator(String mailCreator) {
+		this.mailCreator = mailCreator;
+	}
+	
 	public String getConcurrentOption() {
 		return concurrentOption;
 	}
 	
+	public String getMailCreator() {
+		return mailCreator;
+	}
+	
 	public Integer getPipelineLevel() {
 		return pipelineLevel;
 	}
@@ -152,6 +163,7 @@ public class ExecutionOptions {
 		flowOptionObj.put("pipelineExecId", pipelineExecId);
 		flowOptionObj.put("queueLevel", queueLevel);
 		flowOptionObj.put("concurrentOption", concurrentOption);
+		flowOptionObj.put("mailCreator", mailCreator);
 		flowOptionObj.put("disabled", initiallyDisabledJobs);
 		flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
 		flowOptionObj.put("successEmailsOverride", successEmailsOverride);
@@ -180,6 +192,9 @@ public class ExecutionOptions {
 		if (optionsMap.containsKey("concurrentOption")) {
 			options.concurrentOption = (String)optionsMap.get("concurrentOption");
 		}
+		if (optionsMap.containsKey("mailCreator")) {
+			options.mailCreator = (String)optionsMap.get("mailCreator");
+		}
 		if (optionsMap.containsKey("disabled")) {
 			options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
 		}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 1fa6989..07adb24 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
 
 package azkaban.executor;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.net.URI;
@@ -70,18 +71,22 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 	
 	private long lastThreadCheckTime = -1;
 	private String updaterStage = "not started";
-	
+
 	private Map<String, Alerter> alerters;
 	
+	File cacheDir;
+	
 	public ExecutorManager(Props props, ExecutorLoader loader, Map<String, Alerter> alters) throws ExecutorManagerException {
+		
 		this.executorLoader = loader;
 		this.loadRunningFlows();
-		
 		executorHost = props.getString("executor.host", "localhost");
 		executorPort = props.getInt("executor.port");
-
+		
 		alerters = alters;
 		
+		cacheDir = new File(props.getString("cache.directory", "cache"));
+
 		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
 		
@@ -866,7 +871,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
 							if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
-								ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+								ScheduleStatisticManager.invalidateCache(flow.getScheduleId(), cacheDir);
 							}
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
@@ -895,7 +900,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 				}
 				catch (Exception e) {
 					logger.error(e);
-				}
+				} 
 			}
 		}
 	}
@@ -903,7 +908,8 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 	private void finalizeFlows(ExecutableFlow flow) {
 
 		int execId = flow.getExecutionId();
-
+		
+		updaterStage = "finalizing flow " + execId;
 		// First we check if the execution in the datastore is complete
 		try {
 			ExecutableFlow dsFlow;
@@ -911,18 +917,19 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 				dsFlow = flow;
 			}
 			else {
+				updaterStage = "finalizing flow " + execId + " loading from db";
 				dsFlow = executorLoader.fetchExecutableFlow(execId);
 			
-				
+				// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
+				if (!isFinished(dsFlow)) {
+					updaterStage = "finalizing flow " + execId + " failing the flow";
+					failEverything(dsFlow);
+					executorLoader.updateExecutableFlow(dsFlow);
+				}
 			}
 
-
-			// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
-			if (!isFinished(dsFlow)) {
-				failEverything(dsFlow);
-				executorLoader.updateExecutableFlow(dsFlow);
-			}
-		
+			updaterStage = "finalizing flow " + execId + " deleting active reference";
+			
 			// Delete the executing reference.
 			if (flow.getEndTime() == -1) {
 				flow.setEndTime(System.currentTimeMillis());
@@ -930,6 +937,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 			}
 			executorLoader.removeActiveExecutableReference(execId);
 			
+			updaterStage = "finalizing flow " + execId + " cleaning from memory";
 			runningFlows.remove(execId);
 			recentlyFinished.put(execId, dsFlow);
 
@@ -940,6 +948,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 		// TODO append to the flow log that we forced killed this flow because the target no longer had
 		// the reference.
 		
+		updaterStage = "finalizing flow " + execId + " alerting and emailing";
 		ExecutionOptions options = flow.getExecutionOptions();
 		// But we can definitely email them.
 		Alerter mailAlerter = alerters.get("email");
diff --git a/src/java/azkaban/executor/mail/DefaultMailCreator.java b/src/java/azkaban/executor/mail/DefaultMailCreator.java
new file mode 100644
index 0000000..9275ccf
--- /dev/null
+++ b/src/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -0,0 +1,149 @@
+package azkaban.executor.mail;
+
+import java.util.HashMap;
+import java.util.List;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Emailer;
+import azkaban.utils.Utils;
+
+public class DefaultMailCreator implements MailCreator {
+	public static final String DEFAULT_MAIL_CREATOR = "default";
+	private static HashMap<String, MailCreator> registeredCreators = new HashMap<String, MailCreator>();
+	private static MailCreator defaultCreator;
+
+	public static void registerCreator(String name, MailCreator creator) {
+		registeredCreators.put(name, creator);
+	}
+
+	public static MailCreator getCreator(String name) {
+		MailCreator creator = registeredCreators.get(name);
+		if (creator == null) {
+			creator = defaultCreator;
+		}
+		return creator;
+	}
+
+	static {
+		defaultCreator = new DefaultMailCreator();
+		registerCreator(DEFAULT_MAIL_CREATOR, defaultCreator);
+	}
+
+	@Override
+	public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+		ExecutionOptions option = flow.getExecutionOptions();
+		List<String> emailList = option.getDisabledJobs();
+		int execId = flow.getExecutionId();
+
+		if (emailList != null && !emailList.isEmpty()) {
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+
+			if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
+				message.println("This flow is set to cancel all currently running jobs.");
+			}
+			else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE) {
+				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+			}
+			else {
+				message.println("This flow is set to complete all currently running jobs before stopping.");
+			}
+
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = Emailer.findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+			}
+
+			message.println("</ul>");
+			return true;
+		}
+
+		return false;
+	}
+
+	@Override
+	public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+		ExecutionOptions option = flow.getExecutionOptions();
+
+		List<String> emailList = option.getFailureEmails();
+		int execId = flow.getExecutionId();
+
+		if (emailList != null && !emailList.isEmpty()) {
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = Emailer.findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+			}
+			for (String reasons : vars) {
+				message.println("<li>" + reasons + "</li>");
+			}
+
+			message.println("</ul>");
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+		ExecutionOptions option = flow.getExecutionOptions();
+		List<String> emailList = option.getSuccessEmails();
+
+		int execId = flow.getExecutionId();
+
+		if (emailList != null && !emailList.isEmpty()) {
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+
+			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			return true;
+		}
+		return false;
+	}
+}
diff --git a/src/java/azkaban/executor/mail/MailCreator.java b/src/java/azkaban/executor/mail/MailCreator.java
new file mode 100644
index 0000000..d80afd4
--- /dev/null
+++ b/src/java/azkaban/executor/mail/MailCreator.java
@@ -0,0 +1,10 @@
+package azkaban.executor.mail;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.utils.EmailMessage;
+
+public interface MailCreator {
+	public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+	public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+	public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+}
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 6c01df8..3846394 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import azkaban.executor.mail.DefaultMailCreator;
+
 public class Flow {
 	private final String id;
 	private int projectId;
@@ -40,8 +42,10 @@ public class Flow {
 
 	private List<String> failureEmail = new ArrayList<String>();
 	private List<String> successEmail = new ArrayList<String>();
+	private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
 	private ArrayList<String> errors;
 	private int version = -1;
+	private Map<String, Object> metadata = new HashMap<String, Object>();
 	
 	private boolean isLayedOut = false;
 	
@@ -106,10 +110,18 @@ public class Flow {
 		return successEmail;
 	}
 	
+	public String getMailCreator() {
+		return mailCreator;
+	}
+	
 	public List<String> getFailureEmails() {
 		return failureEmail;
 	}
 	
+	public void setMailCreator(String mailCreator) {
+		this.mailCreator = mailCreator;
+	}
+	
 	public void addSuccessEmails(Collection<String> emails) {
 		successEmail.addAll(emails);
 	}
@@ -226,10 +238,15 @@ public class Flow {
 		flowObj.put("edges", objectizeEdges());
 		flowObj.put("failure.email", failureEmail);
 		flowObj.put("success.email", successEmail);
+		flowObj.put("mailCreator", mailCreator);
 		flowObj.put("layedout", isLayedOut);
 		if (errors != null) {
 			flowObj.put("errors", errors);
 		}
+
+		if (metadata != null) {
+			flowObj.put("metadata", metadata);
+		}
 		
 		return flowObj;
 	}
@@ -294,9 +311,18 @@ public class Flow {
 		List<Object> edgeList = (List<Object>)flowObject.get("edges");
 		List<Edge> edges = loadEdgeFromObjects(edgeList, nodes);
 		flow.addAllEdges(edges);
+
+		Map<String, Object> metadata = (Map<String, Object>)flowObject.get("metadata");
+		
+		if (metadata != null) {
+			flow.setMetadata(metadata);
+		}
 		
 		flow.failureEmail = (List<String>)flowObject.get("failure.email");
 		flow.successEmail = (List<String>)flowObject.get("success.email");
+		if (flowObject.containsKey("mailCreator")) {
+			flow.mailCreator = flowObject.get("mailCreator").toString();
+		}
 		return flow;
 	}
 
@@ -337,6 +363,17 @@ public class Flow {
 		return isLayedOut;
 	}
 
+	public Map<String, Object> getMetadata() {
+		if(metadata == null){
+			metadata = new HashMap<String, Object>();
+		}
+		return metadata;
+	}
+
+	public void setMetadata(Map<String, Object> metadata) {
+		this.metadata = metadata;
+	}
+
 	public void setLayedOut(boolean layedOut) {
 		this.isLayedOut = layedOut;
 	}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 7825694..a0a3d7d 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -253,7 +253,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		}
 	}
 
-	@SuppressWarnings("resource")
 	private void uploadProjectFile(Connection connection, Project project, int version, String filetype, String filename, File localFile, String uploader) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner();
 		long updateTime = System.currentTimeMillis();
@@ -343,7 +342,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		return handler;
 	}
 	
-	@SuppressWarnings("resource")
 	private ProjectFileHandler getUploadedFile(Connection connection, int projectId, int version) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner();
 		ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
@@ -681,6 +679,41 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		}
 	}
 
+	@Override
+	public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+		logger.info("Uploading flows");
+		Connection connection = getConnection();
+
+		try {
+			QueryRunner runner = new QueryRunner();
+			String json = JSONUtils.toJSON(flow.toObject());
+			byte[] stringData = json.getBytes("UTF-8");
+			byte[] data = stringData;
+
+			logger.info("UTF-8 size:" + data.length);
+			if (defaultEncodingType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+
+			logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+			final String UPDATE_FLOW = "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
+			try {
+				runner.update(connection, UPDATE_FLOW, defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+			} catch (SQLException e) {
+				e.printStackTrace();
+				throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+			}
+			connection.commit();
+		} catch (IOException e) {
+			throw new ProjectManagerException("Flow Upload failed.", e);
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Flow Upload failed commit.", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+
 	public EncodingType getDefaultEncodingType() {
 		return defaultEncodingType;
 	}
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index d2e1f52..47ebc4b 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -45,6 +45,7 @@ public class Project {
 	private LinkedHashMap<String, Permission> groupPermissionMap = new LinkedHashMap<String, Permission>();
 	private Map<String, Flow> flows = null;
 	private HashSet<String> proxyUsers = new HashSet<String>();
+	private Map<String, Object> metadata = new HashMap<String, Object>();
 	
 	public Project(int id, String name) {
 		this.id = id;
@@ -67,6 +68,10 @@ public class Project {
 		return flows.get(flowId);
 	}
 	
+	public Map<String, Flow> getFlowMap() {
+		return flows;
+	}
+	
 	public List<Flow> getFlows() {
 		List<Flow> retFlow = null;
 		if (flows != null) {
@@ -218,6 +223,10 @@ public class Project {
 		userPermissionMap.remove(userId);
 	}
 	
+	public void clearUserPermission() {
+		userPermissionMap.clear();
+	}
+	
 	public long getCreateTimestamp() {
 		return createTimestamp;
 	}
@@ -251,6 +260,10 @@ public class Project {
 		if (source != null) {
 			projectObject.put("source", source);
 		}
+		
+		if (metadata != null) {
+			projectObject.put("metadata", metadata);
+		}
 
 		ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
 		for (Map.Entry<String, Permission> entry : userPermissionMap.entrySet()) {
@@ -282,6 +295,7 @@ public class Project {
 		Boolean active = (Boolean)projectObject.get("active");
 		active = active == null ? true : active;
 		int version = (Integer)projectObject.get("version");
+		Map<String, Object> metadata = (Map<String, Object>)projectObject.get("metadata");
 		
 		Project project = new Project(id, name);
 		project.setVersion(version);
@@ -294,6 +308,9 @@ public class Project {
 		if (source != null) {
 			project.setSource(source);
 		}
+		if (metadata != null) {
+			project.setMetadata(metadata);
+		}
 		
 		List<Map<String, Object>> users = (List<Map<String, Object>>) projectObject
 				.get("users");
@@ -403,6 +420,17 @@ public class Project {
 		this.source = source;
 	}
 
+	public Map<String, Object> getMetadata() {
+		if(metadata == null){
+			metadata = new HashMap<String, Object>();
+		}
+		return metadata;
+	}
+
+	protected void setMetadata(Map<String, Object> metadata) {
+		this.metadata = metadata;
+	}
+
 	public int getId() {
 		return id;
 	}
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 7e99298..ea6e5df 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -122,6 +122,9 @@ public interface ProjectLoader {
 	 * @throws ProjectManagerException
 	 */
 	public void changeProjectVersion(Project project, int version, String user) throws ProjectManagerException;
+
+
+	public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException;
 	
 	/**
 	 * Uploads all computed flows
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e69ed5e..526eb1e 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -376,6 +376,10 @@ public class ProjectManager {
 		projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
 	}
 	
+	public void updateFlow(Project project, Flow flow) throws ProjectManagerException {
+		projectLoader.updateFlow(project, flow.getVersion(), flow);
+	}
+	
 	private File unzipFile(File archiveFile) throws IOException {
 		ZipFile zipfile = new ZipFile(archiveFile);
 		File unzipped = Utils.createTempDir(tempDir);
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ed41ce4..01fd0c9 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -206,7 +206,6 @@ public class ScheduleManager implements TriggerAgent {
 	 * @param id
 	 */
 	public synchronized void removeSchedule(Schedule sched) {
-		
 		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
 //		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
 //		if(schedules != null) {
@@ -347,11 +346,268 @@ public class ScheduleManager implements TriggerAgent {
 		}
 	}
 	
+
 	@Override
 	public void loadTriggerFromProps(Props props) throws ScheduleManagerException {
 		throw new ScheduleManagerException("create " + getTriggerSource() + " from json not supported yet" );
-		
-	}
+	}	
+
+	/**
+	 * Thread that simply invokes the running of flows when the schedule is
+	 * ready.
+	 * 
+	 * @author Richard Park
+	 * 
+	 */
+//	public class ScheduleRunner extends Thread {
+//		private final PriorityBlockingQueue<Schedule> schedules;
+//		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+//
+//		// Five minute minimum intervals
+//		private static final int TIMEOUT_MS = 300000;
+//
+//		public ScheduleRunner() {
+//			schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
+//		}
+//
+//		public void shutdown() {
+//			logger.error("Shutting down scheduler thread");
+//			stillAlive.set(false);
+//			this.interrupt();
+//		}
+//
+//		/**
+//		 * Return a list of scheduled flow
+//		 * 
+//		 * @return
+//		 */
+//		public synchronized List<Schedule> getRunnerSchedules() {
+//			return new ArrayList<Schedule>(schedules);
+//		}
+//
+//		/**
+//		 * Adds the flow to the schedule and then interrupts so it will update
+//		 * its wait time.
+//		 * 
+//		 * @param flow
+//		 */
+//		public synchronized void addRunnerSchedule(Schedule s) {
+//			logger.info("Adding " + s + " to schedule runner.");
+//			schedules.add(s);
+//			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+//			// System.currentTimeMillis(),
+//			// WorkflowAction.SCHEDULE_WORKFLOW,
+//			// WorkflowState.NOP,
+//			// flow.getId());
+//
+//			this.interrupt();
+//		}
+//
+//		/**
+//		 * Remove scheduled flows. Does not interrupt.
+//		 * 
+//		 * @param flow
+//		 */
+//		public synchronized void removeRunnerSchedule(Schedule s) {
+//			logger.info("Removing " + s + " from the schedule runner.");
+//			schedules.remove(s);
+//			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+//			// System.currentTimeMillis(),
+//			// WorkflowAction.UNSCHEDULE_WORKFLOW,
+//			// WorkflowState.NOP,
+//			// flow.getId());
+//			// Don't need to interrupt, because if this is originally on the top
+//			// of the queue,
+//			// it'll just skip it.
+//		}
+//
+//		public void run() {
+//			while (stillAlive.get()) {
+//				synchronized (this) {
+//					try {
+//						lastCheckTime = System.currentTimeMillis();
+//						
+//						runnerStage = "Starting schedule scan.";
+//						// TODO clear up the exception handling
+//						Schedule s = schedules.peek();
+//
+//						if (s == null) {
+//							// If null, wake up every minute or so to see if
+//							// there's something to do. Most likely there will not be.
+//							try {
+//								logger.info("Nothing scheduled to run. Checking again soon.");
+//								runnerStage = "Waiting for next round scan.";
+//								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
+//								this.wait(TIMEOUT_MS);
+//							} catch (InterruptedException e) {
+//								// interruption should occur when items are added or removed from the queue.
+//							}
+//						} else {
+//							// We've passed the flow execution time, so we will run.
+//							if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
+//								// Run flow. The invocation of flows should be quick.
+//								Schedule runningSched = schedules.poll();
+//
+//								runnerStage = "Ready to run schedule " + runningSched.toString();
+//								
+//								logger.info("Scheduler ready to run " + runningSched.toString());
+//								// Execute the flow here
+//								try {
+//									Project project = projectManager.getProject(runningSched.getProjectId());
+//									if (project == null) {
+//										logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
+//										throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
+//									}	
+//									//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
+//
+//									Flow flow = project.getFlow(runningSched.getFlowName());
+//									if (flow == null) {
+//										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
+//										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
+//									}
+//									
+//									// Create ExecutableFlow
+//									ExecutableFlow exflow = new ExecutableFlow(flow);
+//									System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
+//									exflow.setScheduleId(runningSched.getScheduleId());
+//									exflow.setSubmitUser(runningSched.getSubmitUser());
+//									exflow.addAllProxyUsers(project.getProxyUsers());
+//									
+//									ExecutionOptions flowOptions = runningSched.getExecutionOptions();
+//									if(flowOptions == null) {
+//										flowOptions = new ExecutionOptions();
+//										flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+//									}
+//									exflow.setExecutionOptions(flowOptions);
+//									
+//									if (!flowOptions.isFailureEmailsOverridden()) {
+//										flowOptions.setFailureEmails(flow.getFailureEmails());
+//									}
+//									if (!flowOptions.isSuccessEmailsOverridden()) {
+//										flowOptions.setSuccessEmails(flow.getSuccessEmails());
+//									}
+//									
+//									runnerStage = "Submitting flow " + exflow.getFlowId();
+//									flowOptions.setMailCreator(flow.getMailCreator());
+//									
+//									try {
+//										executorManager.submitExecutableFlow(exflow);
+//										logger.info("Scheduler has invoked " + exflow.getExecutionId());
+//									} 
+//									catch (ExecutorManagerException e) {
+//										throw e;
+//									}
+//									catch (Exception e) {	
+//										e.printStackTrace();
+//										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
+//									}
+//									
+//									SlaOptions slaOptions = runningSched.getSlaOptions();
+//									if(slaOptions != null) {
+//										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+//										runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
+//										// submit flow slas
+//										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+//										for(SlaSetting set : slaOptions.getSettings()) {
+//											if(set.getId().equals("")) {
+//												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+//												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+//											}
+//											else {
+//												jobsettings.add(set);
+//											}
+//										}
+//										if(jobsettings.size() > 0) {
+//											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+//										}
+//									}
+//									
+//								} 
+//								catch (ExecutorManagerException e) {
+//									if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+//										logger.info(e.getMessage());
+//									}
+//									else {
+//										e.printStackTrace();
+//									}
+//								}
+//								catch (Exception e) {
+//									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
+//								}
+//
+//								runnerStage = "Done running schedule for " + runningSched.toString();
+//								removeRunnerSchedule(runningSched);
+//
+//								// Immediately reschedule if it's possible. Let
+//								// the execution manager
+//								// handle any duplicate runs.
+//								if (runningSched.updateTime()) {
+//									addRunnerSchedule(runningSched);
+//									loader.updateSchedule(runningSched);
+//								}
+//								else {
+//									removeSchedule(runningSched);
+//								}								
+//							} else {
+//								runnerStage = "Waiting for next round scan.";
+//								// wait until flow run
+//								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
+//								try {
+//									nextWakupTime = System.currentTimeMillis() + millisWait;
+//									this.wait(Math.min(millisWait, TIMEOUT_MS));
+//								} catch (InterruptedException e) {
+//									// interruption should occur when items are
+//									// added or removed from the queue.
+//								}
+//							}
+//						}
+//					} catch (Exception e) {
+//						logger.error("Unexpected exception has been thrown in scheduler", e);
+//					} catch (Throwable e) {
+//						logger.error("Unexpected throwable has been thrown in scheduler", e);
+//					}
+//				}
+//			}
+//		}
+//
+//		/**
+//		 * Class to sort the schedule based on time.
+//		 * 
+//		 * @author Richard Park
+//		 */
+//		private class ScheduleComparator implements Comparator<Schedule> {
+//			@Override
+//			public int compare(Schedule arg0, Schedule arg1) {
+//				long first = arg1.getNextExecTime();
+//				long second = arg0.getNextExecTime();
+//
+//				if (first == second) {
+//					return 0;
+//				} else if (first < second) {
+//					return 1;
+//				}
+//
+//				return -1;
+//			}
+//		}
+//	}
+	
+//	public long getLastCheckTime() {
+//		return lastCheckTime;
+//	}
+//	
+//	public long getNextUpdateTime() {
+//		return nextWakupTime;
+//	}
+//	
+//	public State getThreadState() {
+//		return runner.getState();
+//	}
+//	
+//	public boolean isThreadActive() {
+//		return runner.isAlive();
+//>>>>>>> df6eb48ad044ae68afffae2254991289792f33a0
+//	}
 
 	@Override
 	public String getTriggerSource() {
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 482334d..884378d 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -16,10 +16,13 @@ import azkaban.webapp.AzkabanWebServer;
 
 public class ScheduleStatisticManager {
 	private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
-	private static File cacheDirectory = new File("cache/schedule-statistics");
+	private static File cacheDirectory;
 	private static final int STAT_NUMBERS = 10;
 
 	public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
+		if (cacheDirectory == null) {
+			setCacheFolder(new File(server.getServerProps().getString("cache.directory", "cache")));
+		}
 		Map<String, Object> data = loadCache(scheduleId);
 		if (data != null) {
 			return data;
@@ -74,7 +77,8 @@ public class ScheduleStatisticManager {
 		return data;
 	}
 
-	public static void invalidateCache(int scheduleId) {
+	public static void invalidateCache(int scheduleId, File cacheDir) {
+		setCacheFolder(cacheDir);
 		// This should be silent and not fail
 		try {
 			Object lock = getLock(scheduleId);
@@ -147,4 +151,10 @@ public class ScheduleStatisticManager {
 			cacheLock.remove(scheduleId);
 		}
 	}
+
+	private static void setCacheFolder(File cacheDir) {
+		if (cacheDirectory == null) {
+			cacheDirectory = new File(cacheDir, "schedule-statistics");
+		}
+	}
 }
diff --git a/src/java/azkaban/user/User.java b/src/java/azkaban/user/User.java
index d538e72..4f69d02 100644
--- a/src/java/azkaban/user/User.java
+++ b/src/java/azkaban/user/User.java
@@ -23,17 +23,42 @@ import java.util.Set;
 
 public class User {
 	private final String userid;
+	private String email = "";
 	private Set<String> roles = new HashSet<String>();
 	private Set<String> groups = new HashSet<String>();
-	
+	private UserPermissions userPermissions;
+
 	public User(String userid) {
 		this.userid = userid;
 	}
-	
+
 	public String getUserId() {
 		return userid;
 	}
 
+	public void setEmail(String email) {
+		this.email = email;
+	}
+
+	public String getEmail() {
+		return email;
+	}
+
+	public void setPermissions(UserPermissions checker) {
+		this.userPermissions = checker;
+	}
+
+	public UserPermissions getPermissions() {
+		return userPermissions;
+	}
+
+	public boolean hasPermission(String permission) {
+		if (userPermissions == null) {
+			return false;
+		}
+		return this.userPermissions.hasPermission(permission);
+	}
+
 	public List<String> getGroups() {
 		return new ArrayList<String>(groups);
 	}
@@ -41,27 +66,27 @@ public class User {
 	public void clearGroup() {
 		groups.clear();
 	}
-	
+
 	public void addGroup(String name) {
 		groups.add(name);
 	}
-	
+
 	public boolean isInGroup(String group) {
 		return this.groups.contains(group);
 	}
-	
+
 	public List<String> getRoles() {
 		return new ArrayList<String>(roles);
 	}
-	
+
 	public void addRole(String role) {
 		this.roles.add(role);
 	}
-	
+
 	public boolean hasRole(String role) {
 		return roles.contains(role);
 	}
-	
+
 	public String toString() {
 		String groupStr = "[";
 		for (String group: groups) {
@@ -70,7 +95,7 @@ public class User {
 		groupStr += "]";
 		return userid + ": " + groupStr;
 	}
-	
+
 	@Override
 	public int hashCode() {
 		final int prime = 31;
@@ -87,12 +112,40 @@ public class User {
 			return false;
 		if (getClass() != obj.getClass())
 			return false;
-		User other = (User) obj;
+		User other = (User)obj;
 		if (userid == null) {
 			if (other.userid != null)
 				return false;
-		} else if (!userid.equals(other.userid))
+		}
+		else if (!userid.equals(other.userid))
 			return false;
 		return true;
 	}
+
+	public static interface UserPermissions {
+		public boolean hasPermission(String permission);
+		public void addPermission(String permission);
+	}
+
+	public static class DefaultUserPermission implements UserPermissions {
+		Set<String> permissions;
+
+		public DefaultUserPermission() {
+			this(new HashSet<String>());
+		}
+
+		public DefaultUserPermission(Set<String> permissions) {
+			this.permissions = permissions;
+		}
+
+		@Override
+		public boolean hasPermission(String permission) {
+			return permissions.contains(permission);
+		}
+
+		@Override
+		public void addPermission(String permission) {
+			permissions.add(permission);
+		}
+	}
 }
diff --git a/src/java/azkaban/user/XmlUserManager.java b/src/java/azkaban/user/XmlUserManager.java
index 55ee224..5686f4c 100644
--- a/src/java/azkaban/user/XmlUserManager.java
+++ b/src/java/azkaban/user/XmlUserManager.java
@@ -33,6 +33,7 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
+import azkaban.user.User.UserPermissions;
 import azkaban.utils.Props;
 
 /**
@@ -262,6 +263,16 @@ public class XmlUserManager implements UserManager {
 
 		// Add all the roles the group has to the user
 		resolveGroupRoles(user);
+		user.setPermissions(new UserPermissions() {
+			@Override
+			public boolean hasPermission(String permission) {
+				return true;
+			}
+
+			@Override
+			public void addPermission(String permission) {
+			}
+		});
 		return user;
 	}
 
diff --git a/src/java/azkaban/utils/Emailer.java b/src/java/azkaban/utils/Emailer.java
index 830337a..dd016c6 100644
--- a/src/java/azkaban/utils/Emailer.java
+++ b/src/java/azkaban/utils/Emailer.java
@@ -11,10 +11,11 @@ import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.Status;
 import azkaban.sla.SlaOption;
 import azkaban.utils.AbstractMailer;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
@@ -23,10 +24,32 @@ public class Emailer extends AbstractMailer implements Alerter {
 	private static Logger logger = Logger.getLogger(Emailer.class);
 	
 	private boolean testMode = false;
-	
+
+	private String clientHostname;
+	private String clientPortNumber;
+
+	private String mailHost;
+	private String mailUser;
+	private String mailPassword;
+	private String mailSender;
+	private String azkabanName;
+
 	public Emailer(Props props) {
 		super(props);
+		this.azkabanName = props.getString("azkaban.name", "azkaban");
+		this.mailHost = props.getString("mail.host", "localhost");
+		this.mailUser = props.getString("mail.user", "");
+		this.mailPassword = props.getString("mail.password", "");
+		this.mailSender = props.getString("mail.sender", "");
 
+		int mailTimeout = props.getInt("mail.timeout.millis", 10000);
+		EmailMessage.setTimeout(mailTimeout);
+		int connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
+		EmailMessage.setConnectionTimeout(connectionTimeout);
+		
+		this.clientHostname = props.getString("jetty.hostname", "localhost");
+		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+		
 		testMode = props.getBoolean("test.mode", false);
 	}
 	
@@ -52,145 +75,75 @@ public class Emailer extends AbstractMailer implements Alerter {
 			}
 		}
 	}
-	
+
 	public void sendFirstErrorMessage(ExecutableFlow flow) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+
 		ExecutionOptions option = flow.getExecutionOptions();
-		List<String> emailList = option.getDisabledJobs();
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = super.createEmailMessage(
-					"Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(), 
-					"text/html", 
-					emailList);
-			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + getAzkabanName() + "</h2>");
-			
-			if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
-				message.println("This flow is set to cancel all currently running jobs.");
-			}
-			else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
-				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
-			}
-			else {
-				message.println("This flow is set to complete all currently running jobs before stopping.");
-			}
-			
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
-			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			message.println("");
-			message.println("<h3>Reason</h3>");
-			List<String> failedJobs = findFailedJobs(flow);
-			message.println("<ul>");
-			for (String jobId : failedJobs) {
-				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
-			}
-			
-			message.println("</ul>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
+
+		MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+
+		logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+		boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+		if (mailCreated && !testMode) {
+			try {
+				message.sendEmail();
+			} catch (MessagingException e) {
+				logger.error("Email message send failed", e);
 			}
 		}
 	}
-	
-	public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+
+	public void sendErrorEmail(ExecutableFlow flow, String... extraReasons) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+
 		ExecutionOptions option = flow.getExecutionOptions();
-		
-		List<String> emailList = option.getFailureEmails();
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = super.createEmailMessage(
-					"Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(), 
-					"text/html", 
-					emailList);
-			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName() + "</h2>");
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			
-			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
-			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			message.println("");
-			message.println("<h3>Reason</h3>");
-			List<String> failedJobs = findFailedJobs(flow);
-			message.println("<ul>");
-			for (String jobId : failedJobs) {
-				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
-			}
-			for (String reasons: extraReasons) {
-				message.println("<li>" + reasons + "</li>");
-			}
-			
-			message.println("</ul>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
+
+		MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+		logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+		boolean mailCreated = mailCreator.createErrorEmail(flow, message, azkabanName, clientHostname, clientPortNumber, extraReasons);
+
+		if (mailCreated && !testMode) {
+			try {
+				message.sendEmail();
+			} catch (MessagingException e) {
+				logger.error("Email message send failed", e);
 			}
 		}
 	}
 
 	public void sendSuccessEmail(ExecutableFlow flow) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+
 		ExecutionOptions option = flow.getExecutionOptions();
-		List<String> emailList = option.getSuccessEmails();
 
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = super.createEmailMessage(
-					"Flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName(), 
-					"text/html", 
-					emailList);
-			
-			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName() + "</h2>");
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
-			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
+		MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+		logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+		boolean mailCreated = mailCreator.createSuccessEmail(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+		if (mailCreated && !testMode) {
+			try {
+				message.sendEmail();
+			} catch (MessagingException e) {
+				logger.error("Email message send failed", e);
 			}
 		}
 	}
-	
-	private List<String> findFailedJobs(ExecutableFlow flow) {
+
+	public static List<String> findFailedJobs(ExecutableFlow flow) {
 		ArrayList<String> failedJobs = new ArrayList<String>();
-		for (ExecutableNode node: flow.getExecutableNodes()) {
+		for (ExecutableNode node : flow.getExecutableNodes()) {
 			if (node.getStatus() == Status.FAILED) {
 				failedJobs.add(node.getJobId());
 			}
 		}
-		
 		return failedJobs;
 	}
 
@@ -214,4 +167,4 @@ public class Emailer extends AbstractMailer implements Alerter {
 			throws Exception {
 		sendSlaAlertEmail(slaOption, slaMessage);		
 	}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..3a3475d 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -32,7 +32,9 @@ public class EmailMessage {
 	private String _fromAddress;
 	private String _mimeType = "text/plain";
 	private StringBuffer _body = new StringBuffer();
-
+	private static int _mailTimeout = 10000;
+	private static int _connectionTimeout = 10000;
+	
 	private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
 
 	public EmailMessage() {
@@ -44,7 +46,15 @@ public class EmailMessage {
 		_mailHost = host;
 		_mailPassword = password;
 	}
-
+	
+	public static void setTimeout(int timeoutMillis) {
+		_mailTimeout = timeoutMillis;
+	}
+	
+	public static void setConnectionTimeout(int timeoutMillis) {
+		_connectionTimeout = timeoutMillis;
+	}
+	
 	public EmailMessage setMailHost(String host) {
 		_mailHost = host;
 		return this;
@@ -136,6 +146,8 @@ public class EmailMessage {
 		props.put("mail."+protocol+".auth", "true");
 		props.put("mail.user", _mailUser);
 		props.put("mail.password", _mailPassword);
+		props.put("mail."+protocol+".timeout", _mailTimeout);
+		props.put("mail."+protocol+".connectiontimeout", _connectionTimeout);
 
 		Session session = Session.getInstance(props, null);
 		Message message = new MimeMessage(session);
@@ -149,15 +161,16 @@ public class EmailMessage {
 
 		if (_attachments.size() > 0) {
 			MimeMultipart multipart = new MimeMultipart("related");
+			
+			BodyPart messageBodyPart = new MimeBodyPart();
+			messageBodyPart.setContent(_body.toString(), _mimeType);
+			multipart.addBodyPart(messageBodyPart);
+			
 			// Add attachments
 			for (BodyPart part : _attachments) {
 				multipart.addBodyPart(part);
 			}
 
-			BodyPart messageBodyPart = new MimeBodyPart();
-			messageBodyPart.setContent(_body.toString(), _mimeType);
-			multipart.addBodyPart(messageBodyPart);
-
 			message.setContent(multipart);
 		} else {
 			message.setContent(_body.toString(), _mimeType);
diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
index 81821c6..cc1b4f5 100644
--- a/src/java/azkaban/utils/SplitterOutputStream.java
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -38,15 +38,31 @@ public class SplitterOutputStream extends OutputStream {
 
 	@Override
 	public void flush() throws IOException {
+		IOException exception = null;
 		for (OutputStream output : outputs) {
-			output.flush();
+			try {
+				output.flush();
+			} catch (IOException e) {
+				exception = e;
+			}
+		}
+		if (exception != null) {
+			throw exception;
 		}
 	}
 
 	@Override
 	public void close() throws IOException {
+		IOException exception = null;
 		for (OutputStream output : outputs) {
-			output.close();
+			try {
+				output.close();
+			} catch (IOException e) {
+				exception = e;
+			}
+		}
+		if (exception != null) {
+			throw exception;
 		}
 	}
 
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index 5dfe71e..d65981d 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -135,6 +135,18 @@ public class Utils {
 		zOut.close();
 	}
 
+	public static void zipFolderContent(File folder, File output) throws IOException {
+		FileOutputStream out = new FileOutputStream(output);
+		ZipOutputStream zOut = new ZipOutputStream(out);
+		File[] files = folder.listFiles();
+		if (files != null) {
+			for (File f : files) {
+				zipFile("", f, zOut);
+			}
+		}
+		zOut.close();
+	}
+
 	private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
 		if (input.isDirectory()) {
 			File[] files = input.listFiles();
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index ef63fb6..c2cb3f8 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -154,6 +154,4 @@ public class WebUtils {
         else
             return sizeBytes + " B";
     }
-    
-    
 }
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 83ffba4..d9d8ed0 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -87,10 +87,11 @@ import azkaban.webapp.servlet.AzkabanServletContextListener;
 
 import azkaban.webapp.servlet.AbstractAzkabanServlet;
 import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.IndexRedirectServlet;
 import azkaban.webapp.servlet.JMXHttpServlet;
 import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.HistoryServlet;
-import azkaban.webapp.servlet.IndexServlet;
+import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
 import azkaban.webapp.servlet.TriggerPlugin;
@@ -744,11 +745,13 @@ public class AzkabanWebServer extends AzkabanServer {
 		logger.info("Setting up web resource dir " + staticDir);
 		Context root = new Context(server, "/", Context.SESSIONS);
 		root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
-		
+
+		String defaultServletPath = azkabanSettings.getString("azkaban.default.servlet.path", "/index");
 		root.setResourceBase(staticDir);
-		ServletHolder index = new ServletHolder(new IndexServlet());
+		ServletHolder indexRedirect = new ServletHolder(new IndexRedirectServlet(defaultServletPath));
+		root.addServlet(indexRedirect, "/");
+		ServletHolder index = new ServletHolder(new ProjectServlet());
 		root.addServlet(index, "/index");
-		root.addServlet(index, "/");
 
 		ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
 		root.addServlet(staticServlet, "/css/*");
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index fa4471b..ecaac63 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -766,6 +766,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		if (!options.isSuccessEmailsOverridden()) {
 			options.setSuccessEmails(flow.getSuccessEmails());
 		}
+		options.setMailCreator(flow.getMailCreator());
 		
 		try {
 			String message = executorManager.submitExecutableFlow(exflow, user.getUserId());
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index 4e71930..7b80139 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletRequest;
 
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.mail.DefaultMailCreator;
 
 public class HttpRequestUtils {
 	public static ExecutionOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
@@ -36,7 +37,7 @@ public class HttpRequestUtils {
 			boolean override = getBooleanParam(req, "successEmailsOverride", false);
 			execOptions.setSuccessEmailsOverridden(override);
 		}
-		
+
 		if (hasParam(req, "failureEmails")) {
 			String emails = getParam(req, "failureEmails");
 			if (!emails.isEmpty()) {
@@ -72,6 +73,11 @@ public class HttpRequestUtils {
 				execOptions.setPipelineLevel(queueLevel);
 			}
 		}
+		String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
+		if (hasParam(req, "mailCreator")) {
+			mailCreator = getParam(req, "mailCreator");
+			execOptions.setMailCreator(mailCreator);
+		}
 		
 		Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
 		execOptions.setFlowParameters(flowParamGroup);
diff --git a/src/java/azkaban/webapp/servlet/IndexRedirectServlet.java b/src/java/azkaban/webapp/servlet/IndexRedirectServlet.java
new file mode 100644
index 0000000..d02f759
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/IndexRedirectServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import azkaban.webapp.session.Session;
+
+/**
+ * The main page
+ */
+public class IndexRedirectServlet extends LoginAbstractAzkabanServlet {
+	private static final long serialVersionUID = -1;
+	private String defaultServletPath;
+
+	public IndexRedirectServlet(String defaultServletPath) {
+		this.defaultServletPath = defaultServletPath;
+		if (this.defaultServletPath.isEmpty() || this.defaultServletPath.equals("/")) {
+			this.defaultServletPath = "/index";
+		}
+	}
+
+	@Override
+	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		resp.sendRedirect(defaultServletPath);
+	}
+
+	@Override
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		resp.sendRedirect(defaultServletPath);
+	}
+}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 8ce912b..3769c5b 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -62,6 +62,7 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		contextType.put(".css", "text/css");
 		contextType.put(".png", "image/png");
 		contextType.put(".jpeg", "image/jpeg");
+		contextType.put(".gif", "image/gif");
 		contextType.put(".jpg", "image/jpeg");
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 4109d63..62b181b 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,8 @@
 
 package azkaban.webapp.servlet;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -33,6 +35,7 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -43,7 +46,6 @@ import org.joda.time.format.DateTimeFormat;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
@@ -452,7 +454,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		int loadAll = getIntParam(req, "loadAll");
 
 		// Cache file
-		File cache = new File("cache/schedule-history/" + startTime + ".cache");
+		String cacheDir = getApplication().getServerProps().getString("cache.directory", "cache");
+		File cacheDirFile = new File(cacheDir, "schedule-history");
+		File cache = new File(cacheDirFile, startTime + ".cache");
 		cache.getParentFile().mkdirs();
 
 		if (useCache) {
@@ -463,8 +467,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			}
 			if (cacheExists) {
 				// Send the cache instead
-				InputStream cacheInput = new FileInputStream(cache);
-				Utils.copyStream(cacheInput, resp.getOutputStream());
+				InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
+				IOUtils.copy(cacheInput, resp.getOutputStream());
 				// System.out.println("Using cache copy for " + start);
 				return;
 			}
@@ -477,8 +481,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ExecutorManagerAdapter executorManager = server.getExecutorManager();
 			history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
 		} catch (ExecutorManagerException e) {
-			// Return empty should suffice
+			logger.error(e);
 		}
+		
 		HashMap<String, Object> ret = new HashMap<String, Object>();
 		List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
 		ret.put("items", output);
@@ -498,14 +503,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 		
 		//Create cache file
-		File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
+		File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
 		cacheTemp.createNewFile();
-		OutputStream cacheOutput = new FileOutputStream(cacheTemp);
-
+		OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
+		OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
 		// Write to both the cache file and web output
-		JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
-		// System.out.println("Writing cache file for " + start);
-		// JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+		JSONUtils.toJSON(ret, outputStream, false);
+		cacheOutput.close();
 		
 		//Move cache file
 		synchronized (this) {
diff --git a/src/java/azkaban/webapp/servlet/velocity/index.vm b/src/java/azkaban/webapp/servlet/velocity/index.vm
index 46e2ed0..73760bd 100644
--- a/src/java/azkaban/webapp/servlet/velocity/index.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/index.vm
@@ -50,12 +50,12 @@
 #if ($allProjects)
 					<h2>All Projects</h2>
 					<div class="section-sub-hd">
-						<h4><a href="${context}/">My Projects</a></h4>
+						<h4><a href="${context}/index">My Projects</a></h4>
 					</div>
 #else
 					<h2>My Projects</h2>
 					<div class="section-sub-hd">
-						<h4><a href="${context}/?all">All Projects</a></h4>
+						<h4><a href="${context}/index?all">All Projects</a></h4>
 					</div>
 #end
 					<form id="search-form" method="get">
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 0ecef8e..95dd674 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -23,7 +23,7 @@
 			</script>
 
 			<ul id="nav" class="nav">
-				<li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/')"><a href="$!context/">Projects</a></li>
+				<li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/index')"><a href="$!context/index">Projects</a></li>
 				<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
 
 				<!--li id="triggers-tab" #if($current_page == 'triggers')class="selected"#end onClick="navMenuClick('$!context/triggers')"><a href="$!context/triggers">Triggers</a></li-->
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index c9aed8d..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -1,7 +1,8 @@
 #Azkaban Personalization Settings
-azkaban.name=Local
+azkaban.name=Test
 azkaban.label=My Local Azkaban
 azkaban.color=#FF3601
+azkaban.default.servlet.path=/index
 web.resource.dir=web/
 default.timezone.id=America/Los_Angeles
 
@@ -43,4 +44,6 @@ mail.host=
 job.failure.email=
 job.success.email=
 
-lockdown.create.projects=false
\ No newline at end of file
+lockdown.create.projects=false
+
+cache.directory=cache
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index e9e583c..89118d1 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -432,8 +432,6 @@ $(function() {
 					{
 						var items = data.items;
 
-						console.log(data);
-
 						//Sort items by day
 						for(var i = 0; i < items.length; i++)
 						{
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index b10ea29..ef76de7 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -224,4 +224,10 @@ public class MockProjectLoader implements ProjectLoader {
 		// TODO Auto-generated method stub
 		
 	}
+
+	@Override
+	public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		
+	}
 }
\ No newline at end of file