/*
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import thingsboardApiDevice from './device.service';
import thingsboardApiTelemetryWebsocket from './telemetry-websocket.service';
import thingsboardTypes from '../common/types.constant';
import thingsboardUtils from '../common/utils.service';
import DataAggregator from './data-aggregator';
export default angular.module('thingsboard.api.datasource', [thingsboardApiDevice, thingsboardApiTelemetryWebsocket, thingsboardTypes, thingsboardUtils])
.factory('datasourceService', DatasourceService)
.name;
const YEAR = 1000 * 60 * 60 * 24 * 365;
/*@ngInject*/
function DatasourceService($timeout, $filter, $log, telemetryWebsocketService, types, utils) {
var subscriptions = {};
var service = {
subscribeToDatasource: subscribeToDatasource,
unsubscribeFromDatasource: unsubscribeFromDatasource
}
return service;
function subscribeToDatasource(listener) {
var datasource = listener.datasource;
if (datasource.type === types.datasourceType.entity && (!listener.entityId || !listener.entityType)) {
return;
}
var subscriptionDataKeys = [];
for (var d = 0; d < datasource.dataKeys.length; d++) {
var dataKey = datasource.dataKeys[d];
var subscriptionDataKey = {
name: dataKey.name,
type: dataKey.type,
funcBody: dataKey.funcBody,
postFuncBody: dataKey.postFuncBody
}
subscriptionDataKeys.push(subscriptionDataKey);
}
var datasourceSubscription = {
datasourceType: datasource.type,
dataKeys: subscriptionDataKeys,
type: listener.subscriptionType
};
if (listener.subscriptionType === types.widgetType.timeseries.value) {
datasourceSubscription.subscriptionTimewindow = angular.copy(listener.subscriptionTimewindow);
}
if (datasourceSubscription.datasourceType === types.datasourceType.entity) {
datasourceSubscription.entityType = listener.entityType;
datasourceSubscription.entityId = listener.entityId;
}
listener.datasourceSubscriptionKey = utils.objectHashCode(datasourceSubscription);
var subscription;
if (subscriptions[listener.datasourceSubscriptionKey]) {
subscription = subscriptions[listener.datasourceSubscriptionKey];
subscription.syncListener(listener);
} else {
subscription = new DatasourceSubscription(datasourceSubscription, telemetryWebsocketService, $timeout, $filter, $log, types, utils);
subscriptions[listener.datasourceSubscriptionKey] = subscription;
subscription.start();
}
subscription.addListener(listener);
}
function unsubscribeFromDatasource(listener) {
if (listener.datasourceSubscriptionKey) {
if (subscriptions[listener.datasourceSubscriptionKey]) {
var subscription = subscriptions[listener.datasourceSubscriptionKey];
subscription.removeListener(listener);
if (!subscription.hasListeners()) {
subscription.unsubscribe();
delete subscriptions[listener.datasourceSubscriptionKey];
}
}
listener.datasourceSubscriptionKey = null;
}
}
}
function DatasourceSubscription(datasourceSubscription, telemetryWebsocketService, $timeout, $filter, $log, types, utils) {
var listeners = [];
var datasourceType = datasourceSubscription.datasourceType;
var datasourceData = {};
var dataKeys = {};
var subscribers = [];
var history = datasourceSubscription.subscriptionTimewindow &&
datasourceSubscription.subscriptionTimewindow.fixedWindow;
var realtime = datasourceSubscription.subscriptionTimewindow &&
datasourceSubscription.subscriptionTimewindow.realtimeWindowMs;
var timer;
var frequency;
var tickElapsed = 0;
var tickScheduledTime = 0;
var dataAggregator;
var subscription = {
addListener: addListener,
hasListeners: hasListeners,
removeListener: removeListener,
syncListener: syncListener,
start: start,
unsubscribe: unsubscribe
}
initializeSubscription();
return subscription;
function initializeSubscription() {
for (var i = 0; i < datasourceSubscription.dataKeys.length; i++) {
var dataKey = angular.copy(datasourceSubscription.dataKeys[i]);
dataKey.index = i;
var key;
if (datasourceType === types.datasourceType.function) {
if (!dataKey.func) {
dataKey.func = new Function("time", "prevValue", dataKey.funcBody);
}
} else {
if (dataKey.postFuncBody && !dataKey.postFunc) {
dataKey.postFunc = new Function("time", "value", "prevValue", dataKey.postFuncBody);
}
}
if (datasourceType === types.datasourceType.entity || datasourceSubscription.type === types.widgetType.timeseries.value) {
if (datasourceType === types.datasourceType.function) {
key = dataKey.name + '_' + dataKey.index + '_' + dataKey.type;
} else {
key = dataKey.name + '_' + dataKey.type;
}
var dataKeysList = dataKeys[key];
if (!dataKeysList) {
dataKeysList = [];
dataKeys[key] = dataKeysList;
}
var index = dataKeysList.push(dataKey) - 1;
datasourceData[key + '_' + index] = {
data: []
};
} else {
key = utils.objectHashCode(dataKey);
datasourceData[key] = {
data: []
};
dataKeys[key] = dataKey;
}
dataKey.key = key;
}
if (datasourceType === types.datasourceType.function) {
frequency = 1000;
if (datasourceSubscription.type === types.widgetType.timeseries.value) {
frequency = Math.min(datasourceSubscription.subscriptionTimewindow.aggregation.interval, 5000);
}
}
}
function addListener(listener) {
listeners.push(listener);
if (history) {
start();
}
}
function hasListeners() {
return listeners.length > 0;
}
function removeListener(listener) {
listeners.splice(listeners.indexOf(listener), 1);
}
function syncListener(listener) {
var key;
var dataKey;
if (datasourceType === types.datasourceType.entity || datasourceSubscription.type === types.widgetType.timeseries.value) {
for (key in dataKeys) {
var dataKeysList = dataKeys[key];
for (var i = 0; i < dataKeysList.length; i++) {
dataKey = dataKeysList[i];
var datasourceKey = key + '_' + i;
listener.dataUpdated(datasourceData[datasourceKey],
listener.datasourceIndex,
dataKey.index, false);
}
}
} else {
for (key in dataKeys) {
dataKey = dataKeys[key];
listener.dataUpdated(datasourceData[key],
listener.datasourceIndex,
dataKey.index, false);
}
}
}
function start() {
if (history && !hasListeners()) {
return;
}
var subsTw = datasourceSubscription.subscriptionTimewindow;
var tsKeyNames = [];
var dataKey;
if (datasourceType === types.datasourceType.entity) {
//send subscribe command
var tsKeys = '';
var attrKeys = '';
for (var key in dataKeys) {
var dataKeysList = dataKeys[key];
dataKey = dataKeysList[0];
if (dataKey.type === types.dataKeyType.timeseries) {
if (tsKeys.length > 0) {
tsKeys += ',';
}
tsKeys += dataKey.name;
tsKeyNames.push(dataKey.name);
} else if (dataKey.type === types.dataKeyType.attribute) {
if (attrKeys.length > 0) {
attrKeys += ',';
}
attrKeys += dataKey.name;
}
}
if (tsKeys.length > 0) {
var subscriber;
if (history) {
var historyCommand = {
entityType: datasourceSubscription.entityType,
entityId: datasourceSubscription.entityId,
keys: tsKeys,
startTs: subsTw.fixedWindow.startTimeMs,
endTs: subsTw.fixedWindow.endTimeMs,
interval: subsTw.aggregation.interval,
limit: subsTw.aggregation.limit,
agg: subsTw.aggregation.type
};
subscriber = {
historyCommands: [ historyCommand ],
type: types.dataKeyType.timeseries,
subsTw: subsTw
};
if (subsTw.aggregation.stateData) {
subscriber.firstStateHistoryCommand = createFirstStateHistoryCommand(subsTw.fixedWindow.startTimeMs, tsKeys);
subscriber.historyCommands.push(subscriber.firstStateHistoryCommand);
}
subscriber.onData = function (data, subscriptionId) {
if (this.subsTw.aggregation.stateData &&
this.firstStateHistoryCommand && this.firstStateHistoryCommand.cmdId == subscriptionId) {
if (this.data) {
onStateHistoryData(data, this.data, this.subsTw.aggregation.limit,
subsTw.fixedWindow.startTimeMs, this.subsTw.fixedWindow.endTimeMs,
(data) => {
onData(data.data, types.dataKeyType.timeseries, true);
});
} else {
this.firstStateData = data;
}
} else {
if (this.subsTw.aggregation.stateData) {
if (this.firstStateData) {
onStateHistoryData(this.firstStateData, data, this.subsTw.aggregation.limit,
this.subsTw.fixedWindow.startTimeMs, this.subsTw.fixedWindow.endTimeMs,
(data) => {
onData(data.data, types.dataKeyType.timeseries, true);
});
} else {
this.data = data;
}
} else {
for (key in data.data) {
var keyData = data.data[key];
data.data[key] = $filter('orderBy')(keyData, '+this[0]');
}
onData(data.data, types.dataKeyType.timeseries, true);
}
}
};
subscriber.onReconnected = function() {};
telemetryWebsocketService.subscribe(subscriber);
subscribers.push(subscriber);
} else {
var subscriptionCommand = {
entityType: datasourceSubscription.entityType,
entityId: datasourceSubscription.entityId,
keys: tsKeys
};
subscriber = {
subscriptionCommands: [subscriptionCommand],
type: types.dataKeyType.timeseries
};
if (datasourceSubscription.type === types.widgetType.timeseries.value) {
subscriber.subsTw = subsTw;
updateRealtimeSubscriptionCommand(subscriptionCommand, subsTw);
if (subsTw.aggregation.stateData) {
subscriber.firstStateSubscriptionCommand = createFirstStateHistoryCommand(subsTw.startTs, tsKeys);
subscriber.historyCommands = [subscriber.firstStateSubscriptionCommand];
}
dataAggregator = createRealtimeDataAggregator(subsTw, tsKeyNames, types.dataKeyType.timeseries);
subscriber.onData = function(data, subscriptionId) {
if (this.subsTw.aggregation.stateData &&
this.firstStateSubscriptionCommand && this.firstStateSubscriptionCommand.cmdId == subscriptionId) {
if (this.data) {
onStateHistoryData(data, this.data, this.subsTw.aggregation.limit,
this.subsTw.startTs, this.subsTw.startTs + this.subsTw.aggregation.timeWindow,
(data) => {
dataAggregator.onData(data, false, false, true);
});
this.stateDataReceived = true;
} else {
this.firstStateData = data;
}
} else {
if (this.subsTw.aggregation.stateData && !this.stateDataReceived) {
if (this.firstStateData) {
onStateHistoryData(this.firstStateData, data, this.subsTw.aggregation.limit,
this.subsTw.startTs, this.subsTw.startTs + this.subsTw.aggregation.timeWindow,
(data) => {
dataAggregator.onData(data, false, false, true);
});
this.stateDataReceived = true;
} else {
this.data = data;
}
} else {
dataAggregator.onData(data, false, false, true);
}
}
}
subscriber.onReconnected = function() {
var newSubsTw = null;
for (var i2 = 0; i2 < listeners.length; i2++) {
var listener = listeners[i2];
if (!newSubsTw) {
newSubsTw = listener.updateRealtimeSubscription();
} else {
listener.setRealtimeSubscription(newSubsTw);
}
}
this.subsTw = newSubsTw;
this.firstStateData = null;
this.data = null;
this.stateDataReceived = false;
updateRealtimeSubscriptionCommand(this.subscriptionCommands[0], this.subsTw);
if (this.subsTw.aggregation.stateData) {
updateFirstStateHistoryCommand(this.firstStateSubscriptionCommand, this.subsTw.startTs);
}
dataAggregator.reset(newSubsTw.startTs, newSubsTw.aggregation.timeWindow, newSubsTw.aggregation.interval);
}
} else {
subscriber.onReconnected = function() {}
subscriber.onData = function(data) {
if (data.data) {
onData(data.data, types.dataKeyType.timeseries, true);
}
}
}
telemetryWebsocketService.subscribe(subscriber);
subscribers.push(subscriber);
}
}
if (attrKeys.length > 0) {
var attrsSubscriptionCommand = {
entityType: datasourceSubscription.entityType,
entityId: datasourceSubscription.entityId,
keys: attrKeys
};
subscriber = {
subscriptionCommands: [attrsSubscriptionCommand],
type: types.dataKeyType.attribute,
onData: function (data) {
if (data.data) {
onData(data.data, types.dataKeyType.attribute, true);
}
},
onReconnected: function() {}
};
telemetryWebsocketService.subscribe(subscriber);
subscribers.push(subscriber);
}
} else if (datasourceType === types.datasourceType.function) {
if (datasourceSubscription.type === types.widgetType.timeseries.value) {
for (key in dataKeys) {
var dataKeyList = dataKeys[key];
for (var index = 0; index < dataKeyList.length; index++) {
dataKey = dataKeyList[index];
tsKeyNames.push(dataKey.name+'_'+dataKey.index);
}
}
dataAggregator = createRealtimeDataAggregator(subsTw, tsKeyNames, types.dataKeyType.function);
}
tickScheduledTime = currentTime();
if (history) {
onTick(false);
} else {
timer = $timeout(
function() {
onTick(true)
},
0,
false
);
}
}
}
function createFirstStateHistoryCommand(startTs, tsKeys) {
return {
entityType: datasourceSubscription.entityType,
entityId: datasourceSubscription.entityId,
keys: tsKeys,
startTs: startTs - YEAR,
endTs: startTs,
interval: 1000,
limit: 1,
agg: types.aggregation.none.value
};
}
function updateFirstStateHistoryCommand(stateHistoryCommand, startTs) {
stateHistoryCommand.startTs = startTs - YEAR;
stateHistoryCommand.endTs = startTs;
}
function onStateHistoryData(firstStateData, data, limit, startTs, endTs, onData) {
for (var key in data.data) {
var keyData = data.data[key];
data.data[key] = $filter('orderBy')(keyData, '+this[0]');
keyData = data.data[key];
if (keyData.length < limit) {
var firstStateKeyData = firstStateData.data[key];
if (firstStateKeyData.length) {
var firstStateDataTsKv = firstStateKeyData[0];
firstStateDataTsKv[0] = startTs;
firstStateKeyData = [
[ startTs, firstStateKeyData[0][1] ]
];
keyData.unshift(firstStateDataTsKv);
}
}
if (keyData.length) {
var lastTsKv = angular.copy(keyData[keyData.length-1]);
lastTsKv[0] = endTs;
keyData.push(lastTsKv);
}
}
onData(data);
}
function createRealtimeDataAggregator(subsTw, tsKeyNames, dataKeyType) {
return new DataAggregator(
function(data, apply) {
onData(data, dataKeyType, apply);
},
tsKeyNames,
subsTw.startTs,
subsTw.aggregation.limit,
subsTw.aggregation.type,
subsTw.aggregation.timeWindow,
subsTw.aggregation.interval,
subsTw.aggregation.stateData,
types,
$timeout,
$filter
);
}
function updateRealtimeSubscriptionCommand(subscriptionCommand, subsTw) {
subscriptionCommand.startTs = subsTw.startTs;
subscriptionCommand.timeWindow = subsTw.aggregation.timeWindow;
subscriptionCommand.interval = subsTw.aggregation.interval;
subscriptionCommand.limit = subsTw.aggregation.limit;
subscriptionCommand.agg = subsTw.aggregation.type;
}
function unsubscribe() {
if (timer) {
$timeout.cancel(timer);
timer = null;
}
if (datasourceType === types.datasourceType.entity) {
for (var i=0;i<subscribers.length;i++) {
var subscriber = subscribers[i];
telemetryWebsocketService.unsubscribe(subscriber);
if (subscriber.onDestroy) {
subscriber.onDestroy();
}
}
subscribers.length = 0;
}
if (dataAggregator) {
dataAggregator.destroy();
dataAggregator = null;
}
}
function generateSeries(dataKey, index, startTime, endTime) {
var data = [];
var prevSeries;
var datasourceDataKey = dataKey.key + '_' + index;
var datasourceKeyData = datasourceData[datasourceDataKey].data;
if (datasourceKeyData.length > 0) {
prevSeries = datasourceKeyData[datasourceKeyData.length - 1];
} else {
prevSeries = [0, 0];
}
for (var time = startTime; time <= endTime && (timer || history); time += frequency) {
var series = [];
series.push(time);
var value = dataKey.func(time, prevSeries[1]);
series.push(value);
data.push(series);
prevSeries = series;
}
if (data.length > 0) {
dataKey.lastUpdateTime = data[data.length - 1][0];
}
return data;
}
function generateLatest(dataKey, apply) {
var prevSeries;
var datasourceKeyData = datasourceData[dataKey.key].data;
if (datasourceKeyData.length > 0) {
prevSeries = datasourceKeyData[datasourceKeyData.length - 1];
} else {
prevSeries = [0, 0];
}
var series = [];
var time = (new Date).getTime();
series.push(time);
var value = dataKey.func(time, prevSeries[1]);
series.push(value);
datasourceData[dataKey.key].data = [series];
for (var i = 0; i < listeners.length; i++) {
var listener = listeners[i];
listener.dataUpdated(datasourceData[dataKey.key],
listener.datasourceIndex,
dataKey.index, apply);
}
}
/* eslint-disable */
function currentTime() {
return window.performance && window.performance.now ?
window.performance.now() : Date.now();
}
/* eslint-enable */
function onTick(apply) {
var now = currentTime();
tickElapsed += now - tickScheduledTime;
tickScheduledTime = now;
if (timer) {
$timeout.cancel(timer);
}
var key;
if (datasourceSubscription.type === types.widgetType.timeseries.value) {
var startTime;
var endTime;
var delta;
var generatedData = {
data: {
}
};
if (!history) {
delta = Math.floor(tickElapsed / frequency);
}
var deltaElapsed = history ? frequency : delta * frequency;
tickElapsed = tickElapsed - deltaElapsed;
for (key in dataKeys) {
var dataKeyList = dataKeys[key];
for (var index = 0; index < dataKeyList.length && (timer || history); index ++) {
var dataKey = dataKeyList[index];
if (!startTime) {
if (realtime) {
if (dataKey.lastUpdateTime) {
startTime = dataKey.lastUpdateTime + frequency;
endTime = dataKey.lastUpdateTime + deltaElapsed;
} else {
startTime = datasourceSubscription.subscriptionTimewindow.startTs;
endTime = startTime + datasourceSubscription.subscriptionTimewindow.realtimeWindowMs + frequency;
if (datasourceSubscription.subscriptionTimewindow.aggregation.type == types.aggregation.none.value) {
var time = endTime - frequency * datasourceSubscription.subscriptionTimewindow.aggregation.limit;
startTime = Math.max(time, startTime);
}
}
} else {
startTime = datasourceSubscription.subscriptionTimewindow.fixedWindow.startTimeMs;
endTime = datasourceSubscription.subscriptionTimewindow.fixedWindow.endTimeMs;
}
}
var data = generateSeries(dataKey, index, startTime, endTime);
generatedData.data[dataKey.name+'_'+dataKey.index] = data;
}
}
if (dataAggregator) {
dataAggregator.onData(generatedData, true, history, apply);
}
} else if (datasourceSubscription.type === types.widgetType.latest.value) {
for (key in dataKeys) {
generateLatest(dataKeys[key], apply);
}
}
if (!history) {
timer = $timeout(function() {onTick(true)}, frequency, false);
}
}
function isNumeric(val) {
return (val - parseFloat( val ) + 1) >= 0;
}
function convertValue(val) {
if (val && isNumeric(val)) {
return Number(val);
} else {
return val;
}
}
function onData(sourceData, type, apply) {
for (var keyName in sourceData) {
var keyData = sourceData[keyName];
var key = keyName + '_' + type;
var dataKeyList = dataKeys[key];
for (var keyIndex = 0; dataKeyList && keyIndex < dataKeyList.length; keyIndex++) {
var datasourceKey = key + "_" + keyIndex;
if (datasourceData[datasourceKey].data) {
var dataKey = dataKeyList[keyIndex];
var data = [];
var prevSeries;
var datasourceKeyData;
var update = false;
if (realtime) {
datasourceKeyData = [];
} else {
datasourceKeyData = datasourceData[datasourceKey].data;
}
if (datasourceKeyData.length > 0) {
prevSeries = datasourceKeyData[datasourceKeyData.length - 1];
} else {
prevSeries = [0, 0];
}
if (datasourceSubscription.type === types.widgetType.timeseries.value) {
var series, time, value;
for (var i = 0; i < keyData.length; i++) {
series = keyData[i];
time = series[0];
value = convertValue(series[1]);
if (dataKey.postFunc) {
value = dataKey.postFunc(time, value, prevSeries[1]);
}
series = [time, value];
data.push(series);
prevSeries = series;
}
update = true;
} else if (datasourceSubscription.type === types.widgetType.latest.value) {
if (keyData.length > 0) {
series = keyData[0];
time = series[0];
value = convertValue(series[1]);
if (dataKey.postFunc) {
value = dataKey.postFunc(time, value, prevSeries[1]);
}
series = [time, value];
data.push(series);
}
update = true;
}
if (update) {
datasourceData[datasourceKey].data = data;
for (var i2 = 0; i2 < listeners.length; i2++) {
var listener = listeners[i2];
if (angular.isFunction(listener))
continue;
listener.dataUpdated(datasourceData[datasourceKey],
listener.datasourceIndex,
dataKey.index, apply);
}
}
}
}
}
}
}