thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java 20(+19 -1)
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java 9(+5 -4)
application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java 4(+2 -2)
common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java 6(+3 -3)
dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java 4(+2 -2)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java 22(+22 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java 10(+9 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java 17(+16 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java 11(+10 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java 13(+12 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java 14(+13 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java 19(+18 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java 14(+13 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java 11(+10 -1)
ui/src/app/api/rule-chain.service.js 25(+15 -10)
ui/src/app/components/ace-editor-fix.js 45(+45 -0)
ui/src/app/components/json-object-edit.directive.js 168(+168 -0)
ui/src/app/components/json-object-edit.scss 35(+35 -0)
ui/src/app/layout/index.js 4(+3 -1)
ui/src/app/locale/locale.constant.js 1(+1 -0)
ui/src/app/rulechain/rulechain.controller.js 82(+41 -41)
ui/src/app/rulechain/rulenode.scss 22(+22 -0)
ui/src/scss/main.scss 18(+18 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 09aaf80..f6bf54d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
+import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.extensions.api.plugins.Plugin;
import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
@@ -98,7 +103,20 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException {
if (state == ComponentLifecycleState.ACTIVE) {
- pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
+ try {
+ pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
+ } catch (Exception ex) {
+ logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex);
+ RuleToPluginMsg ruleMsg = msg.getMsg();
+ MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
+ Integer requestId = 0;
+ if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) {
+ requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId();
+ }
+ trustedCtx.reply(
+ new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(),
+ BasicStatusCodeResponse.onError(responceMsgType, requestId, ex)));
+ }
} else {
//TODO: reply with plugin suspended message
}
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index 52d7d7d..479f424 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -180,7 +180,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
return scannedComponent;
}
- private NodeDefinition prepareNodeDefinition(RuleNode nodeAnnotation) throws IOException {
+ private NodeDefinition prepareNodeDefinition(RuleNode nodeAnnotation) throws Exception {
NodeDefinition nodeDefinition = new NodeDefinition();
nodeDefinition.setDetails(nodeAnnotation.nodeDetails());
nodeDefinition.setDescription(nodeAnnotation.nodeDescription());
@@ -188,9 +188,10 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
nodeDefinition.setOutEnabled(nodeAnnotation.outEnabled());
nodeDefinition.setRelationTypes(nodeAnnotation.relationTypes());
nodeDefinition.setCustomRelations(nodeAnnotation.customRelations());
- String defaultConfigResourceName = nodeAnnotation.defaultConfigResource();
- nodeDefinition.setDefaultConfiguration(mapper.readTree(
- Resources.toString(Resources.getResource(defaultConfigResourceName), Charsets.UTF_8)));
+ Class<? extends NodeConfiguration> configClazz = nodeAnnotation.configClazz();
+ NodeConfiguration config = configClazz.newInstance();
+ NodeConfiguration defaultConfiguration = config.defaultConfiguration();
+ nodeDefinition.setDefaultConfiguration(mapper.valueToTree(defaultConfiguration));
return nodeDefinition;
}
diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
index 1ef805f..908faf2 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
@@ -187,7 +187,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
@Override
public void loadSystemRules() throws Exception {
- loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null);
+// loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null);
}
@Override
@@ -228,7 +228,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
"Raspberry Pi GPIO control sample application");
loadPlugins(Paths.get(dataDir, JSON_DIR, DEMO_DIR, PLUGINS_DIR), demoTenant.getId());
- loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId());
+// loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId());
loadDashboards(Paths.get(dataDir, JSON_DIR, DEMO_DIR, DASHBOARDS_DIR), demoTenant.getId(), null);
}
diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index c3444d4..751bde6 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.Arrays;
@@ -26,6 +27,7 @@ import java.util.Arrays;
@SpringBootConfiguration
@EnableAsync
@EnableSwagger2
+@EnableScheduling
@ComponentScan({"org.thingsboard.server"})
public class ThingsboardServerApplication {
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 2758521..fc04a19 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -181,6 +181,10 @@ cassandra:
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
+ buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
+ concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
+ permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
+ rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
queue:
msg.ttl: 604800 # 7 days
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java
index ace51c0..87708a7 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java
@@ -16,16 +16,16 @@
package org.thingsboard.server.common.msg.core;
import lombok.Data;
-import org.thingsboard.server.common.msg.session.FromDeviceMsg;
+import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.MsgType;
/**
* @author Andrew Shvayka
*/
@Data
-public class ToServerRpcRequestMsg implements FromDeviceMsg {
+public class ToServerRpcRequestMsg implements FromDeviceRequestMsg {
- private final int requestId;
+ private final Integer requestId;
private final String method;
private final String params;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java
index 4f923fe..64ec718 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java
@@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET));
query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
- ResultSetFuture resultSetFuture = getSession().executeAsync(query);
+ ResultSetFuture resultSetFuture = executeAsyncRead(query);
return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
@Nullable
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
index 932d6b9..8ae9dc8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
@@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
.and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
.and(eq(ATTRIBUTE_KEY_COLUMN, key));
log.debug("Remove request: {}", delete.toString());
- return getFuture(getSession().executeAsync(delete), rs -> null);
+ return getFuture(executeAsyncWrite(delete), rs -> null);
}
private PreparedStatement getSaveStmt() {
if (saveStmt == null) {
- saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
+ saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
"(" + ENTITY_TYPE_COLUMN +
"," + ENTITY_ID_COLUMN +
"," + ATTRIBUTE_TYPE_COLUMN +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java
index 27f7adc..fd02b5f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java
@@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
values.add("?");
}
String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")";
- return getSession().prepare(statementString);
+ return prepare(statementString);
}
private PreparedStatement getPartitionInsertStmt() {
if (partitionInsertStmt == null) {
- partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
+ partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
"(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY +
"," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" +
" VALUES(?, ?)");
@@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
.where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId));
select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
- return getSession().execute(select);
+ return executeRead(select);
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java b/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java
index 5e03545..b5b9f15 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java
@@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
public boolean removeById(UUID key) {
Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key));
log.debug("Remove request: {}", delete.toString());
- return getSession().execute(delete).wasApplied();
+ return executeWrite(delete).wasApplied();
}
@Override
@@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
log.debug("Delete plugin meta-data entity by id [{}]", clazz);
Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz));
log.debug("Remove request: {}", delete.toString());
- ResultSet resultSet = getSession().execute(delete);
+ ResultSet resultSet = executeWrite(delete);
log.debug("Delete result: [{}]", resultSet.wasApplied());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
index ac72ae8..641c464 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
@@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE));
query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
- ResultSetFuture resultSetFuture = getSession().executeAsync(query);
+ ResultSetFuture resultSetFuture = executeAsyncRead(query);
return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
@Nullable
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
index c2f709f..ba186cc 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.model.type.*;
+import org.thingsboard.server.dao.util.BufferedRateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao {
private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
- protected PreparedStatement prepare(String query) {
- return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
- }
+ @Autowired
+ private BufferedRateLimiter rateLimiter;
private Session session;
private ConsistencyLevel defaultReadLevel;
private ConsistencyLevel defaultWriteLevel;
- protected Session getSession() {
+ private Session getSession() {
if (session == null) {
session = cluster.getSession();
defaultReadLevel = cluster.getDefaultReadConsistencyLevel();
@@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao {
return session;
}
+ protected PreparedStatement prepare(String query) {
+ return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
+ }
+
private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
try {
registry.codecFor(codec.getCqlType(), codec.getJavaType());
@@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao {
private ResultSet execute(Statement statement, ConsistencyLevel level) {
log.debug("Execute cassandra statement {}", statement);
- if (statement.getConsistencyLevel() == null) {
- statement.setConsistencyLevel(level);
- }
- return getSession().execute(statement);
+ return executeAsync(statement, level).getUninterruptibly();
}
private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
@@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao {
if (statement.getConsistencyLevel() == null) {
statement.setConsistencyLevel(level);
}
- return getSession().executeAsync(statement);
+ return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
}
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java
index 7e87fa8..47d43ba 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java
@@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
List<E> list = Collections.emptyList();
if (statement != null) {
statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
- ResultSet resultSet = getSession().execute(statement);
+ ResultSet resultSet = executeRead(statement);
Result<E> result = getMapper().map(resultSet);
if (result != null) {
list = result.all();
@@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
if (statement != null) {
statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
- ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
+ ResultSetFuture resultSetFuture = executeAsyncRead(statement);
return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
@Nullable
@Override
@@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
E object = null;
if (statement != null) {
statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
- ResultSet resultSet = getSession().execute(statement);
+ ResultSet resultSet = executeRead(statement);
Result<E> result = getMapper().map(resultSet);
if (result != null) {
object = result.one();
@@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
if (statement != null) {
statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
- ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
+ ResultSetFuture resultSetFuture = executeAsyncRead(statement);
return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
@Nullable
@Override
@@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
public boolean removeById(UUID key) {
Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
log.debug("Remove request: {}", delete.toString());
- return getSession().execute(delete).wasApplied();
+ return executeWrite(delete).wasApplied();
}
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
new file mode 100644
index 0000000..2674c6d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.thingsboard.server.dao.util.AsyncRateLimiter;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.*;
+
+public class RateLimitedResultSetFuture implements ResultSetFuture {
+
+ private final ListenableFuture<ResultSetFuture> originalFuture;
+ private final ListenableFuture<Void> rateLimitFuture;
+
+ public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
+ this.rateLimitFuture = rateLimiter.acquireAsync();
+ this.originalFuture = Futures.transform(rateLimitFuture,
+ (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
+ }
+
+ @Override
+ public ResultSet getUninterruptibly() {
+ return safeGet().getUninterruptibly();
+ }
+
+ @Override
+ public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
+ long rateLimitStart = System.nanoTime();
+ ResultSetFuture resultSetFuture = null;
+ try {
+ resultSetFuture = originalFuture.get(timeout, unit);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
+ long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
+ if (innerTimeoutNano > 0) {
+ return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS);
+ }
+ throw new TimeoutException("Timeout waiting for task.");
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (originalFuture.isDone()) {
+ return safeGet().cancel(mayInterruptIfRunning);
+ } else {
+ return originalFuture.cancel(mayInterruptIfRunning);
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ if (originalFuture.isDone()) {
+ return safeGet().isCancelled();
+ }
+
+ return originalFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return originalFuture.isDone() && safeGet().isDone();
+ }
+
+ @Override
+ public ResultSet get() throws InterruptedException, ExecutionException {
+ return safeGet().get();
+ }
+
+ @Override
+ public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long rateLimitStart = System.nanoTime();
+ ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit);
+ long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
+ long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
+ if (innerTimeoutNano > 0) {
+ return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS);
+ }
+ throw new TimeoutException("Timeout waiting for task.");
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ originalFuture.addListener(() -> {
+ try {
+ ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
+ resultSetFuture.addListener(listener, executor);
+ } catch (CancellationException e) {
+ cancel(false);
+ return;
+ } catch (ExecutionException e) {
+ Futures.immediateFailedFuture(e).addListener(listener, executor);
+ }
+ }, executor);
+ }
+
+ private ResultSetFuture safeGet() {
+ try {
+ return originalFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) {
+ try {
+ ResultSetFuture resultSetFuture = session.executeAsync(statement);
+ Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(@Nullable ResultSet result) {
+ rateLimiter.release();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ rateLimiter.release();
+ }
+ });
+ return resultSetFuture;
+ } catch (RuntimeException re) {
+ rateLimiter.release();
+ throw re;
+ }
+ }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
index 9e25241..55838d6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
@@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getSaveStmt() {
if (saveStmt == null) {
- saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
+ saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
"(" + ModelConstants.RELATION_FROM_ID_PROPERTY +
"," + ModelConstants.RELATION_FROM_TYPE_PROPERTY +
"," + ModelConstants.RELATION_TO_ID_PROPERTY +
@@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getDeleteStmt() {
if (deleteStmt == null) {
- deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
+ deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" +
AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" +
@@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getDeleteAllByEntityStmt() {
if (deleteAllByEntityStmt == null) {
- deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
+ deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?");
}
@@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getFindAllByFromStmt() {
if (findAllByFromStmt == null) {
- findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " +
+ findAllByFromStmt = prepare(SELECT_COLUMNS + " " +
FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getFindAllByFromAndTypeStmt() {
if (findAllByFromAndTypeStmt == null) {
- findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
+ findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " +
FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getFindAllByToStmt() {
if (findAllByToStmt == null) {
- findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " +
+ findAllByToStmt = prepare(SELECT_COLUMNS + " " +
FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getFindAllByToAndTypeStmt() {
if (findAllByToAndTypeStmt == null) {
- findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
+ findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " +
FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
private PreparedStatement getCheckRelationStmt() {
if (checkRelationStmt == null) {
- checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " +
+ checkRelationStmt = prepare(SELECT_COLUMNS + " " +
FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
index 01f60f8..370d770 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
@@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService {
}
@Caching(evict = {
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
})
@@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService {
}
@Caching(evict = {
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
})
@@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService {
}
@Caching(evict = {
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}")
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
})
@Override
public boolean deleteRelation(EntityRelation relation) {
@@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService {
}
@Caching(evict = {
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}")
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
})
@Override
public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
@@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService {
}
@Caching(evict = {
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
})
@Override
public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
@@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService {
}
@Caching(evict = {
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
- @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
+ @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
})
@Override
public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index d620e11..cda4b16 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement partitionInsertStmt;
private PreparedStatement partitionInsertTtlStmt;
- private PreparedStatement[] latestInsertStmts;
+ private PreparedStatement latestInsertStmt;
private PreparedStatement[] saveStmts;
private PreparedStatement[] saveTtlStmts;
private PreparedStatement[] fetchStmts;
@@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
@Override
public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
- DataType type = tsKvEntry.getDataType();
- BoundStatement stmt = getLatestStmt(type).bind()
+ BoundStatement stmt = getLatestStmt().bind()
.setString(0, entityId.getEntityType().name())
.setUUID(1, entityId.getId())
.setString(2, tsKvEntry.getKey())
- .setLong(3, tsKvEntry.getTs());
- addValue(tsKvEntry, stmt, 4);
+ .setLong(3, tsKvEntry.getTs())
+ .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class)
+ .set(5, tsKvEntry.getStrValue().orElse(null), String.class)
+ .set(6, tsKvEntry.getLongValue().orElse(null), Long.class)
+ .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class);
return getFuture(executeAsyncWrite(stmt), rs -> null);
}
@@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if (saveStmts == null) {
saveStmts = new PreparedStatement[DataType.values().length];
for (DataType type : DataType.values()) {
- saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
+ saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.KEY_COLUMN +
@@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if (saveTtlStmts == null) {
saveTtlStmts = new PreparedStatement[DataType.values().length];
for (DataType type : DataType.values()) {
- saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
+ saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.KEY_COLUMN +
@@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
} else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
} else {
- fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX +
+ fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
@@ -435,26 +437,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return fetchStmts[aggType.ordinal()];
}
- private PreparedStatement getLatestStmt(DataType dataType) {
- if (latestInsertStmts == null) {
- latestInsertStmts = new PreparedStatement[DataType.values().length];
- for (DataType type : DataType.values()) {
- latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
- "(" + ModelConstants.ENTITY_TYPE_COLUMN +
- "," + ModelConstants.ENTITY_ID_COLUMN +
- "," + ModelConstants.KEY_COLUMN +
- "," + ModelConstants.TS_COLUMN +
- "," + getColumnName(type) + ")" +
- " VALUES(?, ?, ?, ?, ?)");
- }
+ private PreparedStatement getLatestStmt() {
+ if (latestInsertStmt == null) {
+ latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
+ "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+ "," + ModelConstants.ENTITY_ID_COLUMN +
+ "," + ModelConstants.KEY_COLUMN +
+ "," + ModelConstants.TS_COLUMN +
+ "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
+ "," + ModelConstants.STRING_VALUE_COLUMN +
+ "," + ModelConstants.LONG_VALUE_COLUMN +
+ "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
}
- return latestInsertStmts[dataType.ordinal()];
+ return latestInsertStmt;
}
private PreparedStatement getPartitionInsertStmt() {
if (partitionInsertStmt == null) {
- partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
+ partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
@@ -466,7 +468,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement getPartitionInsertTtlStmt() {
if (partitionInsertTtlStmt == null) {
- partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
+ partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
@@ -479,7 +481,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement getFindLatestStmt() {
if (findLatestStmt == null) {
- findLatestStmt = getSession().prepare(SELECT_PREFIX +
+ findLatestStmt = prepare(SELECT_PREFIX +
ModelConstants.KEY_COLUMN + "," +
ModelConstants.TS_COLUMN + "," +
ModelConstants.STRING_VALUE_COLUMN + "," +
@@ -496,7 +498,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement getFindAllLatestStmt() {
if (findAllLatestStmt == null) {
- findAllLatestStmt = getSession().prepare(SELECT_PREFIX +
+ findAllLatestStmt = prepare(SELECT_PREFIX +
ModelConstants.KEY_COLUMN + "," +
ModelConstants.TS_COLUMN + "," +
ModelConstants.STRING_VALUE_COLUMN + "," +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java
new file mode 100644
index 0000000..6fb21d6
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.util;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public interface AsyncRateLimiter {
+
+ ListenableFuture<Void> acquireAsync();
+
+ void release();
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
new file mode 100644
index 0000000..2acd623
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.util;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Component
+@Slf4j
+@NoSqlDao
+public class BufferedRateLimiter implements AsyncRateLimiter {
+
+ private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
+
+ private final int permitsLimit;
+ private final int maxPermitWaitTime;
+ private final AtomicInteger permits;
+ private final BlockingQueue<LockedFuture> queue;
+
+ private final AtomicInteger maxQueueSize = new AtomicInteger();
+ private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
+
+ public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
+ @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
+ @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
+ this.permitsLimit = permitsLimit;
+ this.maxPermitWaitTime = maxPermitWaitTime;
+ this.permits = new AtomicInteger();
+ this.queue = new LinkedBlockingQueue<>(queueLimit);
+ }
+
+ @Override
+ public ListenableFuture<Void> acquireAsync() {
+ if (queue.isEmpty()) {
+ if (permits.incrementAndGet() <= permitsLimit) {
+ if (permits.get() > maxGrantedPermissions.get()) {
+ maxGrantedPermissions.set(permits.get());
+ }
+ return Futures.immediateFuture(null);
+ }
+ permits.decrementAndGet();
+ }
+
+ return putInQueue();
+ }
+
+ @Override
+ public void release() {
+ permits.decrementAndGet();
+ reprocessQueue();
+ }
+
+ private void reprocessQueue() {
+ while (permits.get() < permitsLimit) {
+ if (permits.incrementAndGet() <= permitsLimit) {
+ if (permits.get() > maxGrantedPermissions.get()) {
+ maxGrantedPermissions.set(permits.get());
+ }
+ LockedFuture lockedFuture = queue.poll();
+ if (lockedFuture != null) {
+ lockedFuture.latch.countDown();
+ } else {
+ permits.decrementAndGet();
+ break;
+ }
+ } else {
+ permits.decrementAndGet();
+ }
+ }
+ }
+
+ private LockedFuture createLockedFuture() {
+ CountDownLatch latch = new CountDownLatch(1);
+ ListenableFuture<Void> future = pool.submit(() -> {
+ latch.await();
+ return null;
+ });
+ return new LockedFuture(latch, future, System.currentTimeMillis());
+ }
+
+ private ListenableFuture<Void> putInQueue() {
+
+ int size = queue.size();
+ if (size > maxQueueSize.get()) {
+ maxQueueSize.set(size);
+ }
+
+ if (queue.remainingCapacity() > 0) {
+ try {
+ LockedFuture lockedFuture = createLockedFuture();
+ if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
+ lockedFuture.cancelFuture();
+ return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+ }
+ if(permits.get() < permitsLimit) {
+ reprocessQueue();
+ }
+ return lockedFuture.future;
+ } catch (InterruptedException e) {
+ return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
+ }
+ }
+ return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+ }
+
+ @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
+ public void printStats() {
+ int expiredCount = 0;
+ for (LockedFuture lockedFuture : queue) {
+ if (lockedFuture.isExpired()) {
+ lockedFuture.cancelFuture();
+ expiredCount++;
+ }
+ }
+ log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
+ maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
+ }
+
+ private class LockedFuture {
+ final CountDownLatch latch;
+ final ListenableFuture<Void> future;
+ final long createTime;
+
+ public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
+ this.latch = latch;
+ this.future = future;
+ this.createTime = createTime;
+ }
+
+ void cancelFuture() {
+ future.cancel(false);
+ latch.countDown();
+ }
+
+ boolean isExpired() {
+ return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
+ }
+
+ }
+
+
+}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
new file mode 100644
index 0000000..fa62c2b
--- /dev/null
+++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.thingsboard.server.dao.util.AsyncRateLimiter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RateLimitedResultSetFutureTest {
+
+ private RateLimitedResultSetFuture resultSetFuture;
+
+ @Mock
+ private AsyncRateLimiter rateLimiter;
+ @Mock
+ private Session session;
+ @Mock
+ private Statement statement;
+ @Mock
+ private ResultSetFuture realFuture;
+ @Mock
+ private ResultSet rows;
+ @Mock
+ private Row row;
+
+ @Test
+ public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
+ when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
+ resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+ Thread.sleep(1000L);
+ verify(rateLimiter).acquireAsync();
+ try {
+ assertTrue(resultSetFuture.isDone());
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ Throwable actualCause = e.getCause();
+ assertTrue(actualCause instanceof ExecutionException);
+ }
+ verifyNoMoreInteractions(session, rateLimiter, statement);
+
+ }
+
+ @Test
+ public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException {
+ when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+ when(session.executeAsync(statement)).thenReturn(realFuture);
+ Mockito.doAnswer((Answer<Void>) invocation -> {
+ Object[] args = invocation.getArguments();
+ Runnable task = (Runnable) args[0];
+ task.run();
+ return null;
+ }).when(realFuture).addListener(Mockito.any(), Mockito.any());
+
+ when(realFuture.getUninterruptibly()).thenReturn(rows);
+
+ resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+ ResultSet actual = resultSetFuture.getUninterruptibly();
+ assertSame(rows, actual);
+ verify(rateLimiter, times(1)).acquireAsync();
+ verify(rateLimiter, times(1)).release();
+ }
+
+ @Test
+ public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException {
+ when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+ when(session.executeAsync(statement)).thenReturn(realFuture);
+ Mockito.doAnswer((Answer<Void>) invocation -> {
+ Object[] args = invocation.getArguments();
+ Runnable task = (Runnable) args[0];
+ task.run();
+ return null;
+ }).when(realFuture).addListener(Mockito.any(), Mockito.any());
+
+ when(realFuture.get()).thenReturn(rows);
+ when(rows.one()).thenReturn(row);
+
+ resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+
+ ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+ Row actualRow = transform.get();
+
+ assertSame(row, actualRow);
+ verify(rateLimiter, times(1)).acquireAsync();
+ verify(rateLimiter, times(1)).release();
+ }
+
+ @Test
+ public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException {
+ when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+ when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
+ resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+ ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+ try {
+ transform.get();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof ExecutionException);
+ }
+ verify(rateLimiter, times(1)).acquireAsync();
+ verify(rateLimiter, times(1)).release();
+ }
+
+ @Test
+ public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException {
+ when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+ when(session.executeAsync(statement)).thenReturn(realFuture);
+ Mockito.doAnswer((Answer<Void>) invocation -> {
+ Object[] args = invocation.getArguments();
+ Runnable task = (Runnable) args[0];
+ task.run();
+ return null;
+ }).when(realFuture).addListener(Mockito.any(), Mockito.any());
+
+ when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
+ resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+ ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+ try {
+ transform.get();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof ExecutionException);
+ }
+ verify(rateLimiter, times(1)).acquireAsync();
+ verify(rateLimiter, times(1)).release();
+ }
+
+}
\ No newline at end of file
diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
new file mode 100644
index 0000000..5bfc3b6
--- /dev/null
+++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.util;
+
+import com.google.common.util.concurrent.*;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+
+public class BufferedRateLimiterTest {
+
+ @Test
+ public void finishedFutureReturnedIfPermitsAreGranted() {
+ BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
+ ListenableFuture<Void> actual = limiter.acquireAsync();
+ assertTrue(actual.isDone());
+ }
+
+ @Test
+ public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
+ BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
+ ListenableFuture<Void> actual1 = limiter.acquireAsync();
+ ListenableFuture<Void> actual2 = limiter.acquireAsync();
+ assertTrue(actual1.isDone());
+ assertFalse(actual2.isDone());
+ }
+
+ @Test
+ public void failedFutureReturnedIfQueueIsfull() {
+ BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
+ ListenableFuture<Void> actual1 = limiter.acquireAsync();
+ ListenableFuture<Void> actual2 = limiter.acquireAsync();
+ ListenableFuture<Void> actual3 = limiter.acquireAsync();
+
+ assertTrue(actual1.isDone());
+ assertFalse(actual2.isDone());
+ assertTrue(actual3.isDone());
+ try {
+ actual3.get();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof ExecutionException);
+ Throwable actualCause = e.getCause();
+ assertTrue(actualCause instanceof IllegalStateException);
+ assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
+ }
+ }
+
+ @Test
+ public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
+ BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
+ ListenableFuture<Void> actual1 = limiter.acquireAsync();
+ ListenableFuture<Void> actual2 = limiter.acquireAsync();
+ ListenableFuture<Void> actual3 = limiter.acquireAsync();
+ ListenableFuture<Void> actual4 = limiter.acquireAsync();
+ assertTrue(actual1.isDone());
+ assertTrue(actual2.isDone());
+ assertFalse(actual3.isDone());
+ assertFalse(actual4.isDone());
+ limiter.release();
+ TimeUnit.MILLISECONDS.sleep(100L);
+ assertTrue(actual3.isDone());
+ assertFalse(actual4.isDone());
+ limiter.release();
+ TimeUnit.MILLISECONDS.sleep(100L);
+ assertTrue(actual4.isDone());
+ }
+
+ @Test
+ public void permitsReleasedInConcurrentMode() throws InterruptedException {
+ BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
+ AtomicInteger actualReleased = new AtomicInteger();
+ AtomicInteger actualRejected = new AtomicInteger();
+ ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
+ for (int i = 0; i < 100; i++) {
+ ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
+ Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
+ @Override
+ public void onSuccess(@Nullable ListenableFuture<Void> result) {
+ Futures.addCallback(result, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ limiter.release();
+ actualReleased.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ actualRejected.incrementAndGet();
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ }
+ });
+ }
+
+ TimeUnit.SECONDS.sleep(2);
+ assertTrue("Unexpected released count " + actualReleased.get(),
+ actualReleased.get() > 10 && actualReleased.get() < 20);
+ assertTrue("Unexpected rejected count " + actualRejected.get(),
+ actualRejected.get() > 80 && actualRejected.get() < 90);
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 82fcbe1..737687f 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000
cassandra.query.ts_key_value_partitioning=HOURS
cassandra.query.max_limit_per_request=1000
+cassandra.query.buffer_size=100000
+cassandra.query.concurrent_limit=1000
+cassandra.query.permit_max_wait_time=20000
+cassandra.query.rate_limit_print_interval_ms=30000
+
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java
new file mode 100644
index 0000000..5e4c4b5
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+public interface NodeConfiguration {
+
+ NodeConfiguration defaultConfiguration();
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java
index f8e0fa2..1617034 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java
@@ -35,15 +35,16 @@ public @interface RuleNode {
String nodeDetails();
+ Class<? extends NodeConfiguration> configClazz();
+
boolean inEnabled() default true;
boolean outEnabled() default true;
ComponentScope scope() default ComponentScope.TENANT;
- String defaultConfigResource() default "EmptyNodeConfig.json";
-
String[] relationTypes() default {"Success", "Failure"};
boolean customRelations() default false;
+
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
index c85d480..07b166d 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
@@ -30,6 +30,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@RuleNode(
type = ComponentType.FILTER,
name = "script", relationTypes = {"True", "False", "Failure"},
+ configClazz = TbJsFilterNodeConfiguration.class,
nodeDescription = "Filter incoming messages using JS script",
nodeDetails = "Evaluate incoming Message with configured JS condition. " +
"If <b>True</b> - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used." +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java
index bf543e3..3b19c7c 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java
@@ -16,9 +16,17 @@
package org.thingsboard.rule.engine.filter;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
-public class TbJsFilterNodeConfiguration {
+public class TbJsFilterNodeConfiguration implements NodeConfiguration {
private String jsScript;
+
+ @Override
+ public TbJsFilterNodeConfiguration defaultConfiguration() {
+ TbJsFilterNodeConfiguration configuration = new TbJsFilterNodeConfiguration();
+ configuration.setJsScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;");
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
index faf97b4..c1236a4 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -31,6 +31,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@RuleNode(
type = ComponentType.FILTER,
name = "switch", customRelations = true,
+ configClazz = TbJsSwitchNodeConfiguration.class,
nodeDescription = "Route incoming Message to one or multiple output chains",
nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " +
"If Array is empty - message not routed to next Node. " +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
index 331302d..b354c71 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
@@ -15,14 +15,29 @@
*/
package org.thingsboard.rule.engine.filter;
+import com.google.common.collect.Sets;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
import java.util.Set;
@Data
-public class TbJsSwitchNodeConfiguration {
+public class TbJsSwitchNodeConfiguration implements NodeConfiguration {
private String jsScript;
private Set<String> allowedRelations;
private boolean routeToAllWithNoCheck;
+
+ @Override
+ public TbJsSwitchNodeConfiguration defaultConfiguration() {
+ TbJsSwitchNodeConfiguration configuration = new TbJsSwitchNodeConfiguration();
+ configuration.setJsScript("function nextRelation(meta, msg) {\n" +
+ " return ['one','nine'];" +
+ "};\n" +
+ "\n" +
+ "nextRelation(meta, msg);");
+ configuration.setAllowedRelations(Sets.newHashSet("one", "two"));
+ configuration.setRouteToAllWithNoCheck(false);
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
index 7a6f9fd..3a86c25 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.TbMsg;
@RuleNode(
type = ComponentType.FILTER,
name = "message type",
+ configClazz = TbMsgTypeFilterNodeConfiguration.class,
nodeDescription = "Filter incoming messages by Message Type",
nodeDetails = "Evaluate incoming Message with configured JS condition. " +
"If incoming MessageType is expected - send Message via <b>Success</b> chain, otherwise <b>Failure</b> chain is used.")
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
index 3b7ba90..a2e1b17 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
@@ -16,15 +16,24 @@
package org.thingsboard.rule.engine.filter;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Data
-public class TbMsgTypeFilterNodeConfiguration {
+public class TbMsgTypeFilterNodeConfiguration implements NodeConfiguration {
private List<String> messageTypes;
+ @Override
+ public TbMsgTypeFilterNodeConfiguration defaultConfiguration() {
+ TbMsgTypeFilterNodeConfiguration configuration = new TbMsgTypeFilterNodeConfiguration();
+ configuration.setMessageTypes(Arrays.asList("GET_ATTRIBUTES","POST_ATTRIBUTES","POST_TELEMETRY","RPC_REQUEST"));
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 6228206..69ee9d7 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -38,6 +38,7 @@ import static org.thingsboard.server.common.data.DataConstants.*;
@Slf4j
@RuleNode(type = ComponentType.ENRICHMENT,
name = "originator attributes",
+ configClazz = TbGetAttributesNodeConfiguration.class,
nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " +
"with specific prefix: <i>cs/shared/ss</i>. To access those attributes in other nodes this template can be used " +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
index ad92314..103b4de 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
@@ -16,14 +16,16 @@
package org.thingsboard.rule.engine.metadata;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import java.util.Collections;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Data
-public class TbGetAttributesNodeConfiguration {
+public class TbGetAttributesNodeConfiguration implements NodeConfiguration {
private List<String> clientAttributeNames;
private List<String> sharedAttributeNames;
@@ -31,4 +33,13 @@ public class TbGetAttributesNodeConfiguration {
private List<String> latestTsKeyNames;
+ @Override
+ public TbGetAttributesNodeConfiguration defaultConfiguration() {
+ TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
+ configuration.setClientAttributeNames(Collections.emptyList());
+ configuration.setSharedAttributeNames(Collections.emptyList());
+ configuration.setServerAttributeNames(Collections.emptyList());
+ configuration.setLatestTsKeyNames(Collections.emptyList());
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
index d85fb56..cc6d6a1 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
@RuleNode(
type = ComponentType.ENRICHMENT,
name="customer attributes",
+ configClazz = TbGetEntityAttrNodeConfiguration.class,
nodeDescription = "Add Originators Customer Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
"To access those attributes in other nodes this template can be used " +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
index a5e85c5..5195115 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
@@ -16,13 +16,25 @@
package org.thingsboard.rule.engine.metadata;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Data
-public class TbGetEntityAttrNodeConfiguration {
+public class TbGetEntityAttrNodeConfiguration implements NodeConfiguration {
private Map<String, String> attrMapping;
private boolean isTelemetry = false;
+
+ @Override
+ public TbGetEntityAttrNodeConfiguration defaultConfiguration() {
+ TbGetEntityAttrNodeConfiguration configuration = new TbGetEntityAttrNodeConfiguration();
+ Map<String, String> attrMapping = new HashMap<>();
+ attrMapping.putIfAbsent("temperature", "tempo");
+ configuration.setAttrMapping(attrMapping);
+ configuration.setTelemetry(true);
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
index 26b7561..22c0b9f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
@RuleNode(
type = ComponentType.ENRICHMENT,
name="related attributes",
+ configClazz = TbGetRelatedAttrNodeConfiguration.class,
nodeDescription = "Add Originators Related Entity Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
"If multiple Related Entities are found, only first Entity is used for attributes enrichment, other entities are discarded. " +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
index ae0b662..8211992 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
@@ -16,11 +16,28 @@
package org.thingsboard.rule.engine.metadata;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+import java.util.HashMap;
+import java.util.Map;
+
@Data
-public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration {
+public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration {
private String relationType;
private EntitySearchDirection direction;
+
+ @Override
+ public TbGetRelatedAttrNodeConfiguration defaultConfiguration() {
+ TbGetRelatedAttrNodeConfiguration configuration = new TbGetRelatedAttrNodeConfiguration();
+ Map<String, String> attrMapping = new HashMap<>();
+ attrMapping.putIfAbsent("temperature", "tempo");
+ configuration.setAttrMapping(attrMapping);
+ configuration.setTelemetry(true);
+ configuration.setRelationType(EntityRelation.CONTAINS_TYPE);
+ configuration.setDirection(EntitySearchDirection.FROM);
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
index 7d9c50b..b5f5e02 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
@RuleNode(
type = ComponentType.ENRICHMENT,
name="tenant attributes",
+ configClazz = TbGetEntityAttrNodeConfiguration.class,
nodeDescription = "Add Originators Tenant Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
"To access those attributes in other nodes this template can be used " +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
index d237df8..40a647a 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
@RuleNode(
type = ComponentType.TRANSFORMATION,
name="change originator",
+ configClazz = TbChangeOriginatorNodeConfiguration.class,
nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity",
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ")
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
index cf03681..3370408 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
@@ -16,12 +16,24 @@
package org.thingsboard.rule.engine.transform;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@Data
-public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{
+public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration implements NodeConfiguration {
private String originatorSource;
private EntitySearchDirection direction;
private String relationType;
+
+ @Override
+ public TbChangeOriginatorNodeConfiguration defaultConfiguration() {
+ TbChangeOriginatorNodeConfiguration configuration = new TbChangeOriginatorNodeConfiguration();
+ configuration.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
+ configuration.setDirection(EntitySearchDirection.FROM);
+ configuration.setRelationType(EntityRelation.CONTAINS_TYPE);
+ configuration.setStartNewChain(false);
+ return configuration;
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
index babdbc3..626790f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
@@ -27,6 +27,7 @@ import javax.script.Bindings;
@RuleNode(
type = ComponentType.TRANSFORMATION,
name = "script",
+ configClazz = TbTransformMsgNodeConfiguration.class,
nodeDescription = "Change Message payload and Metadata using JavaScript",
nodeDetails = "JavaScript function recieve 2 input parameters that can be changed inside.<br/> " +
"<code>meta</code> - is a Message metadata.<br/>" +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
index 9cc926b..4f9e9eb 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
@@ -16,9 +16,18 @@
package org.thingsboard.rule.engine.transform;
import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
-public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration {
+public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration implements NodeConfiguration {
private String jsScript;
+
+ @Override
+ public TbTransformMsgNodeConfiguration defaultConfiguration() {
+ TbTransformMsgNodeConfiguration configuration = new TbTransformMsgNodeConfiguration();
+ configuration.setStartNewChain(false);
+ configuration.setJsScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' ");
+ return configuration;
+ }
}
ui/src/app/api/rule-chain.service.js 25(+15 -10)
diff --git a/ui/src/app/api/rule-chain.service.js b/ui/src/app/api/rule-chain.service.js
index f175535..ebc48fa 100644
--- a/ui/src/app/api/rule-chain.service.js
+++ b/ui/src/app/api/rule-chain.service.js
@@ -153,16 +153,21 @@ function RuleChainService($http, $q, $filter, types, componentDescriptorService)
return deferred.promise;
}
- function getRuleNodeSupportedLinks(nodeType) { //eslint-disable-line
- //TODO:
- var deferred = $q.defer();
- var linkLabels = [
- { name: 'Success', custom: false },
- { name: 'Fail', custom: false },
- { name: 'Custom', custom: true },
- ];
- deferred.resolve(linkLabels);
- return deferred.promise;
+ function getRuleNodeSupportedLinks(component) {
+ var relationTypes = component.configurationDescriptor.nodeDefinition.relationTypes;
+ var customRelations = component.configurationDescriptor.nodeDefinition.customRelations;
+ var linkLabels = [];
+ for (var i=0;i<relationTypes.length;i++) {
+ linkLabels.push({
+ name: relationTypes[i], custom: false
+ });
+ }
+ if (customRelations) {
+ linkLabels.push(
+ { name: 'Custom', custom: true }
+ );
+ }
+ return linkLabels;
}
function getRuleNodeComponents() {
ui/src/app/components/ace-editor-fix.js 45(+45 -0)
diff --git a/ui/src/app/components/ace-editor-fix.js b/ui/src/app/components/ace-editor-fix.js
new file mode 100644
index 0000000..f68767e
--- /dev/null
+++ b/ui/src/app/components/ace-editor-fix.js
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export default function fixAceEditor(aceEditor) {
+ aceEditor.$blockScrolling = Infinity;
+ aceEditor.on("showGutterTooltip", function (tooltip) {
+ if (!tooltip.isAttachedToBody) {
+ document.body.appendChild(tooltip.$element); //eslint-disable-line
+ tooltip.isAttachedToBody = true;
+ onElementRemoved(tooltip.$parentNode, () => {
+ if (tooltip.$element.parentNode != null) {
+ tooltip.$element.parentNode.removeChild(tooltip.$element);
+ }
+ });
+ }
+ });
+}
+
+function onElementRemoved(element, callback) {
+ if (!document.body.contains(element)) { //eslint-disable-line
+ callback();
+ } else {
+ var observer;
+ observer = new MutationObserver(function(mutations) { //eslint-disable-line
+ if (!document.body.contains(element)) { //eslint-disable-line
+ callback();
+ observer.disconnect();
+ }
+ });
+ observer.observe(document.body, {childList: true}); //eslint-disable-line
+ }
+}
diff --git a/ui/src/app/components/js-func.directive.js b/ui/src/app/components/js-func.directive.js
index f95d003..33cebde 100644
--- a/ui/src/app/components/js-func.directive.js
+++ b/ui/src/app/components/js-func.directive.js
@@ -22,6 +22,8 @@ import thingsboardToast from '../services/toast';
import thingsboardUtils from '../common/utils.service';
import thingsboardExpandFullscreen from './expand-fullscreen.directive';
+import fixAceEditor from './ace-editor-fix';
+
/* eslint-disable import/no-unresolved, import/default */
import jsFuncTemplate from './js-func.tpl.html';
@@ -83,6 +85,7 @@ function JsFunc($compile, $templateCache, toast, utils, $translate) {
scope.js_editor.session.on("change", function () {
scope.cleanupJsErrors();
});
+ fixAceEditor(_ace);
}
};
ui/src/app/components/json-object-edit.directive.js 168(+168 -0)
diff --git a/ui/src/app/components/json-object-edit.directive.js b/ui/src/app/components/json-object-edit.directive.js
new file mode 100644
index 0000000..db0aa60
--- /dev/null
+++ b/ui/src/app/components/json-object-edit.directive.js
@@ -0,0 +1,168 @@
+/*
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import './json-object-edit.scss';
+
+import 'brace/ext/language_tools';
+import 'brace/mode/json';
+import 'ace-builds/src-min-noconflict/snippets/json';
+
+import fixAceEditor from './ace-editor-fix';
+
+/* eslint-disable import/no-unresolved, import/default */
+
+import jsonObjectEditTemplate from './json-object-edit.tpl.html';
+
+/* eslint-enable import/no-unresolved, import/default */
+
+export default angular.module('thingsboard.directives.jsonObjectEdit', [])
+ .directive('tbJsonObjectEdit', JsonObjectEdit)
+ .name;
+
+/*@ngInject*/
+function JsonObjectEdit($compile, $templateCache, $document, toast, utils) {
+
+ var linker = function (scope, element, attrs, ngModelCtrl) {
+ var template = $templateCache.get(jsonObjectEditTemplate);
+ element.html(template);
+
+ scope.label = attrs.label;
+
+ scope.objectValid = true;
+ scope.validationError = '';
+
+ scope.json_editor;
+
+ scope.onFullscreenChanged = function () {
+ updateEditorSize();
+ };
+
+ function updateEditorSize() {
+ if (scope.json_editor) {
+ scope.json_editor.resize();
+ scope.json_editor.renderer.updateFull();
+ }
+ }
+
+ scope.jsonEditorOptions = {
+ useWrapMode: true,
+ mode: 'json',
+ advanced: {
+ enableSnippets: true,
+ enableBasicAutocompletion: true,
+ enableLiveAutocompletion: true
+ },
+ onLoad: function (_ace) {
+ scope.json_editor = _ace;
+ scope.json_editor.session.on("change", function () {
+ scope.cleanupJsonErrors();
+ });
+ fixAceEditor(_ace);
+ }
+ };
+
+ scope.cleanupJsonErrors = function () {
+ toast.hide();
+ };
+
+ scope.updateValidity = function () {
+ ngModelCtrl.$setValidity('objectValid', scope.objectValid);
+ };
+
+ scope.$watch('contentBody', function (newVal, prevVal) {
+ if (!angular.equals(newVal, prevVal)) {
+ var object = scope.validate();
+ ngModelCtrl.$setViewValue(object);
+ scope.updateValidity();
+ }
+ });
+
+ ngModelCtrl.$render = function () {
+ var object = ngModelCtrl.$viewValue;
+ var content = '';
+ try {
+ if (object) {
+ content = angular.toJson(object, true);
+ }
+ } catch (e) {
+ //
+ }
+ scope.contentBody = content;
+ };
+
+ scope.showError = function (error) {
+ var toastParent = angular.element('#tb-json-panel', element);
+ toast.showError(error, toastParent, 'bottom left');
+ };
+
+ scope.validate = function () {
+ if (!scope.contentBody || !scope.contentBody.length) {
+ if (scope.required) {
+ scope.validationError = 'Json object is required.';
+ scope.objectValid = false;
+ } else {
+ scope.validationError = '';
+ scope.objectValid = true;
+ }
+ return null;
+ } else {
+ try {
+ var object = angular.fromJson(scope.contentBody);
+ scope.validationError = '';
+ scope.objectValid = true;
+ return object;
+ } catch (e) {
+ var details = utils.parseException(e);
+ var errorInfo = 'Error:';
+ if (details.name) {
+ errorInfo += ' ' + details.name + ':';
+ }
+ if (details.message) {
+ errorInfo += ' ' + details.message;
+ }
+ scope.validationError = errorInfo;
+ scope.objectValid = false;
+ return null;
+ }
+ }
+ };
+
+ scope.$on('form-submit', function () {
+ if (!scope.readonly) {
+ scope.cleanupJsonErrors();
+ if (!scope.objectValid) {
+ scope.showError(scope.validationError);
+ }
+ }
+ });
+
+ scope.$on('update-ace-editor-size', function () {
+ updateEditorSize();
+ });
+
+ $compile(element.contents())(scope);
+ }
+
+ return {
+ restrict: "E",
+ require: "^ngModel",
+ scope: {
+ required:'=ngRequired',
+ readonly:'=ngReadonly',
+ fillHeight:'=?'
+ },
+ link: linker
+ };
+}
ui/src/app/components/json-object-edit.scss 35(+35 -0)
diff --git a/ui/src/app/components/json-object-edit.scss b/ui/src/app/components/json-object-edit.scss
new file mode 100644
index 0000000..232d69a
--- /dev/null
+++ b/ui/src/app/components/json-object-edit.scss
@@ -0,0 +1,35 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+tb-json-object-edit {
+ position: relative;
+ .fill-height {
+ height: 100%;
+ }
+}
+
+.tb-json-object-panel {
+ margin-left: 15px;
+ border: 1px solid #C0C0C0;
+ height: 100%;
+ #tb-json-input {
+ min-width: 200px;
+ width: 100%;
+ height: 100%;
+ &:not(.fill-height) {
+ min-height: 200px;
+ }
+ }
+}
diff --git a/ui/src/app/components/json-object-edit.tpl.html b/ui/src/app/components/json-object-edit.tpl.html
new file mode 100644
index 0000000..ebab3c7
--- /dev/null
+++ b/ui/src/app/components/json-object-edit.tpl.html
@@ -0,0 +1,34 @@
+<!--
+
+ Copyright © 2016-2018 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<div style="background: #fff;" ng-class="{'fill-height': fillHeight}" tb-expand-fullscreen fullscreen-zindex="100" expand-button-id="expand-button" on-fullscreen-changed="onFullscreenChanged()" layout="column">
+ <div layout="row" layout-align="start center">
+ <label class="tb-title no-padding"
+ ng-class="{'tb-required': required,
+ 'tb-readonly': readonly,
+ 'tb-error': !objectValid}">{{ label }}</label>
+ <span flex></span>
+ <md-button id="expand-button" aria-label="Fullscreen" class="md-icon-button tb-md-32 tb-fullscreen-button-style"></md-button>
+ </div>
+ <div flex id="tb-json-panel" class="tb-json-object-panel" layout="column">
+ <div flex id="tb-json-input" ng-class="{'fill-height': fillHeight}"
+ ng-readonly="readonly"
+ ui-ace="jsonEditorOptions"
+ ng-model="contentBody">
+ </div>
+ </div>
+</div>
diff --git a/ui/src/app/components/react/json-form-ace-editor.jsx b/ui/src/app/components/react/json-form-ace-editor.jsx
index 1c4c02e..5afd3d1 100644
--- a/ui/src/app/components/react/json-form-ace-editor.jsx
+++ b/ui/src/app/components/react/json-form-ace-editor.jsx
@@ -23,6 +23,8 @@ import FlatButton from 'material-ui/FlatButton';
import 'brace/ext/language_tools';
import 'brace/theme/github';
+import fixAceEditor from './../ace-editor-fix';
+
class ThingsboardAceEditor extends React.Component {
constructor(props) {
@@ -31,6 +33,7 @@ class ThingsboardAceEditor extends React.Component {
this.onBlur = this.onBlur.bind(this);
this.onFocus = this.onFocus.bind(this);
this.onTidy = this.onTidy.bind(this);
+ this.onLoad = this.onLoad.bind(this);
var value = props.value ? props.value + '' : '';
this.state = {
value: value,
@@ -72,6 +75,10 @@ class ThingsboardAceEditor extends React.Component {
}
}
+ onLoad(editor) {
+ fixAceEditor(editor);
+ }
+
render() {
const styles = reactCSS({
@@ -117,6 +124,7 @@ class ThingsboardAceEditor extends React.Component {
onChange={this.onValueChanged}
onFocus={this.onFocus}
onBlur={this.onBlur}
+ onLoad={this.onLoad}
name={this.props.form.title}
value={this.state.value}
readOnly={this.props.form.readonly}
diff --git a/ui/src/app/components/widget/widget-config.directive.js b/ui/src/app/components/widget/widget-config.directive.js
index d0ee6a3..4d4d958 100644
--- a/ui/src/app/components/widget/widget-config.directive.js
+++ b/ui/src/app/components/widget/widget-config.directive.js
@@ -23,6 +23,8 @@ import thingsboardJsonForm from '../json-form.directive';
import thingsboardManageWidgetActions from './action/manage-widget-actions.directive';
import 'angular-ui-ace';
+import fixAceEditor from './../ace-editor-fix';
+
import './widget-config.scss';
/* eslint-disable import/no-unresolved, import/default */
@@ -72,6 +74,9 @@ function WidgetConfig($compile, $templateCache, $rootScope, $translate, $timeout
enableSnippets: true,
enableBasicAutocompletion: true,
enableLiveAutocompletion: true
+ },
+ onLoad: function (_ace) {
+ fixAceEditor(_ace);
}
};
diff --git a/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js b/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js
index bf4886c..e1d2519 100644
--- a/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js
+++ b/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js
@@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
let addedFile = event.target.result;
if (addedFile && addedFile.length > 0) {
- model[options.fileName] = $file.name;
- model[options.file] = addedFile.replace(/^data.*base64,/, "");
+ model[options.location] = $file.name;
+ model[options.fileContent] = addedFile.replace(/^data.*base64,/, "");
}
}
@@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
scope.clearFile = function(model, options) {
scope.theForm.$setDirty();
- model[options.fileName] = null;
- model[options.file] = null;
+ model[options.location] = null;
+ model[options.fileContent] = null;
};
diff --git a/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html b/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html
index 501eeeb..5a7c00b 100644
--- a/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html
+++ b/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html
@@ -212,8 +212,8 @@
</md-input-container>
<section class="dropdown-section">
- <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.file}">
- <span ng-init='fieldsToFill = {"fileName":"fileName", "file":"file"}'></span>
+ <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.fileContent}">
+ <span ng-init='fieldsToFill = {"location":"location", "fileContent":"fileContent"}'></span>
<label class="tb-label" translate>extension.opc-keystore-location</label>
<div flow-init="{singleFile:true}" flow-file-added='fileAdded($file, server.keystore, fieldsToFill)' class="tb-file-select-container">
<div class="tb-file-clear-container">
@@ -231,14 +231,14 @@
class="file-input"
flow-btn id="dropFileKeystore_{{serverIndex}}"
name="keystoreFile"
- ng-model="server.keystore.file"
+ ng-model="server.keystore.fileContent"
>
</div>
</div>
</div>
<div class="dropdown-messages">
- <div ng-if="!server.keystore[fieldsToFill.fileName]" class="tb-error-message" translate>extension.no-file</div>
- <div ng-if="server.keystore[fieldsToFill.fileName]">{{server.keystore[fieldsToFill.fileName]}}</div>
+ <div ng-if="!server.keystore[fieldsToFill.location]" class="tb-error-message" translate>extension.no-file</div>
+ <div ng-if="server.keystore[fieldsToFill.location]">{{server.keystore[fieldsToFill.location]}}</div>
</div>
</section>
ui/src/app/layout/index.js 4(+3 -1)
diff --git a/ui/src/app/layout/index.js b/ui/src/app/layout/index.js
index e90334b..d397d14 100644
--- a/ui/src/app/layout/index.js
+++ b/ui/src/app/layout/index.js
@@ -29,6 +29,7 @@ import thingsboardNoAnimate from '../components/no-animate.directive';
import thingsboardOnFinishRender from '../components/finish-render.directive';
import thingsboardSideMenu from '../components/side-menu.directive';
import thingsboardDashboardAutocomplete from '../components/dashboard-autocomplete.directive';
+import thingsboardJsonObjectEdit from '../components/json-object-edit.directive';
import thingsboardUserMenu from './user-menu.directive';
@@ -90,7 +91,8 @@ export default angular.module('thingsboard.home', [
thingsboardNoAnimate,
thingsboardOnFinishRender,
thingsboardSideMenu,
- thingsboardDashboardAutocomplete
+ thingsboardDashboardAutocomplete,
+ thingsboardJsonObjectEdit
])
.config(HomeRoutes)
.controller('HomeController', HomeController)
ui/src/app/locale/locale.constant.js 1(+1 -0)
diff --git a/ui/src/app/locale/locale.constant.js b/ui/src/app/locale/locale.constant.js
index 360c828..cca2a11 100644
--- a/ui/src/app/locale/locale.constant.js
+++ b/ui/src/app/locale/locale.constant.js
@@ -1179,6 +1179,7 @@ export default angular.module('thingsboard.locale', [])
"delete": "Delete rule node",
"rulenode-details": "Rule node details",
"debug-mode": "Debug mode",
+ "configuration": "Configuration",
"link-details": "Rule node link details",
"add-link": "Add link",
"link-label": "Link label",
ui/src/app/rulechain/rulechain.controller.js 82(+41 -41)
diff --git a/ui/src/app/rulechain/rulechain.controller.js b/ui/src/app/rulechain/rulechain.controller.js
index d9bbf2f..b792f13 100644
--- a/ui/src/app/rulechain/rulechain.controller.js
+++ b/ui/src/app/rulechain/rulechain.controller.js
@@ -151,6 +151,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
},
'mouseLeave': function () {
destroyTooltips();
+ },
+ 'mouseDown': function () {
+ destroyTooltips();
}
}
};
@@ -226,16 +229,12 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
edgeDoubleClick: function (event, edge) {
var sourceNode = vm.modelservice.nodes.getNodeByConnectorId(edge.source);
if (sourceNode.component.type != types.ruleNodeType.INPUT.value) {
- ruleChainService.getRuleNodeSupportedLinks(sourceNode.component.clazz).then(
- (labels) => {
- vm.isEditingRuleNode = false;
- vm.editingRuleNode = null;
- vm.editingRuleNodeLinkLabels = labels;
- vm.isEditingRuleNodeLink = true;
- vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge);
- vm.editingRuleNodeLink = angular.copy(edge);
- }
- );
+ vm.isEditingRuleNode = false;
+ vm.editingRuleNode = null;
+ vm.editingRuleNodeLinkLabels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component);
+ vm.isEditingRuleNodeLink = true;
+ vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge);
+ vm.editingRuleNodeLink = angular.copy(edge);
}
},
nodeCallbacks: {
@@ -267,16 +266,10 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
deferred.resolve(edge);
}
} else {
- ruleChainService.getRuleNodeSupportedLinks(sourceNode.component.clazz).then(
- (labels) => {
- addRuleNodeLink(event, edge, labels).then(
- (link) => {
- deferred.resolve(link);
- },
- () => {
- deferred.reject();
- }
- );
+ var labels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component);
+ addRuleNodeLink(event, edge, labels).then(
+ (link) => {
+ deferred.resolve(link);
},
() => {
deferred.reject();
@@ -309,24 +302,19 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
y: 10+50*model.nodes.length,
connectors: []
};
- if (componentType == types.ruleNodeType.RULE_CHAIN.value) {
- node.connectors.push(
- {
- type: flowchartConstants.leftConnectorType,
- id: model.nodes.length
- }
- );
- } else {
+ if (ruleNodeComponent.configurationDescriptor.nodeDefinition.inEnabled) {
node.connectors.push(
{
type: flowchartConstants.leftConnectorType,
- id: model.nodes.length*2
+ id: model.nodes.length * 2
}
);
+ }
+ if (ruleNodeComponent.configurationDescriptor.nodeDefinition.outEnabled) {
node.connectors.push(
{
type: flowchartConstants.rightConnectorType,
- id: model.nodes.length*2+1
+ id: model.nodes.length * 2 + 1
}
);
}
@@ -398,17 +386,24 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
name: ruleNode.name,
nodeClass: vm.types.ruleNodeType[component.type].nodeClass,
icon: vm.types.ruleNodeType[component.type].icon,
- connectors: [
+ connectors: []
+ };
+ if (component.configurationDescriptor.nodeDefinition.inEnabled) {
+ node.connectors.push(
{
type: flowchartConstants.leftConnectorType,
id: vm.nextConnectorID++
- },
+ }
+ );
+ }
+ if (component.configurationDescriptor.nodeDefinition.outEnabled) {
+ node.connectors.push(
{
type: flowchartConstants.rightConnectorType,
id: vm.nextConnectorID++
}
- ]
- };
+ );
+ }
nodes.push(node);
vm.ruleChainModel.nodes.push(node);
}
@@ -590,6 +585,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
}
function addRuleNode($event, ruleNode) {
+
+ ruleNode.configuration = angular.copy(ruleNode.component.configurationDescriptor.nodeDefinition.defaultConfiguration);
+
$mdDialog.show({
controller: 'AddRuleNodeController',
controllerAs: 'vm',
@@ -601,13 +599,15 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
}).then(function (ruleNode) {
ruleNode.id = vm.nextNodeID++;
ruleNode.connectors = [];
- ruleNode.connectors.push(
- {
- id: vm.nextConnectorID++,
- type: flowchartConstants.leftConnectorType
- }
- );
- if (ruleNode.component.type != types.ruleNodeType.RULE_CHAIN.value) {
+ if (ruleNode.component.configurationDescriptor.nodeDefinition.inEnabled) {
+ ruleNode.connectors.push(
+ {
+ id: vm.nextConnectorID++,
+ type: flowchartConstants.leftConnectorType
+ }
+ );
+ }
+ if (ruleNode.component.configurationDescriptor.nodeDefinition.outEnabled) {
ruleNode.connectors.push(
{
id: vm.nextConnectorID++,
diff --git a/ui/src/app/rulechain/rulenode.directive.js b/ui/src/app/rulechain/rulenode.directive.js
index 998e998..be3e9c3 100644
--- a/ui/src/app/rulechain/rulenode.directive.js
+++ b/ui/src/app/rulechain/rulenode.directive.js
@@ -14,6 +14,8 @@
* limitations under the License.
*/
+import './rulenode.scss';
+
/* eslint-disable import/no-unresolved, import/default */
import ruleNodeFieldsetTemplate from './rulenode-fieldset.tpl.html';
ui/src/app/rulechain/rulenode.scss 22(+22 -0)
diff --git a/ui/src/app/rulechain/rulenode.scss b/ui/src/app/rulechain/rulenode.scss
new file mode 100644
index 0000000..febc637
--- /dev/null
+++ b/ui/src/app/rulechain/rulenode.scss
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+.tb-rulenode {
+ tb-json-object-edit.tb-rule-node-configuration-json {
+ height: 300px;
+ display: block;
+ }
+}
\ No newline at end of file
diff --git a/ui/src/app/rulechain/rulenode.tpl.html b/ui/src/app/rulechain/rulenode.tpl.html
index 5a521a8..ffc8a0f 100644
--- a/ui/src/app/rulechain/rulenode.tpl.html
+++ b/ui/src/app/rulechain/rulenode.tpl.html
@@ -19,7 +19,7 @@
id="{{node.id}}"
ng-attr-style="position: absolute; top: {{ node.y }}px; left: {{ node.x }}px;"
ng-dblclick="callbacks.doubleClick($event, node)"
- ng-mouseover="callbacks.mouseOver($event, node)"
+ ng-mousedown="callbacks.mouseDown($event, node)"
ng-mouseenter="callbacks.mouseEnter($event, node)"
ng-mouseleave="callbacks.mouseLeave($event, node)">
<div class="tb-rule-node {{node.nodeClass}}">
diff --git a/ui/src/app/rulechain/rulenode-fieldset.tpl.html b/ui/src/app/rulechain/rulenode-fieldset.tpl.html
index 0d16e45..30cf075 100644
--- a/ui/src/app/rulechain/rulenode-fieldset.tpl.html
+++ b/ui/src/app/rulechain/rulenode-fieldset.tpl.html
@@ -38,6 +38,11 @@
ng-model="ruleNode.debugMode">{{ 'rulenode.debug-mode' | translate }}
</md-checkbox>
</md-input-container>
+ <tb-json-object-edit class="tb-rule-node-configuration-json" ng-model="ruleNode.configuration"
+ label="{{ 'rulenode.configuration' | translate }}"
+ ng-required="true"
+ fill-height="true">
+ </tb-json-object-edit>
<md-input-container class="md-block">
<label translate>rulenode.description</label>
<textarea ng-model="ruleNode.additionalInfo.description" rows="2"></textarea>
ui/src/scss/main.scss 18(+18 -0)
diff --git a/ui/src/scss/main.scss b/ui/src/scss/main.scss
index 93ff320..6aa662c 100644
--- a/ui/src/scss/main.scss
+++ b/ui/src/scss/main.scss
@@ -203,6 +203,12 @@ md-sidenav {
* THINGSBOARD SPECIFIC
***********************/
+$swift-ease-out-duration: 0.4s !default;
+$swift-ease-out-timing-function: cubic-bezier(0.25, 0.8, 0.25, 1) !default;
+
+$input-label-float-offset: 6px !default;
+$input-label-float-scale: 0.75 !default;
+
label {
&.tb-title {
pointer-events: none;
@@ -213,6 +219,18 @@ label {
&.no-padding {
padding-bottom: 0px;
}
+ &.tb-required:after {
+ content: ' *';
+ font-size: 13px;
+ vertical-align: top;
+ color: rgba(0,0,0,0.54);
+ }
+ &.tb-error {
+ color: rgb(221,44,0);
+ &.tb-required:after {
+ color: rgb(221,44,0);
+ }
+ }
}
}