killbill-memoizeit

analytics: ensure all graphs share the same X axis This revisit

5/17/2013 4:26:51 PM

Details

diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
index 6ceed5f..e9530c3 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
@@ -39,6 +39,7 @@ public class AnalyticsActivator extends KillbillActivatorBase {
 
     private OSGIKillbillEventHandler analyticsListener;
     private JobsScheduler jobsScheduler;
+    private ReportsUserApi reportsUserApi;
 
     @Override
     public void start(final BundleContext context) throws Exception {
@@ -54,7 +55,7 @@ public class AnalyticsActivator extends KillbillActivatorBase {
         reportsConfiguration.initialize();
 
         final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, dataSource, executor);
-        final ReportsUserApi reportsUserApi = new ReportsUserApi(dataSource, reportsConfiguration);
+        reportsUserApi = new ReportsUserApi(dataSource, reportsConfiguration);
         final AnalyticsServlet analyticsServlet = new AnalyticsServlet(analyticsUserApi, reportsUserApi, logService);
         registerServlet(context, analyticsServlet);
     }
@@ -64,6 +65,9 @@ public class AnalyticsActivator extends KillbillActivatorBase {
         if (jobsScheduler != null) {
             jobsScheduler.shutdownNow();
         }
+        if (reportsUserApi != null) {
+            reportsUserApi.shutdownNow();
+        }
         super.stop(context);
     }
 
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
index 607b2c8..995ee80 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
@@ -16,7 +16,7 @@
 
 package com.ning.billing.osgi.bundles.analytics;
 
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
@@ -28,13 +28,17 @@ import com.google.common.annotations.VisibleForTesting;
 public class BusinessExecutor {
 
     @VisibleForTesting
-    static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
+    static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.refresh.nb_threads", "100"));
 
-    public static Executor newCachedThreadPool() {
+    public static ExecutorService newCachedThreadPool() {
+        return newCachedThreadPool(NB_THREADS, "osgi-analytics-refresh");
+    }
+
+    public static ExecutorService newCachedThreadPool(final int nbThreads, final String name) {
         // Note: we don't use the default rejection handler here (AbortPolicy) as we always want the tasks to be executed
         return Executors.newCachedThreadPool(0,
-                                             NB_THREADS,
-                                             "osgi-analytics-refresh",
+                                             nbThreads,
+                                             name,
                                              60L,
                                              TimeUnit.SECONDS,
                                              new CallerRunsPolicy());
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/json/XY.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/json/XY.java
index fc3dee1..638a597 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/json/XY.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/json/XY.java
@@ -16,6 +16,9 @@
 
 package com.ning.billing.osgi.bundles.analytics.json;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -24,20 +27,33 @@ public class XY {
     private final String x;
     private final Float y;
 
+    private final DateTime xDate;
+
     @JsonCreator
     public XY(@JsonProperty("x") final String x, @JsonProperty("y") final Float y) {
         this.x = x;
         this.y = y;
+        this.xDate = new DateTime(x, DateTimeZone.UTC);
     }
 
     public XY(final String x, final Integer y) {
         this(x, new Float(y.doubleValue()));
     }
 
+    public XY(final DateTime xDate, final Float y) {
+        this.x = xDate.toString();
+        this.y = y;
+        this.xDate = xDate;
+    }
+
     public String getX() {
         return x;
     }
 
+    public DateTime getxDate() {
+        return xDate;
+    }
+
     public Float getY() {
         return y;
     }
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/reports/ReportsUserApi.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/reports/ReportsUserApi.java
index 945bd54..f7b18d4 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/reports/ReportsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/reports/ReportsUserApi.java
@@ -18,13 +18,14 @@ package com.ning.billing.osgi.bundles.analytics.reports;
 
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import javax.annotation.Nullable;
 
@@ -34,17 +35,23 @@ import org.joda.time.LocalDate;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
 import com.ning.billing.osgi.bundles.analytics.dao.BusinessDBIProvider;
 import com.ning.billing.osgi.bundles.analytics.json.NamedXYTimeSeries;
 import com.ning.billing.osgi.bundles.analytics.json.XY;
 import com.ning.killbill.osgi.libs.killbill.OSGIKillbillDataSource;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
 public class ReportsUserApi {
 
+    private static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.dashboard.nb_threads", "10"));
     private static final String NO_PIVOT = "____NO_PIVOT____";
 
+    private final ExecutorService dbiThreadsExecutor = BusinessExecutor.newCachedThreadPool(NB_THREADS, "osgi-analytics-dashboard");
+
     private final IDBI dbi;
     private final ReportsConfiguration reportsConfiguration;
 
@@ -54,26 +61,26 @@ public class ReportsUserApi {
         dbi = BusinessDBIProvider.get(osgiKillbillDataSource.getDataSource());
     }
 
-    public List<NamedXYTimeSeries> getTimeSeriesDataForReport(final String[] reportNames) {
-        return getTimeSeriesDataForReport(reportNames, null, null);
+    public void shutdownNow() {
+        dbiThreadsExecutor.shutdownNow();
     }
 
-    public List<NamedXYTimeSeries> getTimeSeriesDataForReport(final String[] reportNames, @Nullable final LocalDate startDate, @Nullable final LocalDate endDate) {
+    public List<NamedXYTimeSeries> getTimeSeriesDataForReport(final String[] reportNames,
+                                                              @Nullable final LocalDate startDate,
+                                                              @Nullable final LocalDate endDate) {
         // Mapping of report name -> pivots -> data
-        final Map<String, Map<String, List<XY>>> dataForReports = new LinkedHashMap<String, Map<String, List<XY>>>();
+        final Map<String, Map<String, List<XY>>> dataForReports = new ConcurrentHashMap<String, Map<String, List<XY>>>();
 
-        // TODO parallel
-        for (final String reportName : reportNames) {
-            final String tableName = reportsConfiguration.getTableNameForReport(reportName);
-            if (tableName != null) {
-                final Map<String, List<XY>> data = getData(tableName);
-                dataForReports.put(reportName, data);
-            }
-        }
+        // Fetch the data
+        fetchData(reportNames, dataForReports);
 
-        normalizeXValues(dataForReports);
+        // Filter the data first
         filterValues(dataForReports, startDate, endDate);
 
+        // Normalize the data
+        normalizeXValues(dataForReports, startDate, endDate);
+
+        // Build the named timeseries
         final List<NamedXYTimeSeries> results = new LinkedList<NamedXYTimeSeries>();
         for (final String reportName : dataForReports.keySet()) {
             // Sort the pivots by name for a consistent display in the dashboard
@@ -89,63 +96,123 @@ public class ReportsUserApi {
                 results.add(new NamedXYTimeSeries(timeSeriesName, timeSeries));
             }
         }
+
         return results;
     }
 
-    private void filterValues(final Map<String, Map<String, List<XY>>> dataForReports, @Nullable final LocalDate startDate, @Nullable final LocalDate endDate) {
-        for (final Map<String, List<XY>> dataForReport : dataForReports.values()) {
-            for (final List<XY> dataForPivot : dataForReport.values()) {
-                final Iterator<XY> iterator = dataForPivot.iterator();
-                while (iterator.hasNext()) {
-                    final XY xy = iterator.next();
-                    if (startDate != null && new DateTime(xy.getX(), DateTimeZone.UTC).toLocalDate().isBefore(startDate) ||
-                        endDate != null && new DateTime(xy.getX(), DateTimeZone.UTC).toLocalDate().isAfter(endDate)) {
-                        iterator.remove();
+    private void fetchData(final String[] reportNames, final Map<String, Map<String, List<XY>>> dataForReports) {
+        final List<Future> jobs = new LinkedList<Future>();
+        for (final String reportName : reportNames) {
+            final String tableName = reportsConfiguration.getTableNameForReport(reportName);
+            if (tableName != null) {
+                jobs.add(dbiThreadsExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        final Map<String, List<XY>> data = getData(tableName);
+                        dataForReports.put(reportName, data);
                     }
-                }
+                }));
+            }
+        }
+
+        for (final Future job : jobs) {
+            try {
+                job.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
             }
         }
     }
 
-    private void normalizeXValues(final Map<String, Map<String, List<XY>>> dataForReports) {
-        final Set<String> xValues = new HashSet<String>();
+    private void filterValues(final Map<String, Map<String, List<XY>>> dataForReports, @Nullable final LocalDate startDate, @Nullable final LocalDate endDate) {
+        if (startDate == null && endDate == null) {
+            return;
+        }
+
         for (final Map<String, List<XY>> dataForReport : dataForReports.values()) {
             for (final List<XY> dataForPivot : dataForReport.values()) {
-                for (final XY xy : dataForPivot) {
-                    xValues.add(xy.getX());
-                }
+                Iterables.removeIf(dataForPivot,
+                                   new Predicate<XY>() {
+                                       @Override
+                                       public boolean apply(final XY xy) {
+                                           return startDate != null && xy.getxDate().toLocalDate().isBefore(startDate) ||
+                                                  endDate != null && xy.getxDate().toLocalDate().isAfter(endDate);
+                                       }
+                                   });
             }
         }
+    }
 
-        for (final Map<String, List<XY>> dataForReport : dataForReports.values()) {
-            for (final List<XY> dataForPivot : dataForReport.values()) {
-                for (final String x : xValues) {
-                    if (!hasX(dataForPivot, x)) {
-                        dataForPivot.add(new XY(x, 0));
+    // TODO PIERRE Naive implementation
+    private void normalizeXValues(final Map<String, Map<String, List<XY>>> dataForReports, @Nullable final LocalDate startDate, @Nullable final LocalDate endDate) {
+        DateTime minDate = null;
+        if (startDate != null) {
+            minDate = startDate.toDateTimeAtStartOfDay(DateTimeZone.UTC);
+        }
+
+        DateTime maxDate = null;
+        if (endDate != null) {
+            maxDate = endDate.toDateTimeAtStartOfDay(DateTimeZone.UTC);
+        }
+
+        // If no min and/or max was specified, infer them from the data
+        if (minDate == null || maxDate == null) {
+            for (final Map<String, List<XY>> dataForReport : dataForReports.values()) {
+                for (final List<XY> dataForPivot : dataForReport.values()) {
+                    for (final XY xy : dataForPivot) {
+                        if (minDate == null || xy.getxDate().isBefore(minDate)) {
+                            minDate = xy.getxDate();
+                        }
+                        if (maxDate == null || xy.getxDate().isAfter(maxDate)) {
+                            maxDate = xy.getxDate();
+                        }
                     }
                 }
             }
         }
 
+        if (minDate == null || maxDate == null) {
+            throw new IllegalStateException();
+        }
+
+        // Add 0 for missing days
+        DateTime curDate = minDate;
+        while (maxDate.isAfter(curDate)) {
+            for (final Map<String, List<XY>> dataForReport : dataForReports.values()) {
+                for (final List<XY> dataForPivot : dataForReport.values()) {
+                    addMissingValueForDateIfNeeded(curDate, dataForPivot);
+                }
+            }
+            curDate = curDate.plusDays(1);
+        }
+
+        // Sort the data for the dashboard
         for (final String reportName : dataForReports.keySet()) {
             for (final String pivotName : dataForReports.get(reportName).keySet()) {
-                Collections.sort(dataForReports.get(reportName).get(pivotName), new Comparator<XY>() {
-                    @Override
-                    public int compare(final XY o1, final XY o2) {
-                        return new DateTime(o1.getX(), DateTimeZone.UTC).compareTo(new DateTime(o2.getX(), DateTimeZone.UTC));
-                    }
-                });
+                Collections.sort(dataForReports.get(reportName).get(pivotName),
+                                 new Comparator<XY>() {
+                                     @Override
+                                     public int compare(final XY o1, final XY o2) {
+                                         return o1.getxDate().compareTo(o2.getxDate());
+                                     }
+                                 });
             }
         }
     }
 
-    private boolean hasX(final List<XY> values, final String x) {
-        for (final XY xy : values) {
-            if (xy.getX().equals(x)) {
-                return true;
+    private void addMissingValueForDateIfNeeded(final DateTime curDate, final List<XY> dataForPivot) {
+        final XY valueForCurrentDate = Iterables.tryFind(dataForPivot, new Predicate<XY>() {
+            @Override
+            public boolean apply(final XY xy) {
+                return xy.getxDate().compareTo(curDate) == 0;
             }
+        }).orNull();
+
+        if (valueForCurrentDate == null) {
+            dataForPivot.add(new XY(curDate, (float) 0));
         }
-        return false;
     }
 
     private Map<String, List<XY>> getData(final String tableName) {
diff --git a/osgi-bundles/bundles/analytics/src/main/resources/static/analytics.html b/osgi-bundles/bundles/analytics/src/main/resources/static/analytics.html
index fa59a85..c1a76ba 100644
--- a/osgi-bundles/bundles/analytics/src/main/resources/static/analytics.html
+++ b/osgi-bundles/bundles/analytics/src/main/resources/static/analytics.html
@@ -79,13 +79,15 @@
 
         // The URL structure is expected to be in the form: analytics.html?report1=new_trials_per_day&report1=cancellations_per_day&report2=conversions_per_day
         $(document).ready(function() {
+          // Set (sane?) default values for from and to if unspecified. This is to make sure all graphs will share the exact same X axis (the server will normalize the data)
+          var now = new Date();
           var from = $.url().param('startDate');
           if (!from) {
-            from = '';
+            from = now.getFullYear() + '-' + (now.getMonth() - 3) + '-' + now.getDay();
           }
           var to = $.url().param('endDate');
           if (!to) {
-            to = '';
+            to = now.getFullYear() + '-' + (now.getMonth() + 3) + '-' + now.getDay();
           }
 
           // Map of position (starting from the top) to an array of reports