RemoteFlowWatcher.java

128 lines | 3.805 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp.event;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import java.util.ArrayList;
import java.util.Map;

public class RemoteFlowWatcher extends FlowWatcher {

  private final static long CHECK_INTERVAL_MS = 60 * 1000;

  private int execId;
  private ExecutorLoader loader;
  private ExecutableFlow flow;
  private RemoteUpdaterThread thread;
  private boolean isShutdown = false;

  // Every minute
  private long checkIntervalMs = CHECK_INTERVAL_MS;

  public RemoteFlowWatcher(final int execId, final ExecutorLoader loader) {
    this(execId, loader, CHECK_INTERVAL_MS);
  }

  public RemoteFlowWatcher(final int execId, final ExecutorLoader loader, final long interval) {
    super(execId);
    this.checkIntervalMs = interval;

    try {
      this.flow = loader.fetchExecutableFlow(execId);
    } catch (final ExecutorManagerException e) {
      return;
    }

    super.setFlow(this.flow);
    this.loader = loader;
    this.execId = execId;
    if (this.flow != null) {
      this.thread = new RemoteUpdaterThread();
      this.thread.setName("Remote-watcher-flow-" + execId);
      this.thread.start();
    }
  }

  @Override
  public synchronized void stopWatcher() {
    if (this.isShutdown) {
      return;
    }
    this.isShutdown = true;
    if (this.thread != null) {
      this.thread.interrupt();
    }
    super.unblockAllWatches();
    this.loader = null;
    this.flow = null;
  }

  private class RemoteUpdaterThread extends Thread {

    @Override
    public void run() {
      do {
        ExecutableFlow updateFlow = null;
        try {
          updateFlow = RemoteFlowWatcher.this.loader.fetchExecutableFlow(
              RemoteFlowWatcher.this.execId);
        } catch (final ExecutorManagerException e) {
          e.printStackTrace();
          RemoteFlowWatcher.this.isShutdown = true;
        }

        long updateTime = 0;
        if (RemoteFlowWatcher.this.flow == null) {
          RemoteFlowWatcher.this.flow = updateFlow;
        } else {
          final Map<String, Object> updateData =
              updateFlow.toUpdateObject(updateTime);
          final ArrayList<ExecutableNode> updatedNodes =
              new ArrayList<>();
          RemoteFlowWatcher.this.flow.applyUpdateObject(updateData, updatedNodes);

          RemoteFlowWatcher.this.flow.setStatus(updateFlow.getStatus());
          RemoteFlowWatcher.this.flow.setEndTime(updateFlow.getEndTime());
          RemoteFlowWatcher.this.flow.setUpdateTime(updateFlow.getUpdateTime());

          for (final ExecutableNode node : updatedNodes) {
            handleJobStatusChange(node.getNestedId(), node.getStatus());
          }

          updateTime = RemoteFlowWatcher.this.flow.getUpdateTime();
        }

        if (Status.isStatusFinished(RemoteFlowWatcher.this.flow.getStatus())) {
          RemoteFlowWatcher.this.isShutdown = true;
        } else {
          synchronized (this) {
            try {
              wait(RemoteFlowWatcher.this.checkIntervalMs);
            } catch (final InterruptedException e) {
            }
          }
        }
      } while (!RemoteFlowWatcher.this.isShutdown);
    }

  }
}