azkaban-aplcache

Consolidate duplicate transformByte To Object code (#1388) *

8/25/2017 4:04:51 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/EncodingType.java b/azkaban-common/src/main/java/azkaban/database/EncodingType.java
index 1b17b5d..1b069f1 100644
--- a/azkaban-common/src/main/java/azkaban/database/EncodingType.java
+++ b/azkaban-common/src/main/java/azkaban/database/EncodingType.java
@@ -19,6 +19,8 @@ package azkaban.database;
 /**
  * Used for when we store text data. Plain uses UTF8 encoding.
  */
+// TODO kunkun-tang: This class needs to move to azkaban-db module, as database module should be
+// Deprecated soon.
 public enum EncodingType {
   PLAIN(1), GZIP(2);
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 26046b2..457c4f5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -298,26 +298,10 @@ public class ExecutionFlowDao {
 
         if (data != null) {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
-
-          /**
-           * The below code is a duplicate against many places, like azkaban.database.EncodingType
-           * TODO kunkun-tang: Extract these duplicates to a single static method.
-           */
           try {
-            // Convoluted way to inflate strings. Should find common package
-            // or helper function.
-            if (encType == EncodingType.GZIP) {
-              // Decompress the sucker.
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
             final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+                ExecutableFlow.createExecutableFlowFromObject(
+                    GZIPUtils.transformBytesToObject(data, encType));
             execFlows.add(exFlow);
           } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
@@ -358,21 +342,10 @@ public class ExecutionFlowDao {
           logger.error("Found a flow with empty data blob exec_id: " + id);
         } else {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
           try {
-            // Convoluted way to inflate strings. Should find common package or
-            // helper function.
-            if (encType == EncodingType.GZIP) {
-              // Decompress the sucker.
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
             final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+                ExecutableFlow.createExecutableFlowFromObject(
+                    GZIPUtils.transformBytesToObject(data, encType));
             final ExecutionReference ref = new ExecutionReference(id);
             execFlows.add(new Pair<>(ref, exFlow));
           } catch (final IOException e) {
@@ -407,19 +380,10 @@ public class ExecutionFlowDao {
 
         if (data != null) {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
           try {
-            if (encType == EncodingType.GZIP) {
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
             final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
-
+                ExecutableFlow.createExecutableFlowFromObject(
+                    GZIPUtils.transformBytesToObject(data, encType));
             execFlows.add(exFlow);
           } catch (final IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 0b3f06c..fcf1cb3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -19,7 +19,6 @@ package azkaban.executor;
 import azkaban.database.EncodingType;
 import azkaban.db.DatabaseOperator;
 import azkaban.utils.GZIPUtils;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import java.io.IOException;
 import java.sql.ResultSet;
@@ -109,21 +108,11 @@ public class FetchActiveFlowDao {
           execFlows.put(id, null);
         } else {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
           try {
-            // Convoluted way to inflate strings. Should find common package or
-            // helper function.
-            if (encType == EncodingType.GZIP) {
-              // Decompress the sucker.
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
             final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+                ExecutableFlow.createExecutableFlowFromObject(
+                    GZIPUtils.transformBytesToObject(data, encType));
+
             final Executor executor = new Executor(executorId, host, port, executorStatus);
             final ExecutionReference ref = new ExecutionReference(id, executor);
             execFlows.put(id, new Pair<>(ref, exFlow));
@@ -173,18 +162,11 @@ public class FetchActiveFlowDao {
           logger.error("Found a flow with empty data blob exec_id: " + id);
         } else {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
           try {
-            if (encType == EncodingType.GZIP) {
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
             final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+                ExecutableFlow.createExecutableFlowFromObject(
+                    GZIPUtils.transformBytesToObject(data, encType));
+
             final Executor executor = new Executor(executorId, host, port, executorStatus);
             final ExecutionReference ref = new ExecutionReference(id, executor);
             execFlows.add(new Pair<>(ref, exFlow));
diff --git a/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java b/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java
index 97f7472..c9158d5 100644
--- a/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/GZIPUtils.java
@@ -16,6 +16,7 @@
 
 package azkaban.utils;
 
+import azkaban.database.EncodingType;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -63,4 +64,16 @@ public class GZIPUtils {
     final byte[] response = unGzipBytes(bytes);
     return new String(response, encType);
   }
+
+  public static Object transformBytesToObject(final byte[] data, final EncodingType encType)
+      throws IOException {
+    if (encType == EncodingType.GZIP) {
+      final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+      return JSONUtils.parseJSONFromString(jsonString);
+    } else {
+      final String jsonString = new String(data, "UTF-8");
+      return JSONUtils.parseJSONFromString(jsonString);
+    }
+  }
+
 }