Details
diff --git a/azkaban-common/src/main/java/azkaban/database/EncodingType.java b/azkaban-common/src/main/java/azkaban/database/EncodingType.java
index 1b17b5d..1b069f1 100644
--- a/azkaban-common/src/main/java/azkaban/database/EncodingType.java
+++ b/azkaban-common/src/main/java/azkaban/database/EncodingType.java
@@ -19,6 +19,8 @@ package azkaban.database;
/**
* Used for when we store text data. Plain uses UTF8 encoding.
*/
+// TODO kunkun-tang: This class needs to move to azkaban-db module, as database module should be
+// Deprecated soon.
public enum EncodingType {
PLAIN(1), GZIP(2);
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 26046b2..457c4f5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -298,26 +298,10 @@ public class ExecutionFlowDao {
if (data != null) {
final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
-
- /**
- * The below code is a duplicate against many places, like azkaban.database.EncodingType
- * TODO kunkun-tang: Extract these duplicates to a single static method.
- */
try {
- // Convoluted way to inflate strings. Should find common package
- // or helper function.
- if (encType == EncodingType.GZIP) {
- // Decompress the sucker.
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
execFlows.add(exFlow);
} catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
@@ -358,21 +342,10 @@ public class ExecutionFlowDao {
logger.error("Found a flow with empty data blob exec_id: " + id);
} else {
final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
try {
- // Convoluted way to inflate strings. Should find common package or
- // helper function.
- if (encType == EncodingType.GZIP) {
- // Decompress the sucker.
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
final ExecutionReference ref = new ExecutionReference(id);
execFlows.add(new Pair<>(ref, exFlow));
} catch (final IOException e) {
@@ -407,19 +380,10 @@ public class ExecutionFlowDao {
if (data != null) {
final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
try {
- if (encType == EncodingType.GZIP) {
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
-
+ ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
execFlows.add(exFlow);
} catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 0b3f06c..fcf1cb3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -19,7 +19,6 @@ package azkaban.executor;
import azkaban.database.EncodingType;
import azkaban.db.DatabaseOperator;
import azkaban.utils.GZIPUtils;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import java.io.IOException;
import java.sql.ResultSet;
@@ -109,21 +108,11 @@ public class FetchActiveFlowDao {
execFlows.put(id, null);
} else {
final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
try {
- // Convoluted way to inflate strings. Should find common package or
- // helper function.
- if (encType == EncodingType.GZIP) {
- // Decompress the sucker.
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
+
final Executor executor = new Executor(executorId, host, port, executorStatus);
final ExecutionReference ref = new ExecutionReference(id, executor);
execFlows.put(id, new Pair<>(ref, exFlow));
@@ -173,18 +162,11 @@ public class FetchActiveFlowDao {
logger.error("Found a flow with empty data blob exec_id: " + id);
} else {
final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
try {
- if (encType == EncodingType.GZIP) {
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
+
final Executor executor = new Executor(executorId, host, port, executorStatus);
final ExecutionReference ref = new ExecutionReference(id, executor);
execFlows.add(new Pair<>(ref, exFlow));
diff --git a/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java b/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java
index 97f7472..c9158d5 100644
--- a/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java
@@ -16,6 +16,7 @@
package azkaban.utils;
+import azkaban.database.EncodingType;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -63,4 +64,16 @@ public class GZIPUtils {
final byte[] response = unGzipBytes(bytes);
return new String(response, encType);
}
+
+ public static Object transformBytesToObject(final byte[] data, final EncodingType encType)
+ throws IOException {
+ if (encType == EncodingType.GZIP) {
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ return JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ return JSONUtils.parseJSONFromString(jsonString);
+ }
+ }
+
}