diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
index a5415f0..8a70059 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.dao.util;
+import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -22,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
+import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import javax.annotation.Nullable;
import java.util.UUID;
@@ -183,12 +185,39 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
if (log.isTraceEnabled()) {
- log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
+ if (taskCtx.getTask() instanceof CassandraStatementTask) {
+ CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask();
+ if (cassStmtTask.getStatement() instanceof BoundStatement) {
+ BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement();
+ String query = toStringWithValues(stmt, ProtocolVersion.V5);
+ log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query);
+ }
+ } else {
+ log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
+ }
} else {
log.debug("[{}] {} task", taskCtx.getId(), action);
}
}
+ private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) {
+ CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry();
+ PreparedStatement preparedStatement = boundStatement.preparedStatement();
+ String query = preparedStatement.getQueryString();
+ ColumnDefinitions defs = preparedStatement.getVariables();
+ int index = 0;
+ for (ColumnDefinitions.Definition def : defs) {
+ DataType type = def.getType();
+ TypeCodec<Object> codec = codecRegistry.codecFor(type);
+ if (boundStatement.getBytesUnsafe(index) != null) {
+ Object value = codec.deserialize(boundStatement.getBytesUnsafe(index), protocolVersion);
+ query = query.replaceFirst("\\?", codec.format(value));
+ }
+ index++;
+ }
+ return query;
+ }
+
protected int getQueueSize() {
return queue.size();
}