/*
 * Decompiled with CFR 0.152.
 */
package azkaban.test.execapp.event;

import azkaban.execapp.FlowRunner;
import azkaban.execapp.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.execapp.EventCollectorListener;
import azkaban.test.execapp.MockExecutorLoader;
import azkaban.test.execapp.MockProjectLoader;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link LocalFlowWatcher}.
 *
 * <p>Each test starts two {@link FlowRunner}s on their own threads. The second
 * runner is given a {@link LocalFlowWatcher} attached to the first one together
 * with a pipeline level, and the test then checks the recorded start/end times
 * of the jobs in both flows.
 */
public class LocalFlowWatcherTest {
    /**
     * Largest gap allowed between a job's start time and the end time of whatever
     * it was waiting on. Expressed in the unit returned by
     * {@code ExecutableNode.getStartTime()} (presumably milliseconds - TODO confirm).
     */
    private static final long MAX_START_LAG = 100L;

    // NOTE(review): this field is never assigned anywhere in this class, so setUp()
    // passes null to MockProjectLoader. Kept as-is to preserve behavior.
    private File workingDir;
    private JobTypeManager jobtypeManager;
    private ProjectLoader fakeProjectLoader;
    /** Suffix counter used by {@link #setupDirectory()} to name working directories. */
    private int dirVal = 0;

    /**
     * Builds the job type manager (registering the "java" job type) and the
     * mock project loader shared by the flow runners of a test.
     */
    @Before
    public void setUp() throws Exception {
        this.jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
        this.jobtypeManager.registerJobType("java", JavaJob.class);
        this.fakeProjectLoader = new MockProjectLoader(this.workingDir);
    }

    /** Intentionally empty: working directories are removed by the tests themselves. */
    @After
    public void tearDown() throws IOException {
    }

    /**
     * Creates a fresh, empty working directory named {@code _AzkabanTestDir_<n>},
     * where {@code n} is the current value of the per-instance counter. Any
     * existing directory of that name is deleted first.
     *
     * @return the newly created directory
     * @throws IOException if the old directory cannot be deleted or the new one
     *         cannot be created
     */
    public File setupDirectory() throws IOException {
        System.out.println("Create temp dir");
        File dir = new File("_AzkabanTestDir_" + this.dirVal);
        if (dir.exists()) {
            FileUtils.deleteDirectory(dir);
        }
        // mkdirs() reports failure through its return value only; surface it here
        // instead of failing later with a confusing copy error.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Could not create test directory " + dir.getAbsolutePath());
        }
        ++this.dirVal;
        return dir;
    }

    /** Runs "exec1" twice with the second run watching the first at pipeline level 2. */
    @Test
    public void testBasicLocalFlowWatcher() throws Exception {
        ExecutableFlow[] flows = this.runPipelinedFlows("exec1", 2);
        this.testPipelineLevel2(flows[0], flows[1]);
    }

    /** Runs "exec1" twice with the second run watching the first at pipeline level 1. */
    @Test
    public void testLevel1LocalFlowWatcher() throws Exception {
        ExecutableFlow[] flows = this.runPipelinedFlows("exec1", 1);
        this.testPipelineLevel1(flows[0], flows[1]);
    }

    /** Runs "exec1" and then "exec1-mod" watching it at pipeline level 1. */
    @Test
    public void testLevel2DiffLocalFlowWatcher() throws Exception {
        ExecutableFlow[] flows = this.runPipelinedFlows("exec1-mod", 1);
        this.testPipelineLevel1(flows[0], flows[1]);
    }

    /**
     * Runs flow "exec1" (execution id 1) and, concurrently, {@code watchingFlowName}
     * (execution id 2) with a {@link LocalFlowWatcher} on the first runner.
     *
     * <p>Both runner threads are joined before the working directories are
     * removed, and the directories are removed even when setup or execution fails.
     *
     * @param watchingFlowName name of the flow executed by the second runner
     * @param pipelineLevel pipeline level configured on the second execution
     * @return a two-element array: the watched flow, then the watching flow
     */
    private ExecutableFlow[] runPipelinedFlows(String watchingFlowName, int pipelineLevel) throws Exception {
        MockExecutorLoader loader = new MockExecutorLoader();
        EventCollectorListener eventCollector = new EventCollectorListener();
        File workingDir1 = this.setupDirectory();
        File workingDir2 = null;
        try {
            FlowRunner runner1 = this.createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
            Thread runner1Thread = new Thread(runner1);
            workingDir2 = this.setupDirectory();
            LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
            FlowRunner runner2 = this.createFlowRunner(workingDir2, loader, eventCollector, watchingFlowName, 2, watcher, pipelineLevel);
            Thread runner2Thread = new Thread(runner2);

            runner1Thread.start();
            runner2Thread.start();
            runner2Thread.join();
            // Also wait for the watched runner so its directory is not deleted
            // while its thread is still alive.
            runner1Thread.join();

            return new ExecutableFlow[] {runner1.getExecutableFlow(), runner2.getExecutableFlow()};
        } finally {
            try {
                FileUtils.deleteDirectory(workingDir1);
            } finally {
                if (workingDir2 != null) {
                    FileUtils.deleteDirectory(workingDir2);
                }
            }
        }
    }

    /**
     * Verifies the timings of a run that watched {@code first}, as used by the
     * pipeline-level-1 tests.
     *
     * <p>Every node of {@code second} must have succeeded. For each node that has
     * a same-id node in {@code first}, that watched node must have succeeded and
     * ended no later than the node started, and the node must have started within
     * {@link #MAX_START_LAG} of either the watched node's end or the end of one of
     * its own parents. Nodes without parents pass the lag check trivially.
     */
    private void testPipelineLevel1(ExecutableFlow first, ExecutableFlow second) {
        for (ExecutableNode node : second.getExecutableNodes()) {
            Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
            ExecutableNode watchedNode = first.getExecutableNode(node.getId());
            if (watchedNode == null) {
                // Node only exists in the second flow; nothing to compare against.
                continue;
            }
            Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());

            long pipelineDiff = node.getStartTime() - watchedNode.getEndTime();
            System.out.println("Node " + node.getId() + " start: " + node.getStartTime() + " dependent on " + watchedNode.getId() + " " + watchedNode.getEndTime() + " diff: " + pipelineDiff);
            Assert.assertTrue("Node " + node.getId() + " started before watched node " + watchedNode.getId() + " ended",
                    node.getStartTime() >= watchedNode.getEndTime());

            long minParentDiff = node.getInNodes().size() > 0 ? this.minParentStartDiff(second, node) : 0L;
            System.out.println("   minPipelineTimeDiff:" + pipelineDiff + " minDependencyTimeDiff:" + minParentDiff);
            Assert.assertTrue("Node " + node.getId() + " started more than " + MAX_START_LAG + " after everything it waited on",
                    minParentDiff < MAX_START_LAG || pipelineDiff < MAX_START_LAG);
        }
    }

    /**
     * Verifies the timings of a run that watched {@code first}, as used by the
     * pipeline-level-2 test.
     *
     * <p>Every node of {@code second} must have succeeded. For each node that has
     * a same-id node in {@code first}, the watched node and each of its out-nodes
     * found in {@code first} must have succeeded, each such out-node must have
     * ended no later than the node started, and the node must have started within
     * {@link #MAX_START_LAG} of either one of those out-nodes' end or the end of
     * one of its own parents.
     */
    private void testPipelineLevel2(ExecutableFlow first, ExecutableFlow second) {
        for (ExecutableNode node : second.getExecutableNodes()) {
            Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
            ExecutableNode watchedNode = first.getExecutableNode(node.getId());
            if (watchedNode == null) {
                // Node only exists in the second flow; nothing to compare against.
                continue;
            }
            Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());

            long minDiff = Long.MAX_VALUE;
            for (String watchedChild : watchedNode.getOutNodes()) {
                ExecutableNode child = first.getExecutableNode(watchedChild);
                if (child == null) {
                    continue;
                }
                Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
                long diff = node.getStartTime() - child.getEndTime();
                minDiff = Math.min(minDiff, diff);
                System.out.println("Node " + node.getId() + " start: " + node.getStartTime() + " dependent on " + watchedChild + " " + child.getEndTime() + " diff: " + diff);
                Assert.assertTrue("Node " + node.getId() + " started before watched child " + watchedChild + " ended",
                        node.getStartTime() >= child.getEndTime());
            }

            long minParentDiff = this.minParentStartDiff(second, node);
            System.out.println("   minPipelineTimeDiff:" + minDiff + " minDependencyTimeDiff:" + minParentDiff);
            Assert.assertTrue("Node " + node.getId() + " started more than " + MAX_START_LAG + " after everything it waited on",
                    minParentDiff < MAX_START_LAG || minDiff < MAX_START_LAG);
        }
    }

    /**
     * Returns the smallest value of {@code node.getStartTime() - parent.getEndTime()}
     * over the in-nodes of {@code node}, looked up in {@code flow}; returns
     * {@link Long#MAX_VALUE} when the node has no in-nodes.
     */
    private long minParentStartDiff(ExecutableFlow flow, ExecutableNode node) {
        long minParentDiff = Long.MAX_VALUE;
        for (String dependency : node.getInNodes()) {
            ExecutableNode parent = flow.getExecutableNode(dependency);
            minParentDiff = Math.min(minParentDiff, node.getStartTime() - parent.getEndTime());
        }
        return minParentDiff;
    }

    /**
     * Prepares an execution of {@code flowName} in {@code workingDir}, uploads it
     * to {@code loader} and wraps it in a {@link FlowRunner}.
     *
     * @param workingDir directory the execution files are copied into
     * @param loader executor loader the flow is uploaded to and the runner uses
     * @param eventCollector listener registered on the returned runner
     * @param flowName base name of the {@code .flow} file to load
     * @param execId execution id assigned to the flow
     * @param watcher watcher to attach, or {@code null} for an unwatched run
     * @param pipeline pipeline level; only applied when {@code watcher} is non-null
     * @return the configured, not yet started runner
     */
    private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
        File testDir = new File("unit/executions/exectest1");
        ExecutableFlow exFlow = this.prepareExecDir(workingDir, testDir, flowName, execId);
        ExecutionOptions option = exFlow.getExecutionOptions();
        if (watcher != null) {
            option.setPipelineLevel(pipeline);
            option.setPipelineExecutionId(watcher.getExecId());
        }
        loader.uploadExecutableFlow(exFlow);
        FlowRunner runner = new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager);
        runner.setFlowWatcher(watcher);
        runner.addListener(eventCollector);
        return runner;
    }

    /**
     * Copies {@code execDir} into {@code workingDir}, parses
     * {@code <flowName>.flow} from the copy and builds an {@link ExecutableFlow}
     * for it under a project with id 1 and name "test".
     *
     * @return the executable flow with its execution id and execution path set
     * @throws IOException if copying the directory or reading the flow file fails
     */
    private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
        FileUtils.copyDirectory(execDir, workingDir);
        File jsonFlowFile = new File(workingDir, flowName + ".flow");
        HashMap<?, ?> flowObj = (HashMap<?, ?>) JSONUtils.parseJSONFromFile(jsonFlowFile);
        Project project = new Project(1, "test");
        Flow flow = Flow.flowFromObject(flowObj);
        ExecutableFlow execFlow = new ExecutableFlow(project, flow);
        execFlow.setExecutionId(execId);
        execFlow.setExecutionPath(workingDir.getPath());
        return execFlow;
    }
}

