diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx
index a83f2047d0..b263af8ea6 100644
--- a/docs/en/14-reference/03-connector/07-python.mdx
+++ b/docs/en/14-reference/03-connector/07-python.mdx
@@ -362,7 +362,7 @@ By using the optional req_id parameter, you can specify a request ID that can be
##### TaosConnection class
-The `TaosConnection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
+As the way to connect introduced above but add `req_id` argument.
```python title="execute method"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
@@ -372,13 +372,9 @@ The `TaosConnection` class contains both an implementation of the PEP249 Connect
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
-:::tip
-The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
-:::
-
##### Use of TaosResult class
-In the above example of using the `TaosConnection` class, we have shown two ways to get the result of a query: `fetch_all()` and `fetch_all_into_dict()`. In addition, `TaosResult` also provides methods to iterate through the result set by rows (`rows_iter`) or by data blocks (`blocks_iter`). Using these two methods will be more efficient in scenarios where the query has a large amount of data.
+As the way to fetch data introduced above but add `req_id` argument.
```python title="blocks_iter method"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
@@ -391,17 +387,12 @@ The `TaosConnection` class and the `TaosResult` class already implement all the
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
-:::note
-The TaosCursor class uses native connections for write and query operations. In a client-side multi-threaded scenario, this cursor instance must remain thread exclusive and cannot be shared across threads for use, otherwise, it will result in errors in the returned results.
-
-:::
-
##### Use of TaosRestCursor class
-The `TaosRestCursor` class is an implementation of the PEP249 Cursor interface.
+As the way to connect introduced above but add `req_id` argument.
```python title="Use of TaosRestCursor"
{{#include docs/examples/python/connect_rest_with_req_id_examples.py:basic}}
@@ -421,8 +412,11 @@ The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-ap
For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
+
+As the way to connect introduced above but add `req_id` argument.
+
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx
index 1cff142e11..1037d66f17 100644
--- a/docs/zh/08-connector/30-python.mdx
+++ b/docs/zh/08-connector/30-python.mdx
@@ -362,7 +362,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
##### TaosConnection 类的使用
-`TaosConnection` 类既包含对 PEP249 Connection 接口的实现(如:`cursor`方法和 `close` 方法),也包含很多扩展功能(如: `execute`、 `query`、`schemaless_insert` 和 `subscribe` 方法。
+类似上文介绍的使用方法,增加 `req_id` 参数。
```python title="execute 方法"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
@@ -372,13 +372,9 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
-:::tip
-查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
-:::
-
##### TaosResult 类的使用
-上面 `TaosConnection` 类的使用示例中,我们已经展示了两种获取查询结果的方法: `fetch_all()` 和 `fetch_all_into_dict()`。除此之外 `TaosResult` 还提供了按行迭代(`rows_iter`)或按数据块迭代(`blocks_iter`)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效。
+类似上文介绍的使用方法,增加 `req_id` 参数。
```python title="blocks_iter 方法"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
@@ -391,14 +387,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
-:::note
-TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
-
-:::
-
+类似上文介绍的使用方法,增加 `req_id` 参数。
+
##### TaosRestCursor 类的使用
`TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。
@@ -420,8 +413,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
+
+类似上文介绍的使用方法,增加 `req_id` 参数。
+
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
diff --git a/docs/zh/27-train-faq/01-faq.md b/docs/zh/27-train-faq/01-faq.md
index bf46f3ca1f..15397049dd 100644
--- a/docs/zh/27-train-faq/01-faq.md
+++ b/docs/zh/27-train-faq/01-faq.md
@@ -247,10 +247,17 @@ launchctl limit maxfiles
该提示是创建 db 的 vnode 数量不够了,需要的 vnode 不能超过了 dnode 中 vnode 的上限。因为系统默认是一个 dnode 中有 CPU 核数两倍的 vnode,也可以通过配置文件中的参数 supportVnodes 控制。
正常调大 taos.cfg 中 supportVnodes 参数即可。
-### 21 【查询】在服务器上的使用 tao-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
+### 21 在服务器上的使用 taos-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
这种情况是因为客户端与服务器上设置的时区不一致导致的,调整客户端与服务器的时区一致即可解决。
-### 22 【表名】表名确认是存在的,但写入或查询时报表不存在错误,非常奇怪,什么原因?
+### 22 表名确认是存在的,但在写入或查询时返回表名不存在,什么原因?
TDengine 中的所有名称,包括数据库名、表名等都是区分大小写的,如果这些名称在程序或 taos-CLI 中没有使用反引号(`)括起来使用,即使你输入的是大写的,引擎也会转化成小写来使用,如果名称前后加上了反引号,引擎就不会再转化成小写,会保持原样来使用。
+### 23 在 taos-CLI 中查询,字段内容不能完全显示出来怎么办?
+可以使用 \G 参数来竖式显示,如 show databases\G; (为了输入方便,在"\"后加 TAB 键,会自动补全后面的内容)
+### 24 使用 taosBenchmark 测试工具写入数据查询很快,为什么我写入的数据查询非常慢?
+TDengine 在写入数据时如果有很严重的乱序写入问题,会严重影响查询性能,所以需要在写入前解决乱序的问题。如果业务是从 kafka 消费写入,请合理设计消费者,尽可能的一个子表数据由一个消费者去消费并写入,避免由设计产生的乱序。
+
+### 25 我想统计下前后两条写入记录之间的时间差值是多少?
+使用 DIFF 函数,可以查看时间列或数值列前后两条记录的差值,非常方便,详细说明见 SQL手册->函数->DIFF
diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
index c035b6598c..422bcd57ac 100644
--- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
+++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
@@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
*/
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong);
+/*
+ * Class: com_taosdata_jdbc_tmq_TMQConnector
+ * Method: tmqGetOffset
+ * Signature: (J)Ljava/lang/String;
+ */
+JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong);
+
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: fetchBlockImp
@@ -166,6 +173,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
jobject, jobject);
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint,
+ jlong);
+
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
+ jstring, jobject);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c
index 894c51d13c..6ec82aa6ef 100644
--- a/source/client/src/clientTmqConnector.c
+++ b/source/client/src/clientTmqConnector.c
@@ -17,9 +17,16 @@
#include "jniCommon.h"
#include "taos.h"
-int __init_tmq = 0;
+int __init_tmq = 0;
jmethodID g_offsetCallback;
+jclass g_assignmentClass;
+jmethodID g_assignmentConstructor;
+jmethodID g_assignmentSetVgId;
+jmethodID g_assignmentSetCurrentOffset;
+jmethodID g_assignmentSetBegin;
+jmethodID g_assignmentSetEnd;
+
void tmqGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
@@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) {
jniDebug("tmq method register finished");
}
+int __init_assignment = 0;
+void tmqAssignmentMethod(JNIEnv *env) {
+ // make sure init function executed once
+ switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) {
+ case 0:
+ break;
+ case 1:
+ do {
+ taosMsleep(0);
+ } while (atomic_load_32(&__init_assignment) == 1);
+ case 2:
+ return;
+ }
+
+ if (g_vm == NULL) {
+ (*env)->GetJavaVM(env, &g_vm);
+ }
+
+ jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
+ g_assignmentClass = (*env)->NewGlobalRef(env, assignment);
+ g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "", "()V");
+ g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int
+ g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long
+ g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long
+ g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long
+
+ (*env)->DeleteLocalRef(env, assignment);
+
+ atomic_store_32(&__init_assignment, 2);
+ jniDebug("tmq method assignment finished");
+}
+
// deprecated
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
JNIEnv *env = NULL;
@@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer);
}
-JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
- jlong jres, jobject offset) {
+JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj,
+ jlong jtmq, jlong jres,
+ jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
@@ -335,7 +375,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
TAOS_RES *res = (TAOS_RES *)jres;
if (res == NULL) {
jniDebug("jobj:%p, invalid res handle", jobj);
- return -1;
+ return JNI_RESULT_SET_NULL;
}
return tmq_get_vgroup_id(res);
}
@@ -350,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
}
+JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) {
+ TAOS_RES *res = (TAOS_RES *)jres;
+ if (res == NULL) {
+ jniDebug("jobj:%p, invalid res handle", jobj);
+ return JNI_RESULT_SET_NULL;
+ }
+ return tmq_get_vgroup_offset(res);
+}
+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj,
jobject arrayListObj) {
@@ -369,7 +418,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
return JNI_FETCH_END;
} else {
- jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres));
+ jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code,
+ taos_errstr(tres));
return JNI_RESULT_SET_NULL;
}
}
@@ -399,3 +449,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS;
}
+
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq,
+ jstring jtopic, jint partition,
+ jlong offset) {
+ tmq_t *tmq = (tmq_t *)jtmq;
+ if (tmq == NULL) {
+ jniDebug("jobj:%p, tmq is closed", jobj);
+ return TMQ_CONSUMER_NULL;
+ }
+
+ if (jtopic == NULL) {
+ jniDebug("jobj:%p, topic is null", jobj);
+ return TMQ_TOPIC_NULL;
+ }
+ const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
+
+ int32_t res = tmq_offset_seek(tmq, topicName, partition, offset);
+
+ if (res != TSDB_CODE_SUCCESS) {
+ jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res));
+ }
+
+ (*env)->ReleaseStringUTFChars(env, jtopic, topicName);
+ return (jint)res;
+}
+
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj,
+ jlong jtmq, jstring jtopic,
+ jobject jarrayList) {
+ tmqAssignmentMethod(env);
+ tmq_t *tmq = (tmq_t *)jtmq;
+ if (tmq == NULL) {
+ jniDebug("jobj:%p, tmq is closed", jobj);
+ return TMQ_CONSUMER_NULL;
+ }
+
+ if (jtopic == NULL) {
+ jniDebug("jobj:%p, topic is null", jobj);
+ return TMQ_TOPIC_NULL;
+ }
+
+ const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
+
+ tmq_topic_assignment *pAssign = NULL;
+ int32_t numOfAssignment = 0;
+ int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
+
+ if (res != TSDB_CODE_SUCCESS) {
+ (*env)->ReleaseStringUTFChars(env, jtopic, topicName);
+ jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
+ tmq_err2str(res));
+ tmq_free_assignment(pAssign);
+ return (jint)res;
+ }
+
+ (*env)->ReleaseStringUTFChars(env, jtopic, topicName);
+
+ for (int i = 0; i < numOfAssignment; ++i) {
+ tmq_topic_assignment assignment = pAssign[i];
+ jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
+ (*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
+ }
+ tmq_free_assignment(pAssign);
+ return JNI_SUCCESS;
+}
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index f8116f49ef..11760b5fd0 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1243,7 +1243,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
streamProcessRunReq(pTask);
} else {
if (streamTaskShouldPause(&pTask->status)) {
- atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
+ atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
}
diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c
index b90e51f1bc..7cba6ddf0d 100644
--- a/source/libs/catalog/src/ctgDbg.c
+++ b/source/libs/catalog/src/ctgDbg.c
@@ -19,7 +19,7 @@
#include "trpc.h"
extern SCatalogMgmt gCtgMgmt;
-SCtgDebug gCTGDebug = {.statEnable = true};
+SCtgDebug gCTGDebug = {0};
#if 0