thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 8(+7 -1)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java 7(+6 -1)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java 5(+4 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java 21(+16 -5)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java 31(+31 -0)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java 8(+7 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 57a838f..fcde477 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -168,12 +168,18 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, final PluginCallback<Void> callback) {
+ saveTsData(entityId, entries, 0L, callback);
+ }
+
+ @Override
+ public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries);
+ ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
}));
}
+
@Override
public void loadTimeseries(final EntityId entityId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf
index e8e899a..3cd319f 100644
--- a/application/src/main/resources/actor-system.conf
+++ b/application/src/main/resources/actor-system.conf
@@ -19,7 +19,7 @@ akka {
# JVM shutdown, System.exit(-1), in case of a fatal error,
# such as OutOfMemoryError
jvm-exit-on-fatal-error = off
- loglevel = "INFO"
+ loglevel = "DEBUG"
loggers = ["akka.event.slf4j.Slf4jLogger"]
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 26fe3fb..3de2127 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.dao.AbstractAsyncDao;
-import org.thingsboard.server.dao.AbstractDao;
import org.thingsboard.server.dao.model.ModelConstants;
import javax.annotation.Nullable;
@@ -40,8 +39,6 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -64,8 +61,10 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
private TsPartitionDate tsFormat;
private PreparedStatement partitionInsertStmt;
+ private PreparedStatement partitionInsertTtlStmt;
private PreparedStatement[] latestInsertStmts;
private PreparedStatement[] saveStmts;
+ private PreparedStatement[] saveTtlStmts;
private PreparedStatement[] fetchStmts;
private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt;
@@ -255,15 +254,32 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
}
@Override
- public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) {
+ public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
DataType type = tsKvEntry.getDataType();
- BoundStatement stmt = getSaveStmt(type).bind()
- .setString(0, entityId.getEntityType().name())
+ BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
+ stmt.setString(0, entityId.getEntityType().name())
.setUUID(1, entityId.getId())
.setString(2, tsKvEntry.getKey())
.setLong(3, partition)
.setLong(4, tsKvEntry.getTs());
addValue(tsKvEntry, stmt, 5);
+ if (ttl > 0) {
+ stmt.setInt(6, (int) ttl);
+ }
+ return executeAsyncWrite(stmt);
+ }
+
+ @Override
+ public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
+ log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
+ BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
+ stmt = stmt.setString(0, entityId.getEntityType().name())
+ .setUUID(1, entityId.getId())
+ .setLong(2, partition)
+ .setString(3, key);
+ if (ttl > 0) {
+ stmt.setInt(4, (int) ttl);
+ }
return executeAsyncWrite(stmt);
}
@@ -280,16 +296,6 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
}
@Override
- public ResultSetFuture savePartition(EntityId entityId, long partition, String key) {
- log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
- return executeAsyncWrite(getPartitionInsertStmt().bind()
- .setString(0, entityId.getEntityType().name())
- .setUUID(1, entityId.getId())
- .setLong(2, partition)
- .setString(3, key));
- }
-
- @Override
public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
List<TsKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) {
@@ -365,6 +371,23 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
return saveStmts[dataType.ordinal()];
}
+ private PreparedStatement getSaveTtlStmt(DataType dataType) {
+ if (saveTtlStmts == null) {
+ saveTtlStmts = new PreparedStatement[DataType.values().length];
+ for (DataType type : DataType.values()) {
+ saveTtlStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF +
+ "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+ "," + ModelConstants.ENTITY_ID_COLUMN +
+ "," + ModelConstants.KEY_COLUMN +
+ "," + ModelConstants.PARTITION_COLUMN +
+ "," + ModelConstants.TS_COLUMN +
+ "," + getColumnName(type) + ")" +
+ " VALUES(?, ?, ?, ?, ?, ?) USING TTL ?");
+ }
+ }
+ return saveTtlStmts[dataType.ordinal()];
+ }
+
private PreparedStatement getFetchStmt(Aggregation aggType) {
if (fetchStmts == null) {
fetchStmts = new PreparedStatement[Aggregation.values().length];
@@ -418,6 +441,19 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
return partitionInsertStmt;
}
+ private PreparedStatement getPartitionInsertTtlStmt() {
+ if (partitionInsertTtlStmt == null) {
+ partitionInsertTtlStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF +
+ "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+ "," + ModelConstants.ENTITY_ID_COLUMN +
+ "," + ModelConstants.PARTITION_COLUMN +
+ "," + ModelConstants.KEY_COLUMN + ")" +
+ " VALUES(?, ?, ?, ?) USING TTL ?");
+ }
+ return partitionInsertTtlStmt;
+ }
+
+
private PreparedStatement getFindLatestStmt() {
if (findLatestStmt == null) {
findLatestStmt = getSession().prepare("SELECT " +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 5d722f9..40135b2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -87,29 +87,33 @@ public class BaseTimeseriesService implements TimeseriesService {
if (tsKvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
}
- UUID uid = entityId.getId();
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
- saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs);
+ saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
return Futures.allAsList(futures);
}
@Override
public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries) {
+ return save(entityId, tsKvEntries, 0L);
+ }
+
+ @Override
+ public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
validate(entityId);
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (tsKvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
}
- UUID uid = entityId.getId();
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
- saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs);
+ saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, ttl);
}
return Futures.allAsList(futures);
}
+
@Override
public TsKvEntry convertResultToTsKvEntry(Row row) {
return timeseriesDao.convertResultToTsKvEntry(row);
@@ -120,10 +124,10 @@ public class BaseTimeseriesService implements TimeseriesService {
return timeseriesDao.convertResultToTsKvEntryList(rs.all());
}
- private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs) {
- futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey()));
+ private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
+ futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
- futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry));
+ futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
}
private static void validate(EntityId entityId) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 1f9871e..08a61f6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -23,9 +23,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
/**
* @author Andrew Shvayka
@@ -40,9 +37,9 @@ public interface TimeseriesDao {
ResultSetFuture findAllLatest(EntityId entityId);
- ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry);
+ ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
- ResultSetFuture savePartition(EntityId entityId, long partition, String key);
+ ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl);
ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 5c9c961..fe56074 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -44,6 +44,8 @@ public interface TimeseriesService {
ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry);
+ ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
+
TsKvEntry convertResultToTsKvEntry(Row row);
List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java
index 2d1678a..df5c900 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java
@@ -23,9 +23,14 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
public class TelemetryUploadRequestRuleToPluginMsg extends AbstractRuleToPluginMsg<TelemetryUploadRequest> {
private static final long serialVersionUID = 1L;
+ private final long ttl;
- public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload) {
+ public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload, long ttl) {
super(tenantId, customerId, deviceId, payload);
+ this.ttl = ttl;
}
+ public long getTtl() {
+ return ttl;
+ }
}
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index 2477ac3..2d346e7 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -79,7 +79,9 @@ public interface PluginContext {
void saveTsData(EntityId entityId, TsKvEntry entry, PluginCallback<Void> callback);
- void saveTsData(EntityId entityId, List<TsKvEntry> entry, PluginCallback<Void> callback);
+ void saveTsData(EntityId entityId, List<TsKvEntry> entries, PluginCallback<Void> callback);
+
+ void saveTsData(EntityId deviceId, List<TsKvEntry> entries, long ttl, PluginCallback<Void> pluginCallback);
void loadTimeseries(EntityId entityId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback);
@@ -106,4 +108,5 @@ public interface PluginContext {
void loadAttributes(EntityId entityId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
+
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java
index 63006aa..ce6fd57 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.extensions.core.action.telemetry;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
@@ -23,7 +24,6 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.extensions.api.component.Action;
-import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
import org.thingsboard.server.extensions.api.plugins.PluginAction;
import org.thingsboard.server.extensions.api.plugins.msg.*;
import org.thingsboard.server.extensions.api.rules.RuleContext;
@@ -31,11 +31,22 @@ import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData;
import org.thingsboard.server.extensions.api.rules.SimpleRuleLifecycleComponent;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
-@Action(name = "Telemetry Plugin Action")
-public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction<EmptyComponentConfiguration> {
+@Action(name = "Telemetry Plugin Action", descriptor = "TelemetryPluginActionDescriptor.json", configuration = TelemetryPluginActionConfiguration.class)
+public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction<TelemetryPluginActionConfiguration> {
- public void init(EmptyComponentConfiguration configuration) {
+ protected TelemetryPluginActionConfiguration configuration;
+ protected long ttl;
+
+ @Override
+ public void init(TelemetryPluginActionConfiguration configuration) {
+ this.configuration = configuration;
+ if (StringUtils.isEmpty(configuration.getTimeUnit()) || configuration.getTtlValue() == 0L) {
+ this.ttl = 0L;
+ } else {
+ this.ttl = TimeUnit.valueOf(configuration.getTimeUnit()).toSeconds(configuration.getTtlValue());
+ }
}
@Override
@@ -44,7 +55,7 @@ public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implemen
if (msg.getMsgType() == MsgType.POST_TELEMETRY_REQUEST) {
TelemetryUploadRequest payload = (TelemetryUploadRequest) msg;
return Optional.of(new TelemetryUploadRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(),
- toDeviceActorMsg.getDeviceId(), payload));
+ toDeviceActorMsg.getDeviceId(), payload, ttl));
} else if (msg.getMsgType() == MsgType.POST_ATTRIBUTES_REQUEST) {
UpdateAttributesRequest payload = (UpdateAttributesRequest) msg;
return Optional.of(new UpdateAttributesRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(),
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java
new file mode 100644
index 0000000..896481e
--- /dev/null
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.core.action.telemetry;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Data;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TelemetryPluginActionConfiguration {
+
+ private String timeUnit;
+ private int ttlValue;
+
+}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 5b32474..440e28b 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -148,6 +148,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
String[] pathParams = request.getPathParams();
EntityId entityId;
String scope;
+ long ttl = 0L;
TelemetryFeature feature;
if (pathParams.length == 2) {
entityId = DeviceId.fromString(pathParams[0]);
@@ -161,6 +162,11 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]);
feature = TelemetryFeature.forName(pathParams[2].toUpperCase());
scope = pathParams[3];
+ } else if (pathParams.length == 5) {
+ entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]);
+ feature = TelemetryFeature.forName(pathParams[2].toUpperCase());
+ scope = pathParams[3];
+ ttl = Long.parseLong(pathParams[4]);
} else {
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
return;
@@ -211,7 +217,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
entries.add(new BasicTsKvEntry(entry.getKey(), kv));
}
}
- ctx.saveTsData(entityId, entries, new PluginCallback<Void>() {
+ ctx.saveTsData(entityId, entries, ttl, new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext ctx, Void value) {
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index 1ce797f..242345d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -92,7 +92,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
tsKvEntries.add(new BasicTsKvEntry(entry.getKey(), kv));
}
}
- ctx.saveTsData(msg.getDeviceId(), tsKvEntries, new PluginCallback<Void>() {
+ ctx.saveTsData(msg.getDeviceId(), tsKvEntries, msg.getTtl(), new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext ctx, Void data) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
diff --git a/extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json b/extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json
new file mode 100644
index 0000000..0486816
--- /dev/null
+++ b/extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json
@@ -0,0 +1,50 @@
+{
+ "schema": {
+ "title": "Telemetry Plugin Action Configuration",
+ "type": "object",
+ "properties": {
+ "timeUnit": {
+ "title": "Time Unit",
+ "type": "string",
+ "default": "DAYS"
+ },
+ "ttlValue": {
+ "title": "TTL",
+ "type": "integer",
+ "default": 365,
+ "minimum": 0,
+ "maximum": 100000
+ }
+ },
+ "required": [
+ "timeUnit",
+ "ttlValue"
+ ]
+ },
+ "form": [
+ {
+ "key": "timeUnit",
+ "type": "rc-select",
+ "multiple": false,
+ "items": [
+ {
+ "value": "SECONDS",
+ "label": "Seconds"
+ },
+ {
+ "value": "MINUTES",
+ "label": "Minutes"
+ },
+ {
+ "value": "HOURS",
+ "label": "Hours"
+ },
+ {
+ "value": "DAYS",
+ "label": "Days"
+ }
+ ]
+ },
+ "ttlValue"
+ ]
+}
\ No newline at end of file