thingsboard-aplcache

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 19a260f..10651cb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -53,8 +53,9 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 @Slf4j
 public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao {
 
-    @Value("${cassandra.query.min_aggregation_step_ms}")
-    private int minAggregationStepMs;
+    //@Value("${cassandra.query.min_aggregation_step_ms}")
+    //TODO:
+    private int minAggregationStepMs = 1000;
 
     @Value("${cassandra.query.ts_key_value_partitioning}")
     private String partitioning;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 4bc7ae0..8385bf1 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -234,7 +234,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
         return new PluginCallback<List<TsKvEntry>>() {
             @Override
             public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
-                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), startTs, data));
 
                 Map<String, Long> subState = new HashMap<>(keys.size());
                 keys.forEach(key -> subState.put(key, startTs));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java
index 4d8cf53..8a9e7b2 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java
@@ -26,10 +26,16 @@ public class SubscriptionUpdate {
     private int errorCode;
     private String errorMsg;
     private Map<String, List<Object>> data;
+    private long serverStartTs;
 
     public SubscriptionUpdate(int subscriptionId, List<TsKvEntry> data) {
+        this(subscriptionId, 0L, data);
+    }
+
+    public SubscriptionUpdate(int subscriptionId, long serverStartTs, List<TsKvEntry> data) {
         super();
         this.subscriptionId = subscriptionId;
+        this.serverStartTs = serverStartTs;
         this.data = new TreeMap<>();
         for (TsKvEntry tsEntry : data) {
             List<Object> values = this.data.get(tsEntry.getKey());
@@ -89,9 +95,13 @@ public class SubscriptionUpdate {
         return errorMsg;
     }
 
+    public long getServerStartTs() {
+        return serverStartTs;
+    }
+
     @Override
     public String toString() {
         return "SubscriptionUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", data="
-                + data + "]";
+                + data + ", serverStartTs=" + serverStartTs+ "]";
     }
 }
diff --git a/ui/src/app/api/data-aggregator.js b/ui/src/app/api/data-aggregator.js
new file mode 100644
index 0000000..314ba64
--- /dev/null
+++ b/ui/src/app/api/data-aggregator.js
@@ -0,0 +1,238 @@
+/*
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export default class DataAggregator {
+
+    constructor(onDataCb, limit, aggregationType, timeWindow, types, $timeout, $filter) {
+        this.onDataCb = onDataCb;
+        this.aggregationType = aggregationType;
+        this.types = types;
+        this.$timeout = $timeout;
+        this.$filter = $filter;
+        this.dataReceived = false;
+        this.noAggregation = aggregationType === types.aggregation.none.value;
+        var interval = Math.floor(timeWindow / limit);
+        if (!this.noAggregation) {
+            this.interval = Math.max(interval, 1000);
+            this.limit = Math.ceil(interval/this.interval * limit);
+            this.timeWindow = this.interval * this.limit;
+        } else {
+            this.limit = limit;
+            this.timeWindow = interval * this.limit;
+            this.interval = 1000;
+        }
+        this.aggregationTimeout = this.interval;
+        switch (aggregationType) {
+            case types.aggregation.min.value:
+                this.aggFunction = min;
+                break;
+            case types.aggregation.max.value:
+                this.aggFunction = max
+                break;
+            case types.aggregation.avg.value:
+                this.aggFunction = avg;
+                break;
+            case types.aggregation.sum.value:
+                this.aggFunction = sum;
+                break;
+            case types.aggregation.count.value:
+                this.aggFunction = count;
+                break;
+            case types.aggregation.none.value:
+                this.aggFunction = none;
+                break;
+            default:
+                this.aggFunction = avg;
+        }
+    }
+
+    onData(data) {
+        if (!this.dataReceived) {
+            this.elapsed = 0;
+            this.dataReceived = true;
+            this.startTs = data.serverStartTs;
+            this.endTs = this.startTs + this.timeWindow;
+            this.aggregationMap = processAggregatedData(data.data, this.aggregationType === this.types.aggregation.count.value, this.noAggregation);
+            this.onInterval(currentTime());
+        } else {
+            updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value,
+                this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs);
+        }
+    }
+
+    onInterval(startedTime) {
+        var now = currentTime();
+        this.elapsed += now - startedTime;
+        if (this.intervalTimeoutHandle) {
+            this.$timeout.cancel(this.intervalTimeoutHandle);
+            this.intervalTimeoutHandle = null;
+        }
+        var delta = Math.floor(this.elapsed / this.interval);
+        if (delta || !this.data) {
+            this.startTs += delta * this.interval;
+            this.endTs += delta * this.interval;
+            this.data = toData(this.aggregationMap, this.startTs, this.endTs, this.$filter, this.limit);
+            this.elapsed = this.elapsed - delta * this.interval;
+        }
+        if (this.onDataCb) {
+            this.onDataCb(this.data, this.startTs, this.endTs);
+        }
+
+        var self = this;
+        this.intervalTimeoutHandle = this.$timeout(function() {
+            self.onInterval(now);
+        }, this.aggregationTimeout, false);
+    }
+
+    reset() {
+        this.destroy();
+        this.dataReceived = false;
+    }
+
+    destroy() {
+        if (this.intervalTimeoutHandle) {
+            this.$timeout.cancel(this.intervalTimeoutHandle);
+            this.intervalTimeoutHandle = null;
+        }
+        this.aggregationMap = null;
+    }
+
+}
+
+/* eslint-disable */
+function currentTime() {
+    return window.performance && window.performance.now ?
+        window.performance.now() : Date.now();
+}
+/* eslint-enable */
+
+function processAggregatedData(data, isCount, noAggregation) {
+    var aggregationMap = {};
+    for (var key in data) {
+        var aggKeyData = aggregationMap[key];
+        if (!aggKeyData) {
+            aggKeyData = {};
+            aggregationMap[key] = aggKeyData;
+        }
+        var keyData = data[key];
+        for (var i in keyData) {
+            var kvPair = keyData[i];
+            var timestamp = kvPair[0];
+            var value = convertValue(kvPair[1], noAggregation);
+            var aggKey = timestamp;
+            var aggData = {
+                count: isCount ? value : 1,
+                sum: value,
+                aggValue: value
+            }
+            aggKeyData[aggKey] = aggData;
+        }
+    }
+    return aggregationMap;
+}
+
+function updateAggregatedData(aggregationMap, isCount, noAggregation, aggFunction, data, interval, startTs) {
+    for (var key in data) {
+        var aggKeyData = aggregationMap[key];
+        if (!aggKeyData) {
+            aggKeyData = {};
+            aggregationMap[key] = aggKeyData;
+        }
+        var keyData = data[key];
+        for (var i in keyData) {
+            var kvPair = keyData[i];
+            var timestamp = kvPair[0];
+            var value = convertValue(kvPair[1], noAggregation);
+            var aggTimestamp = noAggregation ? timestamp : (startTs + Math.floor((timestamp - startTs) / interval) * interval + interval/2);
+            var aggData = aggKeyData[aggTimestamp];
+            if (!aggData) {
+                aggData = {
+                    count: 1,
+                    sum: value,
+                    aggValue: isCount ? 1 : value
+                }
+                aggKeyData[aggTimestamp] = aggData;
+            } else {
+                aggFunction(aggData, value);
+            }
+        }
+    }
+}
+
+function toData(aggregationMap, startTs, endTs, $filter, limit) {
+    var data = {};
+    for (var key in aggregationMap) {
+        if (!data[key]) {
+            data[key] = [];
+        }
+        var aggKeyData = aggregationMap[key];
+        var keyData = data[key];
+        for (var aggTimestamp in aggKeyData) {
+            if (aggTimestamp <= startTs) {
+                delete aggKeyData[aggTimestamp];
+            } else if (aggTimestamp <= endTs) {
+                var aggData = aggKeyData[aggTimestamp];
+                var kvPair = [aggTimestamp, aggData.aggValue];
+                keyData.push(kvPair);
+            }
+        }
+        keyData = $filter('orderBy')(keyData, '+this[0]');
+        if (keyData.length > limit) {
+            keyData = keyData.slice(keyData.length - limit);
+        }
+        data[key] = keyData;
+    }
+    return data;
+}
+
+function convertValue(value, noAggregation) {
+    if (!noAggregation || value && isNumeric(value)) {
+        return Number(value);
+    } else {
+        return value;
+    }
+}
+
+function isNumeric(value) {
+    return (value - parseFloat( value ) + 1) >= 0;
+}
+
+function avg(aggData, value) {
+    aggData.count++;
+    aggData.sum += value;
+    aggData.aggValue = aggData.sum / aggData.count;
+}
+
+function min(aggData, value) {
+    aggData.aggValue = Math.min(aggData.aggValue, value);
+}
+
+function max(aggData, value) {
+    aggData.aggValue = Math.max(aggData.aggValue, value);
+}
+
+function sum(aggData, value) {
+    aggData.aggValue = aggData.aggValue + value;
+}
+
+function count(aggData) {
+    aggData.count++;
+    aggData.aggValue = aggData.count;
+}
+
+function none(aggData, value) {
+    aggData.aggValue = value;
+}
diff --git a/ui/src/app/api/datasource.service.js b/ui/src/app/api/datasource.service.js
index 565f511..acfe124 100644
--- a/ui/src/app/api/datasource.service.js
+++ b/ui/src/app/api/datasource.service.js
@@ -17,13 +17,14 @@ import thingsboardApiDevice from './device.service';
 import thingsboardApiTelemetryWebsocket from './telemetry-websocket.service';
 import thingsboardTypes from '../common/types.constant';
 import thingsboardUtils from '../common/utils.service';
+import DataAggregator from './data-aggregator';
 
 export default angular.module('thingsboard.api.datasource', [thingsboardApiDevice, thingsboardApiTelemetryWebsocket, thingsboardTypes, thingsboardUtils])
     .factory('datasourceService', DatasourceService)
     .name;
 
 /*@ngInject*/
-function DatasourceService($timeout, $log, telemetryWebsocketService, types, utils) {
+function DatasourceService($timeout, $filter, $log, telemetryWebsocketService, types, utils) {
 
     var subscriptions = {};
 
@@ -73,7 +74,7 @@ function DatasourceService($timeout, $log, telemetryWebsocketService, types, uti
             subscription = subscriptions[listener.datasourceSubscriptionKey];
             subscription.syncListener(listener);
         } else {
-            subscription = new DatasourceSubscription(datasourceSubscription, telemetryWebsocketService, $timeout, $log, types, utils);
+            subscription = new DatasourceSubscription(datasourceSubscription, telemetryWebsocketService, $timeout, $filter, $log, types, utils);
             subscriptions[listener.datasourceSubscriptionKey] = subscription;
             subscription.start();
         }
@@ -96,7 +97,7 @@ function DatasourceService($timeout, $log, telemetryWebsocketService, types, uti
 
 }
 
-function DatasourceSubscription(datasourceSubscription, telemetryWebsocketService, $timeout, $log, types, utils) {
+function DatasourceSubscription(datasourceSubscription, telemetryWebsocketService, $timeout, $filter, $log, types, utils) {
 
     var listeners = [];
     var datasourceType = datasourceSubscription.datasourceType;
@@ -134,7 +135,9 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                 if (!dataKey.func) {
                     dataKey.func = new Function("time", "prevValue", dataKey.funcBody);
                 }
-                datasourceData[key] = [];
+                datasourceData[key] = {
+                    data: []
+                };
                 dataKeys[key] = dataKey;
             } else if (datasourceType === types.datasourceType.device) {
                 key = dataKey.name + '_' + dataKey.type;
@@ -147,7 +150,9 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                     dataKeys[key] = dataKeysList;
                 }
                 var index = dataKeysList.push(dataKey) - 1;
-                datasourceData[key + '_' + index] = [];
+                datasourceData[key + '_' + index] = {
+                    data: []
+                };
             }
             dataKey.key = key;
         }
@@ -248,14 +253,18 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                         deviceId: datasourceSubscription.deviceId,
                         keys: tsKeys,
                         startTs: datasourceSubscription.subscriptionTimewindow.fixedWindow.startTimeMs,
-                        endTs: datasourceSubscription.subscriptionTimewindow.fixedWindow.endTimeMs
+                        endTs: datasourceSubscription.subscriptionTimewindow.fixedWindow.endTimeMs,
+                        limit: datasourceSubscription.subscriptionTimewindow.aggregation.limit,
+                        agg: datasourceSubscription.subscriptionTimewindow.aggregation.type
                     };
 
                     subscriber = {
                         historyCommand: historyCommand,
                         type: types.dataKeyType.timeseries,
                         onData: function (data) {
-                            onData(data, types.dataKeyType.timeseries);
+                            if (data.data) {
+                                onData(data.data, types.dataKeyType.timeseries);
+                            }
                         },
                         onReconnected: function() {
                             onReconnected();
@@ -272,20 +281,46 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                         keys: tsKeys
                     };
 
-                    if (datasourceSubscription.type === types.widgetType.timeseries.value) {
-                        subscriptionCommand.timeWindow = datasourceSubscription.subscriptionTimewindow.realtimeWindowMs;
-                    }
-
                     subscriber = {
                         subscriptionCommand: subscriptionCommand,
-                        type: types.dataKeyType.timeseries,
-                        onData: function (data) {
-                            onData(data, types.dataKeyType.timeseries);
-                        },
-                        onReconnected: function() {
+                        type: types.dataKeyType.timeseries
+                    };
+
+                    if (datasourceSubscription.type === types.widgetType.timeseries.value) {
+                        subscriptionCommand.timeWindow = datasourceSubscription.subscriptionTimewindow.realtimeWindowMs;
+                        subscriptionCommand.limit = datasourceSubscription.subscriptionTimewindow.aggregation.limit;
+                        subscriptionCommand.agg = datasourceSubscription.subscriptionTimewindow.aggregation.type;
+                        var dataAggregator = new DataAggregator(
+                            function(data, startTs, endTs) {
+                                onData(data, types.dataKeyType.timeseries, startTs, endTs);
+                            },
+                            subscriptionCommand.limit,
+                            subscriptionCommand.agg,
+                            subscriptionCommand.timeWindow,
+                            types,
+                            $timeout,
+                            $filter
+                        );
+                        subscriber.onData = function(data) {
+                            dataAggregator.onData(data);
+                        }
+                        subscriber.onReconnected = function() {
+                            dataAggregator.reset();
                             onReconnected();
                         }
-                    };
+                        subscriber.onDestroy = function() {
+                            dataAggregator.destroy();
+                        }
+                    } else {
+                        subscriber.onReconnected = function() {
+                            onReconnected();
+                        }
+                        subscriber.onData = function(data) {
+                            if (data.data) {
+                                onData(data.data, types.dataKeyType.timeseries);
+                            }
+                        }
+                    }
 
                     telemetryWebsocketService.subscribe(subscriber);
                     subscribers[subscriber.subscriptionCommand.cmdId] = subscriber;
@@ -304,7 +339,9 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                     subscriptionCommand: subscriptionCommand,
                     type: types.dataKeyType.attribute,
                     onData: function (data) {
-                        onData(data, types.dataKeyType.attribute);
+                        if (data.data) {
+                            onData(data.data, types.dataKeyType.attribute);
+                        }
                     },
                     onReconnected: function() {
                         onReconnected();
@@ -332,11 +369,14 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
         }
         if (datasourceType === types.datasourceType.device) {
             for (var cmdId in subscribers) {
-                telemetryWebsocketService.unsubscribe(subscribers[cmdId]);
+                var subscriber = subscribers[cmdId];
+                telemetryWebsocketService.unsubscribe(subscriber);
+                if (subscriber.onDestroy) {
+                    subscriber.onDestroy();
+                }
             }
             subscribers = {};
         }
-        //$log.debug("unsibscribed!");
     }
 
     function boundToInterval(data, timewindowMs) {
@@ -360,7 +400,7 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
     function generateSeries(dataKey, startTime, endTime) {
         var data = [];
         var prevSeries;
-        var datasourceKeyData = datasourceData[dataKey.key];
+        var datasourceKeyData = datasourceData[dataKey.key].data;
         if (datasourceKeyData.length > 0) {
             prevSeries = datasourceKeyData[datasourceKeyData.length - 1];
         } else {
@@ -378,10 +418,10 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
             dataKey.lastUpdateTime = data[data.length - 1][0];
         }
         if (realtime) {
-            datasourceData[dataKey.key] = boundToInterval(datasourceKeyData.concat(data),
+            datasourceData[dataKey.key].data = boundToInterval(datasourceKeyData.concat(data),
                 datasourceSubscription.subscriptionTimewindow.realtimeWindowMs);
         } else {
-            datasourceData[dataKey.key] = data;
+            datasourceData[dataKey.key].data = data;
         }
         for (var i in listeners) {
             var listener = listeners[i];
@@ -393,7 +433,7 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
 
     function generateLatest(dataKey) {
         var prevSeries;
-        var datasourceKeyData = datasourceData[dataKey.key];
+        var datasourceKeyData = datasourceData[dataKey.key].data;
         if (datasourceKeyData.length > 0) {
             prevSeries = datasourceKeyData[datasourceKeyData.length - 1];
         } else {
@@ -404,7 +444,7 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
         series.push(time);
         var value = dataKey.func(time, prevSeries[1]);
         series.push(value);
-        datasourceData[dataKey.key] = [series];
+        datasourceData[dataKey.key].data = [series];
         for (var i in listeners) {
             var listener = listeners[i];
             listener.dataUpdated(datasourceData[dataKey.key],
@@ -453,7 +493,9 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                 for (var i = 0; i < dataKeysList.length; i++) {
                     var dataKey = dataKeysList[i];
                     var datasourceKey = key + '_' + i;
-                    datasourceData[datasourceKey] = [];
+                    datasourceData[datasourceKey] = {
+                        data: []
+                    };
                     for (var l in listeners) {
                         var listener = listeners[l];
                         listener.dataUpdated(datasourceData[datasourceKey],
@@ -477,18 +519,23 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
         }
     }
 
-    function onData(sourceData, type) {
+    function onData(sourceData, type, startTs, endTs) {
         for (var keyName in sourceData) {
             var keyData = sourceData[keyName];
             var key = keyName + '_' + type;
             var dataKeyList = dataKeys[key];
             for (var keyIndex = 0; keyIndex < dataKeyList.length; keyIndex++) {
                 var datasourceKey = key + "_" + keyIndex;
-                if (datasourceData[datasourceKey]) {
+                if (datasourceData[datasourceKey].data) {
                     var dataKey = dataKeyList[keyIndex];
                     var data = [];
                     var prevSeries;
-                    var datasourceKeyData = datasourceData[datasourceKey];
+                    var datasourceKeyData;
+                    if (realtime) {
+                        datasourceKeyData = [];
+                    } else {
+                        datasourceKeyData = datasourceData[datasourceKey].data;
+                    }
                     if (datasourceKeyData.length > 0) {
                         prevSeries = datasourceKeyData[datasourceKeyData.length - 1];
                     } else {
@@ -519,12 +566,10 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
                             data.push(series);
                         }
                     }
-                    if (data.length > 0) {
-                        if (realtime) {
-                            datasourceData[datasourceKey] = boundToInterval(datasourceKeyData.concat(data), datasourceSubscription.subscriptionTimewindow.realtimeWindowMs);
-                        } else {
-                            datasourceData[datasourceKey] = data;
-                        }
+                    if (data.length > 0 || (startTs && endTs)) {
+                        datasourceData[datasourceKey].data = data;
+                        datasourceData[datasourceKey].startTs = startTs;
+                        datasourceData[datasourceKey].endTs = endTs;
                         for (var i2 in listeners) {
                             var listener = listeners[i2];
                             listener.dataUpdated(datasourceData[datasourceKey],
@@ -537,3 +582,4 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic
         }
     }
 }
+
diff --git a/ui/src/app/api/device.service.js b/ui/src/app/api/device.service.js
index 46c64da..1680058 100644
--- a/ui/src/app/api/device.service.js
+++ b/ui/src/app/api/device.service.js
@@ -304,7 +304,9 @@ function DeviceService($http, $q, $filter, telemetryWebsocketService, types) {
                 subscriptionCommand: subscriptionCommand,
                 type: type,
                 onData: function (data) {
-                    onSubscriptionData(data, subscriptionId);
+                    if (data.data) {
+                        onSubscriptionData(data.data, subscriptionId);
+                    }
                 }
             };
             deviceAttributesSubscription = {
diff --git a/ui/src/app/api/telemetry-websocket.service.js b/ui/src/app/api/telemetry-websocket.service.js
index 353c736..e504220 100644
--- a/ui/src/app/api/telemetry-websocket.service.js
+++ b/ui/src/app/api/telemetry-websocket.service.js
@@ -131,8 +131,8 @@ function TelemetryWebsocketService($rootScope, $websocket, $timeout, $window, ty
             var data = angular.fromJson(message.data);
             if (data.subscriptionId) {
                 var subscriber = subscribers[data.subscriptionId];
-                if (subscriber && data.data) {
-                    subscriber.onData(data.data);
+                if (subscriber && data) {
+                    subscriber.onData(data);
                 }
             }
         }
diff --git a/ui/src/app/common/types.constant.js b/ui/src/app/common/types.constant.js
index 5314038..79eb2c2 100644
--- a/ui/src/app/common/types.constant.js
+++ b/ui/src/app/common/types.constant.js
@@ -33,6 +33,32 @@ export default angular.module('thingsboard.types', [])
             id: {
                 nullUid: "13814000-1dd2-11b2-8080-808080808080",
             },
+            aggregation: {
+                min: {
+                    value: "MIN",
+                    name: "aggregation.min"
+                },
+                max: {
+                    value: "MAX",
+                    name: "aggregation.max"
+                },
+                avg: {
+                    value: "AVG",
+                    name: "aggregation.avg"
+                },
+                sum: {
+                    value: "SUM",
+                    name: "aggregation.sum"
+                },
+                count: {
+                    value: "COUNT",
+                    name: "aggregation.count"
+                },
+                none: {
+                    value: "NONE",
+                    name: "aggregation.none"
+                }
+            },
             datasourceType: {
                 function: "function",
                 device: "device"
diff --git a/ui/src/app/components/dashboard.tpl.html b/ui/src/app/components/dashboard.tpl.html
index a2110cc..0e367e9 100644
--- a/ui/src/app/components/dashboard.tpl.html
+++ b/ui/src/app/components/dashboard.tpl.html
@@ -47,7 +47,7 @@
             									   padding: vm.widgetPadding(widget)}">
 								<div class="tb-widget-title" layout="column" ng-show="vm.showWidgetTitle(widget) || vm.hasTimewindow(widget)">
 									<span ng-show="vm.showWidgetTitle(widget)" ng-style="vm.widgetTitleStyle(widget)" class="md-subhead">{{widget.config.title}}</span>
-									<tb-timewindow button-color="vm.widgetColor(widget)" ng-if="vm.hasTimewindow(widget)" ng-model="widget.config.timewindow"></tb-timewindow>
+									<tb-timewindow button-color="vm.widgetColor(widget)" aggregation ng-if="vm.hasTimewindow(widget)" ng-model="widget.config.timewindow"></tb-timewindow>
 								</div>
 								<div class="tb-widget-actions" layout="row" layout-align="start center">
 									<md-button id="expand-button"
diff --git a/ui/src/app/components/timewindow.directive.js b/ui/src/app/components/timewindow.directive.js
index d3be637..f06c119 100644
--- a/ui/src/app/components/timewindow.directive.js
+++ b/ui/src/app/components/timewindow.directive.js
@@ -15,6 +15,7 @@
  */
 import './timewindow.scss';
 
+import $ from 'jquery';
 import thingsboardTimeinterval from './timeinterval.directive';
 import thingsboardDatetimePeriod from './datetime-period.directive';
 
@@ -34,8 +35,9 @@ export default angular.module('thingsboard.directives.timewindow', [thingsboardT
     .filter('milliSecondsToTimeString', MillisecondsToTimeString)
     .name;
 
+/* eslint-disable angular/angularelement */
 /*@ngInject*/
-function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $translate) {
+function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $mdMedia, $translate, types) {
 
     var linker = function (scope, element, attrs, ngModelCtrl) {
 
@@ -50,12 +52,18 @@ function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $tra
          * 				startTimeMs: 0,
          * 				endTimeMs: 0
          * 			}
+         * 	  },
+         * 	  aggregation: {
+         * 	        limit: 200,
+         * 	        type: types.aggregation.avg.value
          * 	  }
          * }
          */
 
         scope.historyOnly = angular.isDefined(attrs.historyOnly);
 
+        scope.aggregation = angular.isDefined(attrs.aggregation);
+
         var translationPending = false;
 
         $translate.onReady(function() {
@@ -84,9 +92,27 @@ function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $tra
         }
 
         scope.openEditMode = function (event) {
-            var position = $mdPanel.newPanelPosition()
-                .relativeTo(element)
-                .addPanelPosition($mdPanel.xPosition.ALIGN_START, $mdPanel.yPosition.BELOW);
+            var position;
+            var isGtSm = $mdMedia('gt-sm');
+            if (isGtSm) {
+                var panelHeight = 375;
+                var offset = element[0].getBoundingClientRect();
+                var bottomY = offset.bottom - $(window).scrollTop(); //eslint-disable-line
+                var yPosition;
+                if (bottomY + panelHeight > $( window ).height()) { //eslint-disable-line
+                    yPosition = $mdPanel.yPosition.ABOVE;
+                } else {
+                    yPosition = $mdPanel.yPosition.BELOW;
+                }
+                position = $mdPanel.newPanelPosition()
+                    .relativeTo(element)
+                    .addPanelPosition($mdPanel.xPosition.ALIGN_START, yPosition);
+            } else {
+                position = $mdPanel.newPanelPosition()
+                    .absolute()
+                    .top('0%')
+                    .left('0%');
+            }
             var config = {
                 attachTo: angular.element($document[0].body),
                 controller: 'TimewindowPanelController',
@@ -94,9 +120,11 @@ function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $tra
                 templateUrl: timewindowPanelTemplate,
                 panelClass: 'tb-timewindow-panel',
                 position: position,
+                fullscreen: !isGtSm,
                 locals: {
                     'timewindow': angular.copy(scope.model),
                     'historyOnly': scope.historyOnly,
+                    'aggregation': scope.aggregation,
                     'onTimewindowUpdate': function (timewindow) {
                         scope.model = timewindow;
                         scope.updateView();
@@ -131,7 +159,10 @@ function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $tra
                     };
                 }
             }
-
+            value.aggregation = {
+                limit: model.aggregation.limit,
+                type: model.aggregation.type
+            };
             ngModelCtrl.$setViewValue(value);
             scope.updateDisplayValue();
         }
@@ -173,6 +204,10 @@ function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $tra
                         startTimeMs: currentTime - 24 * 60 * 60 * 1000, // 1 day by default
                         endTimeMs: currentTime
                     }
+                },
+                aggregation: {
+                    limit: 200,
+                    type: types.aggregation.avg.value
                 }
             };
             if (ngModelCtrl.$viewValue) {
@@ -192,6 +227,12 @@ function Timewindow($compile, $templateCache, $filter, $mdPanel, $document, $tra
                         model.history.fixedTimewindow.endTimeMs = value.history.fixedTimewindow.endTimeMs;
                     }
                 }
+                if (angular.isDefined(value.aggregation)) {
+                    model.aggregation.limit = value.aggregation.limit || 200;
+                    if (angular.isDefined(value.aggregation.type) && value.aggregation.type.length > 0) {
+                        model.aggregation.type = value.aggregation.type;
+                    }
+                }
             }
             scope.updateDisplayValue();
         };
@@ -240,4 +281,5 @@ function MillisecondsToTimeString($translate) {
         }
         return timeString;
     }
-}
\ No newline at end of file
+}
+/* eslint-enable angular/angularelement */
\ No newline at end of file
diff --git a/ui/src/app/components/timewindow.scss b/ui/src/app/components/timewindow.scss
index fc4a991..16c89e8 100644
--- a/ui/src/app/components/timewindow.scss
+++ b/ui/src/app/components/timewindow.scss
@@ -13,8 +13,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+.md-panel {
+  &.tb-timewindow-panel {
+    position: absolute;
+  }
+}
+
 .tb-timewindow-panel {
-  position: absolute;
+  min-height: 375px;
   background: white;
   border-radius: 4px;
   box-shadow: 0 7px 8px -4px rgba(0, 0, 0, 0.2),
diff --git a/ui/src/app/components/timewindow-panel.controller.js b/ui/src/app/components/timewindow-panel.controller.js
index f08485c..ab81e9d 100644
--- a/ui/src/app/components/timewindow-panel.controller.js
+++ b/ui/src/app/components/timewindow-panel.controller.js
@@ -14,14 +14,16 @@
  * limitations under the License.
  */
 /*@ngInject*/
-export default function TimewindowPanelController(mdPanelRef, $scope, timewindow, historyOnly, onTimewindowUpdate) {
+export default function TimewindowPanelController(mdPanelRef, $scope, types, timewindow, historyOnly, aggregation, onTimewindowUpdate) {
 
     var vm = this;
 
     vm._mdPanelRef = mdPanelRef;
     vm.timewindow = timewindow;
     vm.historyOnly = historyOnly;
+    vm.aggregation = aggregation;
     vm.onTimewindowUpdate = onTimewindowUpdate;
+    vm.aggregationTypes = types.aggregation;
 
     if (vm.historyOnly) {
         vm.timewindow.selectedTab = 1;
diff --git a/ui/src/app/components/timewindow-panel.tpl.html b/ui/src/app/components/timewindow-panel.tpl.html
index e99b06d..89825b0 100644
--- a/ui/src/app/components/timewindow-panel.tpl.html
+++ b/ui/src/app/components/timewindow-panel.tpl.html
@@ -17,7 +17,7 @@
 -->
 <form name="theForm" ng-submit="vm.update()">
 	<fieldset ng-disabled="loading">
-		<section layout="column">
+		<md-content layout="column">
 			<md-tabs ng-class="{'tb-headless': vm.historyOnly}" flex md-dynamic-height md-selected="vm.timewindow.selectedTab" md-border-bottom>
 				<md-tab label="{{ 'timewindow.realtime' | translate }}">
 					<md-content class="md-padding" layout="column">
@@ -52,6 +52,24 @@
 					</md-content>
 				</md-tab>
 			</md-tabs>
+			<md-content ng-if="vm.aggregation" class="md-padding" layout="column">
+				<md-input-container>
+					<label translate>aggregation.function</label>
+					<md-select ng-model="vm.timewindow.aggregation.type" style="min-width: 150px;">
+						<md-option ng-repeat="type in vm.aggregationTypes" ng-value="type.value">
+							{{type.name | translate}}
+						</md-option>
+					</md-select>
+				</md-input-container>
+				<md-slider-container>
+					<span translate>aggregation.limit</span>
+					<md-slider flex min="10" max="500" ng-model="vm.timewindow.aggregation.limit" aria-label="limit" id="limit-slider">
+					</md-slider>
+					<md-input-container>
+						<input flex type="number" ng-model="vm.timewindow.aggregation.limit" aria-label="limit" aria-controls="limit-slider">
+					</md-input-container>
+				</md-slider-container>
+			</md-content>
 			<section layout="row" layout-alignment="start center">
 			      <span flex></span>
 				  <md-button ng-disabled="loading || theForm.$invalid || !theForm.$dirty" type="submit" class="md-raised md-primary">
diff --git a/ui/src/app/components/widget.controller.js b/ui/src/app/components/widget.controller.js
index cb759a6..cdcdbfb 100644
--- a/ui/src/app/components/widget.controller.js
+++ b/ui/src/app/components/widget.controller.js
@@ -43,9 +43,9 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
     var originalTimewindow = null;
     var subscriptionTimewindow = {
         fixedWindow: null,
-        realtimeWindowMs: null
+        realtimeWindowMs: null,
+        aggregation: null
     };
-    var timer = null;
     var dataUpdateTimer = null;
     var dataUpdateCaf = null;
 
@@ -154,10 +154,10 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
         }
     }
 
-    function updateTimewindow() {
+    function updateTimewindow(startTs, endTs) {
         if (subscriptionTimewindow.realtimeWindowMs) {
-            widgetContext.timeWindow.maxTime = (new Date).getTime();
-            widgetContext.timeWindow.minTime = widgetContext.timeWindow.maxTime - subscriptionTimewindow.realtimeWindowMs;
+            widgetContext.timeWindow.maxTime = endTs || (new Date).getTime();
+            widgetContext.timeWindow.minTime = startTs || (widgetContext.timeWindow.maxTime - subscriptionTimewindow.realtimeWindowMs);
         } else if (subscriptionTimewindow.fixedWindow) {
             widgetContext.timeWindow.maxTime = subscriptionTimewindow.fixedWindow.endTimeMs;
             widgetContext.timeWindow.minTime = subscriptionTimewindow.fixedWindow.startTimeMs;
@@ -170,13 +170,6 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
             dataUpdateTimer = null;
         }
         if (widgetContext.inited) {
-            if (widget.type === types.widgetType.timeseries.value) {
-                if (!widgetContext.tickUpdate && timer) {
-                    $timeout.cancel(timer);
-                    timer = $timeout(onTick, 1500, false);
-                }
-                updateTimewindow();
-            }
             if (dataUpdateCaf) {
                 dataUpdateCaf();
                 dataUpdateCaf = null;
@@ -188,7 +181,6 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
                         handleWidgetException(e);
                     }
                 });
-            widgetContext.tickUpdate = false;
         } else {
             widgetContext.dataUpdatePending = true;
         }
@@ -512,17 +504,20 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
         var update = true;
         if (widget.type === types.widgetType.latest.value) {
             var prevData = widgetContext.data[datasourceIndex + dataKeyIndex].data;
-            if (prevData && prevData[0] && prevData[0].length > 1 && sourceData.length > 0) {
+            if (prevData && prevData[0] && prevData[0].length > 1 && sourceData.data.length > 0) {
                 var prevValue = prevData[0][1];
-                if (prevValue === sourceData[0][1]) {
+                if (prevValue === sourceData.data[0][1]) {
                     update = false;
                 }
             }
         }
         if (update) {
-            widgetContext.data[datasourceIndex + dataKeyIndex].data = sourceData;
+            if (subscriptionTimewindow.realtimeWindowMs) {
+                updateTimewindow(sourceData.startTs, sourceData.endTs);
+            }
+            widgetContext.data[datasourceIndex + dataKeyIndex].data = sourceData.data;
             if (widgetContext.data.length > 1 && !dataUpdateTimer) {
-                dataUpdateTimer = $timeout(onDataUpdated, 100, false);
+                dataUpdateTimer = $timeout(onDataUpdated, 300, false);
             } else {
                 onDataUpdated();
             }
@@ -557,10 +552,6 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
 
     function unsubscribe() {
         if (widget.type !== types.widgetType.rpc.value) {
-            if (timer) {
-                $timeout.cancel(timer);
-                timer = null;
-            }
             if (dataUpdateTimer) {
                 $timeout.cancel(dataUpdateTimer);
                 dataUpdateTimer = null;
@@ -573,19 +564,25 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
         }
     }
 
-    function onTick() {
-        widgetContext.tickUpdate = true;
-        onDataUpdated();
-        timer = $timeout(onTick, 1000, false);
-    }
-
     function subscribe() {
         if (widget.type !== types.widgetType.rpc.value) {
             var index = 0;
             subscriptionTimewindow.fixedWindow = null;
             subscriptionTimewindow.realtimeWindowMs = null;
+            subscriptionTimewindow.aggregation = {
+                limit: 200,
+                type: types.aggregation.avg.value
+            };
             if (widget.type === types.widgetType.timeseries.value &&
                 angular.isDefined(widget.config.timewindow)) {
+
+                if (angular.isDefined(widget.config.timewindow.aggregation)) {
+                    subscriptionTimewindow.aggregation = {
+                        limit: widget.config.timewindow.aggregation.limit || 200,
+                        type: widget.config.timewindow.aggregation.type || types.aggregation.avg.value
+                    };
+                }
+
                 if (angular.isDefined(widget.config.timewindow.realtime)) {
                     subscriptionTimewindow.realtimeWindowMs = widget.config.timewindow.realtime.timewindowMs;
                 } else if (angular.isDefined(widget.config.timewindow.history)) {
@@ -635,10 +632,6 @@ export default function WidgetController($scope, $timeout, $window, $element, $q
                 datasourceListeners.push(listener);
                 datasourceService.subscribeToDatasource(listener);
             }
-
-            if (subscriptionTimewindow.realtimeWindowMs) {
-                timer = $timeout(onTick, 0, false);
-            }
         }
     }
 
diff --git a/ui/src/app/components/widget-config.tpl.html b/ui/src/app/components/widget-config.tpl.html
index 58242ed..23c64fc 100644
--- a/ui/src/app/components/widget-config.tpl.html
+++ b/ui/src/app/components/widget-config.tpl.html
@@ -91,7 +91,7 @@
                 <div ng-show="widgetType === types.widgetType.timeseries.value" layout="row"
                      layout-align="center center">
                     <span translate style="padding-right: 8px;">widget-config.timewindow</span>
-                    <tb-timewindow as-button="true" flex ng-model="timewindow"></tb-timewindow>
+                    <tb-timewindow as-button="true" aggregation flex ng-model="timewindow"></tb-timewindow>
                 </div>
                 <v-accordion id="datasources-accordion" control="datasourcesAccordion" class="vAccordion--default"
                              ng-show="widgetType !== types.widgetType.rpc.value && widgetType !== types.widgetType.static.value">
diff --git a/ui/src/app/locale/locale.constant.js b/ui/src/app/locale/locale.constant.js
index 57b1e2e..525c307 100644
--- a/ui/src/app/locale/locale.constant.js
+++ b/ui/src/app/locale/locale.constant.js
@@ -63,6 +63,17 @@ export default angular.module('thingsboard.locale', [])
                     "import": "Import",
                     "export": "Export"
                 },
+                "aggregation": {
+                    "aggregation": "Aggregation",
+                    "function": "Data aggregation function",
+                    "limit": "Max values",
+                    "min": "Min",
+                    "max": "Max",
+                    "avg": "Average",
+                    "sum": "Sum",
+                    "count": "Count",
+                    "none": "None"
+                },
                 "admin": {
                     "general": "General",
                     "general-settings": "General Settings",