LocalFileScheduleLoader.java

381 lines | 11.689 kB Blame History Raw Download
package azkaban.scheduler;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import org.joda.time.DurationFieldType;
import org.joda.time.Months;
import org.joda.time.Weeks;
import org.joda.time.Days;
import org.joda.time.Hours;
import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import azkaban.utils.Props;
import azkaban.utils.JSONUtils;

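/**
 * Loads the schedule from, and saves it to, a JSON-like schedule file on the local disk.
 * The format is as follows:
 * 
 * {
 * 		"schedule": [
 * 			{
 * 				"scheduleId": "<schedule id>",
 * 				"projectId": "<project>",
 * 				"flowId": "<flow>",
 * 				"user": "<user>",
 * 				"userSubmit": "<submitting user>",
 * 				"submitTime": "<yyyy-MM-dd.HH.mm.ss.SSS>",
 * 				"firstSchedTime": "<yyyy-MM-dd.HH.mm.ss.SSS>",
 * 				"nextExecTime": "<yyyy-MM-dd.HH.mm.ss.SSS>",
 * 				"timezone": "<time zone id>",
 * 				"recurrence": "<period>"
 * 			}
 * 		]
 * }	
 * 
 * The recurrence period is a number followed by a unit character: M (months), w (weeks),
 * d (days), h (hours), m (minutes) or s (seconds). The value "n" means the flow does not recur.
 * 
 * @author rpark
 *
 */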
public class LocalFileScheduleLoader implements ScheduleLoader {
	// Keys of one schedule entry inside the schedule file.
	private static final String SCHEDULEID = "scheduleId";
	private static final String PROJECTID = "projectId";
	private static final String FLOWID = "flowId";
	private static final String USER = "user";
	private static final String USERSUBMIT = "userSubmit";
	private static final String SUBMITTIME = "submitTime";
	private static final String FIRSTSCHEDTIME = "firstSchedTime";
	
	// Top-level key that holds the list of schedule entries.
	private static final String SCHEDULE = "schedule";
	private static final String NEXTEXECTIME = "nextExecTime";
	private static final String TIMEZONE = "timezone";
	private static final String RECURRENCE = "recurrence";
	
	// Currently not used by any active code in this class.
	private static final String SCHEDULESTATUS = "schedulestatus";
	
	/** Pattern used for every timestamp stored in the schedule file. */
	private static final DateTimeFormatter FILE_DATEFORMAT = DateTimeFormat.forPattern("yyyy-MM-dd.HH.mm.ss.SSS");
	private static final Logger logger = Logger.getLogger(LocalFileScheduleLoader.class);

	// All three locations are assigned exactly once, in the constructor.
	/** Directory that contains the schedule file and its backup. */
	private final File basePath;
	/** File the current schedule is read from and written to. */
	private final File scheduleFile;
	/** Previous version of the schedule file, kept while a new one is written. */
	private final File backupScheduleFile;
	
	
	public LocalFileScheduleLoader(Props props) throws IOException {
			
			
		basePath = new File(props.getString("schedule.directory"));
		if (!basePath.exists()) {
			logger.info("Schedule directory " + basePath + " not found.");
			if (basePath.mkdirs()) {
				logger.info("Schedule directory " + basePath + " created.");
			}
			else {
				throw new RuntimeException("Schedule directory " + basePath + " does not exist and cannot be created.");
			}
		}
			
		scheduleFile = new File(basePath, "schedule");
		if(!scheduleFile.exists() || scheduleFile.isDirectory()) {
			logger.info("Schedule file " + scheduleFile + " not found.");
			if(scheduleFile.createNewFile() && scheduleFile.canRead() && scheduleFile.canWrite()) {
				logger.info("Schedule file " + scheduleFile + " created.");
			}
			else {
				throw new RuntimeException("Schedule file " + scheduleFile + " cannot be created.");
			}
		}

		backupScheduleFile = new File(basePath, "backup");
		if(!backupScheduleFile.exists() || backupScheduleFile.isDirectory()) {
			logger.info("Backup schedule file " + backupScheduleFile + " not found.");
			if(backupScheduleFile.createNewFile() && backupScheduleFile.canRead() && backupScheduleFile.canWrite()) {
				logger.info("Backup schedule file " + backupScheduleFile + " created.");
			}
			else {
				throw new RuntimeException("Backup schedule file " + backupScheduleFile + " cannot be created.");
			}
		}

	}


	/**
	 * Loads all scheduled flows from disk.
	 * 
	 * The schedule file is preferred. If it is missing but the backup exists, the backup is
	 * renamed to take its place; should that rename fail, the backup is read where it is.
	 * An empty file, or no file at all, yields an empty schedule.
	 * 
	 * @return the flows read from disk, or an empty list when there is nothing to load
	 * @throws RuntimeException if the chosen file cannot be read or parsed
	 */
	@Override
	public List<ScheduledFlow> loadSchedule() {
		if (scheduleFile == null || backupScheduleFile == null) {
			return new ArrayList<ScheduledFlow>();
		}

		File source = null;
		if (scheduleFile.exists()) {
			source = scheduleFile;
		} else if (backupScheduleFile.exists()) {
			// File.renameTo reports failure only through its return value.
			if (backupScheduleFile.renameTo(scheduleFile)) {
				source = scheduleFile;
			} else {
				logger.warn("Could not restore backup schedule file " + backupScheduleFile
						+ " to " + scheduleFile + ". Loading the backup directly.");
				source = backupScheduleFile;
			}
		} else {
			logger.warn("No schedule files found looking for "
					+ scheduleFile.getAbsolutePath());
		}

		// A zero-length file (for example a freshly created one) holds no JSON to parse.
		if (source == null || source.length() == 0) {
			return new ArrayList<ScheduledFlow>();
		}

		return loadFromFile(source);
	}

	/**
	 * Writes the given schedule to the schedule file, keeping the previous version of the
	 * file as the backup.
	 * 
	 * The entries are converted before any file is touched, so a failure while converting
	 * leaves the files on disk as they were.
	 * 
	 * @param schedule flows to persist
	 * @throws RuntimeException if the schedule file cannot be written
	 */
	@Override
	public void saveSchedule(List<ScheduledFlow> schedule) {
		if (scheduleFile == null || backupScheduleFile == null) {
			return;
		}

		ArrayList<Object> schedules = new ArrayList<Object>();
		for (ScheduledFlow schedFlow : schedule) {
			schedules.add(createJSONObject(schedFlow));
		}
		HashMap<String, Object> obj = new HashMap<String, Object>();
		obj.put(SCHEDULE, schedules);

		// Rotate: the current schedule file becomes the backup, replacing the old backup.
		if (scheduleFile.exists()) {
			if (backupScheduleFile.exists() && !backupScheduleFile.delete()) {
				logger.warn("Could not delete old backup schedule file " + backupScheduleFile);
			}
			if (!scheduleFile.renameTo(backupScheduleFile)) {
				logger.warn("Could not move " + scheduleFile + " to " + backupScheduleFile
						+ ". The schedule file will be overwritten without a backup.");
			}
		}

		FileWriter writer = null;
		try {
			writer = new FileWriter(scheduleFile);
			writer.write(JSONUtils.toJSON(obj, true));
			writer.flush();
		} catch (Exception e) {
			throw new RuntimeException("Error saving flow file", e);
		} finally {
			// Always release the file handle, without hiding an exception from the write.
			if (writer != null) {
				try {
					writer.close();
				} catch (IOException e) {
					logger.warn("Error closing schedule file " + scheduleFile, e);
				}
			}
		}
		logger.info("schedule saved");
	}
	
    /**
     * Parses the given file and converts every entry under the "schedule" key into a
     * {@link ScheduledFlow}. Entries for which createScheduledFlow returns null are skipped.
     * 
     * @param schedulefile file to read
     * @return the flows found in the file; empty if the file has no "schedule" entry
     * @throws RuntimeException if the file cannot be opened or its content cannot be parsed
     */
    @SuppressWarnings("unchecked")
	private List<ScheduledFlow> loadFromFile(File schedulefile) {
		BufferedReader reader;
		try {
			reader = new BufferedReader(new FileReader(schedulefile));
		} catch (FileNotFoundException e) {
			// Without a reader there is nothing to parse, so fail here with the real cause.
			logger.error("Error loading schedule file ", e);
			throw new RuntimeException("Error loading schedule file " + schedulefile, e);
		}

		HashMap<String, Object> schedule;
		try {
			schedule = (HashMap<String, Object>) JSONUtils
					.parseJSONFromReader(reader);
		} catch (Exception e) {
			logger.error("Error parsing the schedule file", e);
			throw new RuntimeException("Error parsing the schedule file", e);
		} finally {
			try {
				reader.close();
			} catch (IOException e) {
				logger.warn("Error closing schedule file " + schedulefile, e);
			}
		}

		List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
		List<Object> array = null;
		if (schedule != null) {
			array = (List<Object>) schedule.get(SCHEDULE);
		}
		if (array == null) {
			logger.warn("Schedule file " + schedulefile + " has no \"" + SCHEDULE + "\" entry.");
			return scheduleList;
		}

		for (Object item : array) {
			HashMap<String, Object> schedItem = (HashMap<String, Object>) item;
			ScheduledFlow sched = createScheduledFlow(schedItem);
			if (sched != null) {
				scheduleList.add(sched);
			}
		}

		return scheduleList;
	}

	/**
	 * Builds a {@link ScheduledFlow} from one entry of the schedule file.
	 * 
	 * @param obj the entry, keyed by the field-name constants of this class
	 * @return the flow, or null if the entry has no next execution time or if
	 *         {@code updateTime()} on the new flow returns false
	 * @throws IllegalArgumentException if a timestamp, the time zone id or the recurrence
	 *         cannot be parsed
	 */
	private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
		String scheduleId = (String) obj.get(SCHEDULEID);
		String projectId = (String) obj.get(PROJECTID);
		String flowId = (String) obj.get(FLOWID);
		String user = (String) obj.get(USER);
		String userSubmit = (String) obj.get(USERSUBMIT);
		String submitTimeRaw = (String) obj.get(SUBMITTIME);
		String firstSchedTimeRaw = (String) obj.get(FIRSTSCHEDTIME);
		String nextExecTimeRaw = (String) obj.get(NEXTEXECTIME);
		String timezone = (String) obj.get(TIMEZONE);
		String recurrence = (String) obj.get(RECURRENCE);
		// String scheduleStatus = (String)obj.get(SCHEDULESTATUS);

		// parseDateTime never returns null, so a missing field has to be detected on the
		// raw string before parsing.
		if (nextExecTimeRaw == null) {
			logger.error("No next execution time has been set");
			return null;
		}
		DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);

		// A missing submit or first-scheduled time is reported but does not discard the
		// entry. NOTE(review): the flow is then built with a null time - confirm that
		// ScheduledFlow accepts that.
		DateTime submitTime = null;
		if (submitTimeRaw == null) {
			logger.error("No submitTime has been set");
		} else {
			submitTime = FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
		}

		DateTime firstSchedTime = null;
		if (firstSchedTimeRaw == null) {
			logger.error("No first scheduled time has been set");
		} else {
			firstSchedTime = FILE_DATEFORMAT.parseDateTime(firstSchedTimeRaw);
		}

		if (timezone != null) {
			// Keep the stored wall-clock fields and only attach the stored zone.
			nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone
					.forID(timezone));
		}

		ReadablePeriod period = null;
		if (recurrence != null) {
			period = parsePeriodString(scheduleId, recurrence);
		}

		ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
		if (scheduledFlow.updateTime()) {
			return scheduledFlow;
		}

		logger.info("Removed " + scheduleId
				+ " off out of scheduled. It is not recurring.");
		return null;
	}

	/**
	 * Converts a scheduled flow into the map that is stored as one entry of the schedule
	 * file. Timestamps are rendered with FILE_DATEFORMAT; the time zone id is taken from
	 * the next execution time.
	 * 
	 * @param flow the flow to convert
	 * @return a map holding the ids, users, timestamps, time zone and recurrence of the flow
	 */
	private HashMap<String, Object> createJSONObject(ScheduledFlow flow) {
		HashMap<String, Object> entry = new HashMap<String, Object>();

		// Identity of the schedule and of the users involved.
		entry.put(SCHEDULEID, flow.getScheduleId());
		entry.put(PROJECTID, flow.getProjectId());
		entry.put(FLOWID, flow.getFlowId());
		entry.put(USER, flow.getUser());
		entry.put(USERSUBMIT, flow.getUserSubmit());

		// Timestamps, all in the file's date format.
		String submitted = FILE_DATEFORMAT.print(flow.getSubmitTime());
		entry.put(SUBMITTIME, submitted);
		String firstScheduled = FILE_DATEFORMAT.print(flow.getFirstSchedTime());
		entry.put(FIRSTSCHEDTIME, firstScheduled);

		DateTime nextExec = flow.getNextExecTime();
		entry.put(NEXTEXECTIME, FILE_DATEFORMAT.print(nextExec));
		entry.put(TIMEZONE, nextExec.getZone().getID());

		String recurrence = createPeriodString(flow.getPeriod());
		entry.put(RECURRENCE, recurrence);
		// entry.put(SCHEDULESTATUS, flow.getSchedStatus());

		return entry;
	}

	/**
	 * Parses a recurrence string of the form {@code <amount><unit>}. The unit is the last
	 * character: M (months), w (weeks), d (days), h (hours), m (minutes) or s (seconds).
	 * A string that ends in 'n' means the flow does not recur.
	 * 
	 * @param scheduleId id of the schedule, used in error messages only
	 * @param periodStr  the recurrence string to parse
	 * @return the parsed period, or null if {@code periodStr} is null or ends in 'n'
	 * @throws IllegalArgumentException if the string is empty, the amount is not an integer
	 *         or the unit is unknown
	 */
	private ReadablePeriod parsePeriodString(String scheduleId, String periodStr) {
		if (periodStr == null) {
			return null;
		}
		if (periodStr.length() == 0) {
			throw new IllegalArgumentException("Empty schedule period for flow " + scheduleId);
		}

		int unitIndex = periodStr.length() - 1;
		char periodUnit = periodStr.charAt(unitIndex);
		if (periodUnit == 'n') {
			return null;
		}

		int periodInt;
		try {
			periodInt = Integer.parseInt(periodStr.substring(0, unitIndex));
		} catch (NumberFormatException e) {
			// Re-throw with the schedule id so the broken entry can be located.
			throw new IllegalArgumentException("Invalid schedule period '" + periodStr
					+ "' for flow " + scheduleId, e);
		}

		ReadablePeriod period;
		switch (periodUnit) {
		case 'M':
			period = Months.months(periodInt);
			break;
		case 'w':
			period = Weeks.weeks(periodInt);
			break;
		case 'd':
			period = Days.days(periodInt);
			break;
		case 'h':
			period = Hours.hours(periodInt);
			break;
		case 'm':
			period = Minutes.minutes(periodInt);
			break;
		case 's':
			period = Seconds.seconds(periodInt);
			break;
		default:
			throw new IllegalArgumentException("Invalid schedule period unit '"
					+ periodUnit + "' for flow " + scheduleId);
		}

		return period;
	}

	/**
	 * Renders a period as {@code <amount><unit>}, using the first of months (M), weeks (w),
	 * days (d), hours (h), minutes (m) and seconds (s) whose value is greater than zero.
	 * 
	 * @param period the period to render; may be null
	 * @return the rendered period, or "n" if {@code period} is null or none of the fields
	 *         above is positive
	 */
	private String createPeriodString(ReadablePeriod period) {
		if (period == null) {
			return "n";
		}

		// Checked in this order; the first positive field wins.
		DurationFieldType[] fields = {
				DurationFieldType.months(),
				DurationFieldType.weeks(),
				DurationFieldType.days(),
				DurationFieldType.hours(),
				DurationFieldType.minutes(),
				DurationFieldType.seconds()
		};
		String[] units = { "M", "w", "d", "h", "m", "s" };

		for (int i = 0; i < fields.length; ++i) {
			int amount = period.get(fields[i]);
			if (amount > 0) {
				return amount + units[i];
			}
		}

		return "n";
	}

//	private HashMap<String, Object> loadLegacyFile(File schedulefile) {
//		Props schedule = null;
//		try {
//			schedule = new Props(null, schedulefile.getAbsolutePath());
//		} catch (Exception e) {
//			throw new RuntimeException("Error loading schedule from "
//					+ schedulefile);
//		}
//
//		ArrayList<Object> flowScheduleList = new ArrayList<Object>();
//		for (String key : schedule.getKeySet()) {
//			HashMap<String, Object> scheduledMap = parseScheduledFlow(key,
//					schedule.get(key));
//			if (scheduledMap == null) {
//				flowScheduleList.add(scheduledMap);
//			}
//		}
//
//		HashMap<String, Object> scheduleMap = new HashMap<String, Object>();
//		scheduleMap.put(SCHEDULE, flowScheduleList);
//
//		return scheduleMap;
//	}

//	private HashMap<String, Object> parseScheduledFlow(String name, String flow) {
//		String[] pieces = flow.split("\\s+");
//
//		if (pieces.length != 3) {
//			logger.warn("Error loading schedule from file " + name);
//			return null;
//		}
//
//		HashMap<String, Object> scheduledFlow = new HashMap<String, Object>();
//		scheduledFlow.put(PROJECTID, name);
//		scheduledFlow.put(TIME, pieces[0]);
//		scheduledFlow.put(RECURRENCE, pieces[1]);
//		Boolean dependency = Boolean.parseBoolean(pieces[2]);
//
//		return scheduledFlow;
//	}
}