diff --git a/docs/en/29-roadmap/index.md b/docs/en/29-roadmap/index.md
new file mode 100644
index 0000000000..e79565ff40
--- /dev/null
+++ b/docs/en/29-roadmap/index.md
@@ -0,0 +1,14 @@
+---
+title: Product Roadmap
+---
+
+The 2025 roadmap for TDengine OSS is described in the following table.
+
+| Quarter | Feature |
+| :----- | :----- |
+| 2025Q1 |
- Virtual tables
- Query engine: conditional expressions in
REGEXP
, GREATEST
, LEAST
, and CAST
functions; improvements in single-row selection functions; time range interpolation with INTERP
- Storage engine: support for writing query results into supertables;
KEEP
parameter for supertables; performance improvements for the parameter binding interface - Stream processing: support for virtual tables; decreased compute resource usage; new mechanism for event notification; faster stream creation
- Data types: support for the decimal data type
- High availability: faster recovery from downtime; improved client failover
- Stability: LTS release TDengine 3.3.6.x
- JDBC driver: more efficient data ingestion
- Ecosystem: integration with Microsoft Excel
|
+| 2025Q2 | - Query engine: relaxed restrictions on
JOIN
queries; support for all mathematical functions in MySQL; integral, integral average, and continuous variance functions; optimization of the CSUM
function; support for COUNT(DISTINCT)
syntax; enhancements to event windows; faster filtering by tag; faster INTERP
queries - Storage engine: decreased compute resource usage for TSMAs; improved write jitter
- Stream processing: high availability of snodes
- Data types: support for the blob data type
- Data subscription: support for the MQTT protocol
- High availability: faster replica configuration changes; faster recovery from downtime for clusters; improved data recovery after power outage
- Observability: diagnostic tool for data ingestion
|
+| 2025Q3 | - Query engine: more subqueries; support for all operators in MySQL; support for all time functions in MySQL; improved window calculation; reduced jitter in query performance; support for specifying columns in count windows
- Storage engine: faster ingestion in SQL mode
- Observability: diagnostic tool for queries; improved
EXPLAIN
output; monitoring of long-running tasks
|
+| 2025Q4 | - Query engine: window functions (i.e. the
OVER
clause); support for all string, aggregation, and conditional functions in MySQL; sorting within groups for partition queries; controls for query resource usage; faster aggregate queries on subtables; time range interpolation in INTERVAL
windows - Data types: support for variable-length strings
- Caching: faster row-oriented caching
- Observability: more insight into operations and maintenance
|
+
+For more information, see [TDengine Public Roadmap](https://github.com/orgs/taosdata/projects/4).
diff --git a/docs/zh/05-basic/02-insert.md b/docs/zh/05-basic/02-insert.md
index b129fdbff1..c5d3096aab 100644
--- a/docs/zh/05-basic/02-insert.md
+++ b/docs/zh/05-basic/02-insert.md
@@ -111,7 +111,7 @@ TDengine 还支持直接向超级表写入数据。需要注意的是,超级
```sql
insert into meters (tbname, ts, current, voltage, phase, location, group_id)
-values( "d1001, "2018-10-03 14:38:05", 10.2, 220, 0.23, "California.SanFrancisco", 2)
+values("d1001", "2018-10-03 14:38:05", 10.2, 220, 0.23, "California.SanFrancisco", 2)
```
### 零代码写入
diff --git a/docs/zh/14-reference/03-taos-sql/02-database.md b/docs/zh/14-reference/03-taos-sql/02-database.md
index 35cb99a3dd..3742b7c571 100644
--- a/docs/zh/14-reference/03-taos-sql/02-database.md
+++ b/docs/zh/14-reference/03-taos-sql/02-database.md
@@ -67,7 +67,7 @@ database_option: {
- DURATION:数据文件存储数据的时间跨度。可以使用加单位的表示形式,如 DURATION 100h、DURATION 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。不加时间单位时默认单位为天,如 DURATION 50 表示 50 天。
- MAXROWS:文件块中记录的最大条数,默认为 4096 条。
- MINROWS:文件块中记录的最小条数,默认为 100 条。
-- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 3 倍的 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据从而释放存储空间。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。企业版支持[多级存储](../../operation/planning/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8)功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 \<= keep 1 \<= keep 2,如 KEEP 100h,100d,3650d); 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。了解更多,请点击 [关于主键时间戳](https://docs.taosdata.com/reference/taos-sql/insert/)
+- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 3 倍的 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据从而释放存储空间。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。企业版支持[多级存储](https://docs.taosdata.com/operation/planning/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8)功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 \<= keep 1 \<= keep 2,如 KEEP 100h,100d,3650d); 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。了解更多,请点击 [关于主键时间戳](https://docs.taosdata.com/reference/taos-sql/insert/)
- KEEP_TIME_OFFSET:自 3.2.0.0 版本生效。删除或迁移保存时间超过 KEEP 值的数据的延迟执行时间,默认值为 0 (小时)。在数据文件保存时间超过 KEEP 后,删除或迁移操作不会立即执行,而会额外等待本参数指定的时间间隔,以实现与业务高峰期错开的目的。
- STT_TRIGGER:表示落盘文件触发文件合并的个数。对于少表高频写入场景,此参数建议使用默认配置;而对于多表低频写入场景,此参数建议配置较大的值。
diff --git a/docs/zh/29-roadmap/index.md b/docs/zh/29-roadmap/index.md
new file mode 100644
index 0000000000..a68e97a410
--- /dev/null
+++ b/docs/zh/29-roadmap/index.md
@@ -0,0 +1,14 @@
+---
+title: 产品路线图
+---
+
+TDengine OSS 之 2025 年年度路线图如下表所示。
+
+| 季度 | 功能 |
+| :----- | :----- |
+| 2025Q1 | - 虚拟表
- 查询能力:
REGEXP
、GREATEST
、LEAST
、CAST
函数支持判断表达式、单行选择函数的其他列值、INTERP
支持插值时间范围 - 存储能力:支持将查询结果写入超级表、超级表支持
KEEP
参数、STMT 写入性能提升 - 流计算:支持虚拟表、计算资源优化、事件通知机制、创建时间优化
- 数据类型:Decimal
- 高可用:加快宕机恢复时间、优化客户端 Failover 机制
- 稳定性:开始维护新的稳定版本 3.3.6.x
- JDBC:高效写入
- 生态工具:对接 Tableau
- 生态工具:对接 Excel
|
+| 2025Q2 | - 查询能力:大幅放宽关联查询限制、支持 MySQL 所有数学函数、支持积分/积分平均/连续方差函数、
CSUM
函数优化、COUNT(DISTINCT)
语法、事件窗口功能增强、提升标签过滤性能、提升 INTERP
查询性能 - 存储能力:TSMA 计算资源优化、写入抖动优化
- 流计算:节点高可用
- 数据类型:BLOB
- 数据订阅:支持 MQTT 协议
- 高可用:提高副本变更速度、提高集群宕机恢复速度、优化断电数据恢复机制
- 可观测性:写入诊断工具
- 生态工具:对接帆软 FineBI
|
+| 2025Q3 | - 查询能力:支持更多子查询类型、支持 MySQL 运算符、支持 MySQL 所有时间函数、窗口计算逻辑优化、查询性能抖动、计数窗口允许指定列
- 存储能力:提高 SQL 模式写入速度
- 可观测性:查询诊断工具、优化
EXPLAIN
输出、长任务观测
|
+| 2025Q4 | - 查询能力:窗口函数(
OVER
子句)、支持 MySQL 所有字符串/聚合/条件函数、Partition 支持组内排序、控制查询资源占用、提高子表聚合查询性能、INTERVAL
窗口支持插值时间范围 - 数据类型:支持不定长度字符串数据类型
- 数据缓存:提升按行缓存性能
- 可观测性:增强运维可观测性
|
+
+欲了解更多信息,请参见 [TDengine Public Roadmap](https://github.com/orgs/taosdata/projects/4) 。
diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h
index c934cb6961..2847f4278a 100644
--- a/include/common/tmsgcb.h
+++ b/include/common/tmsgcb.h
@@ -38,6 +38,7 @@ typedef enum {
STREAM_QUEUE,
ARB_QUEUE,
STREAM_CTRL_QUEUE,
+ STREAM_LONG_EXEC_QUEUE,
QUEUE_MAX,
} EQueueType;
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 883c5f7b99..988a650d6c 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
-int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
+int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration);
bool qTaskIsExecuting(qTaskInfo_t qinfo);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index a4d89dcdcc..c4c0aaf742 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -58,6 +58,7 @@ extern "C" {
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
+#define STREAM_EXEC_T_STOP_ONE_TASK (-8)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
@@ -483,8 +484,10 @@ typedef struct STaskUpdateInfo {
} STaskUpdateInfo;
typedef struct SScanWalInfo {
- int32_t scanCounter;
+ int32_t scanSentinel;
tmr_h scanTimer;
+ int64_t lastScanTs;
+ int32_t tickCounter;
} SScanWalInfo;
typedef struct SFatalErrInfo {
@@ -752,15 +755,19 @@ void streamMetaCleanup();
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta);
void streamMetaClose(SStreamMeta* streamMeta);
-int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
-int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
+
+int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
+int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey);
+
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
+
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask);
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
+
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
@@ -794,6 +801,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
+int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);
diff --git a/include/util/tworker.h b/include/util/tworker.h
index a3ba7dba6d..bc0dde1a37 100644
--- a/include/util/tworker.h
+++ b/include/util/tworker.h
@@ -76,7 +76,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
-STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp);
+STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum);
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
int32_t tWWorkerInit(SWWorkerPool *pool);
diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
index 84f5149624..9b4c11d6ae 100644
--- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
@@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
const char *name;
SQueryAutoQWorkerPool queryPool;
SAutoQWorkerPool streamPool;
+ SAutoQWorkerPool streamLongExecPool;
SWWorkerPool streamCtrlPool;
SWWorkerPool fetchPool;
SSingleWorker mgmtWorker;
@@ -75,6 +76,7 @@ typedef struct {
STaosQueue *pQueryQ;
STaosQueue *pStreamQ;
STaosQueue *pStreamCtrlQ;
+ STaosQueue *pStreamLongExecQ;
STaosQueue *pFetchQ;
STaosQueue *pMultiMgmQ;
} SVnodeObj;
@@ -137,6 +139,8 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
+int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
+
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
index 234d4f41e1..1dea7d3cad 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
@@ -1008,27 +1008,29 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
- if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
+
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
index d71e0b02c4..6f30977e10 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
@@ -398,10 +398,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
- while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
+ while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
- while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
+ while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);
+
+ dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
+ pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
+ while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
index b398bdf242..5acd06bbda 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
@@ -150,7 +150,7 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
SRpcMsg *pMsg = pItem;
const STraceId *trace = &pMsg->info.traceId;
- dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg);
+ dGTrace("vgId:%d, msg:%p get from vnode-stream-ctrl queue", pVnode->vgId, pMsg);
code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
terrno = code;
@@ -165,6 +165,26 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
}
}
+static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
+ SVnodeObj *pVnode = pInfo->ahandle;
+ const STraceId *trace = &pMsg->info.traceId;
+ int32_t code = 0;
+
+ dGTrace("vgId:%d, msg:%p get from vnode-stream long-exec queue", pVnode->vgId, pMsg);
+
+ code = vnodeProcessStreamLongExecMsg(pVnode->pImpl, pMsg, pInfo);
+ if (code != 0) {
+ terrno = code;
+ dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
+ tstrerror(code));
+ vmSendRsp(pMsg, code);
+ }
+
+ dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
+ rpcFreeCont(pMsg->pCont);
+ taosFreeQitem(pMsg);
+}
+
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
@@ -274,9 +294,13 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
code = taosWriteQitem(pVnode->pStreamQ, pMsg);
break;
case STREAM_CTRL_QUEUE:
- dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg);
+ dGTrace("vgId:%d, msg:%p put into vnode-stream-ctrl queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
break;
+ case STREAM_LONG_EXEC_QUEUE:
+ dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg);
+ code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg);
+ break;
case FETCH_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
@@ -335,6 +359,8 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }
+int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); }
+
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
@@ -409,6 +435,10 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
break;
case STREAM_CTRL_QUEUE:
size = taosQueueItemSize(pVnode->pStreamCtrlQ);
+ break;
+ case STREAM_LONG_EXEC_QUEUE:
+ size = taosQueueItemSize(pVnode->pStreamLongExecQ);
+ break;
default:
break;
}
@@ -451,13 +481,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
}
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
- pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
+
+ // init stream msg processing queue family
+ pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2);
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
+ pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1);
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
- || pVnode->pStreamCtrlQ == NULL) {
+ || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@@ -473,6 +506,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
taosQueueGetThreadId(pVnode->pFetchQ));
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
+ dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ);
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
taosQueueGetThreadId(pVnode->pStreamCtrlQ));
return 0;
@@ -481,17 +515,22 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
+ tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ);
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pQueryQ = NULL;
+ pVnode->pFetchQ = NULL;
+
pVnode->pStreamQ = NULL;
pVnode->pStreamCtrlQ = NULL;
- pVnode->pFetchQ = NULL;
+ pVnode->pStreamLongExecQ = NULL;
+
dDebug("vgId:%d, queue is freed", pVnode->vgId);
}
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
- int32_t code = 0;
+ int32_t code = 0;
+
SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool;
pQPool->name = "vnode-query";
pQPool->min = tsNumOfVnodeQueryThreads;
@@ -505,8 +544,13 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;
+ SAutoQWorkerPool *pLongExecPool = &pMgmt->streamLongExecPool;
+ pLongExecPool->name = "vnode-stream-long-exec";
+ pLongExecPool->ratio = tsRatioOfVnodeStreamThreads/3;
+ if ((code = tAutoQWorkerInit(pLongExecPool)) != 0) return code;
+
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
- pStreamCtrlPool->name = "vnode-ctrl-stream";
+ pStreamCtrlPool->name = "vnode-stream-ctrl";
pStreamCtrlPool->max = 1;
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
@@ -541,6 +585,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
void vmStopWorker(SVnodeMgmt *pMgmt) {
tQueryAutoQWorkerCleanup(&pMgmt->queryPool);
tAutoQWorkerCleanup(&pMgmt->streamPool);
+ tAutoQWorkerCleanup(&pMgmt->streamLongExecPool);
tWWorkerCleanup(&pMgmt->streamCtrlPool);
tWWorkerCleanup(&pMgmt->fetchPool);
dDebug("vnode workers are closed");
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 220319122c..009cd1a108 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -2663,7 +2663,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
int64_t now = taosGetTimestampMs();
bool allReady = true;
SArray *pNodeSnapshot = NULL;
- int32_t maxAllowedTrans = 50;
+ int32_t maxAllowedTrans = 20;
int32_t numOfTrans = 0;
int32_t code = 0;
void *pIter = NULL;
@@ -2750,6 +2750,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
return TSDB_CODE_FAILED;
}
+ // todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed.
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 50d75c4838..871a8c06e1 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -113,6 +113,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
+int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 5bf0a9b199..6b3cf47d13 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -239,7 +239,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
-int tqScanWalAsync(STQ* pTq, bool ckPause);
+void tqScanWalAsync(STQ* pTq);
int32_t tqStopStreamTasksAsync(STQ* pTq);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c
index 7d83dbcf84..1f755f816e 100644
--- a/source/dnode/vnode/src/sma/smaRollup.c
+++ b/source/dnode/vnode/src/sma/smaRollup.c
@@ -1302,7 +1302,7 @@ _checkpoint:
}
streamMetaWLock(pMeta);
- if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
+ if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
streamMetaWUnLock(pMeta);
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 119edb47bc..63727e5c45 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -920,12 +920,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
// now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
- if (code == TSDB_CODE_SUCCESS) {
- code = tqScanWalAsync(pTq, false);
- if (code) {
- tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
- }
- }
+// if (code == TSDB_CODE_SUCCESS) {
+// code = tqScanWalAsync(pTq, false);
+// if (code) {
+// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
+// }
+// }
}
}
@@ -1113,23 +1113,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// extracted submit data from wal files for all tasks
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
return tqScanWal(pTq);
- }
+ } else {
+ code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
+ if (code) {
+ tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
+ }
- code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
- if (code) {
- tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
-
- // let's continue scan data in the wal files
- if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
- code = tqScanWalAsync(pTq, false); // it's ok to failed
- if (code) {
- tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
- }
- }
-
- return code;
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c
index 2b2667773a..fc83343c99 100644
--- a/source/dnode/vnode/src/tq/tqPush.c
+++ b/source/dnode/vnode/src/tq/tqPush.c
@@ -49,20 +49,6 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
}
}
- streamMetaRLock(pTq->pStreamMeta);
- int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
- streamMetaRUnLock(pTq->pStreamMeta);
-
-// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
-
- // push data for stream processing:
- // 1. the vnode has already been restored.
- // 2. the vnode should be the leader.
- // 3. the stream is not suspended yet.
- if ((!tsDisableStream) && (numOfTasks > 0)) {
- code = tqScanWalAsync(pTq, true);
- }
-
return code;
}
diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c
index 9ea84830f1..b34ea78f64 100644
--- a/source/dnode/vnode/src/tq/tqStreamTask.c
+++ b/source/dnode/vnode/src/tq/tqStreamTask.c
@@ -16,23 +16,19 @@
#include "tq.h"
#include "vnd.h"
-#define MAX_REPEAT_SCAN_THRESHOLD 3
-#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan
+#define SCAN_WAL_IDLE_DURATION 250 // idle for 500ms to do next wal scan
+#define SCAN_WAL_WAIT_COUNT 2
typedef struct SBuildScanWalMsgParam {
int64_t metaId;
- int32_t numOfTasks;
- int8_t restored;
SMsgCb msgCb;
} SBuildScanWalMsgParam;
-static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
+static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
-static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
-static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) {
@@ -40,44 +36,55 @@ int32_t tqScanWal(STQ* pTq) {
int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs();
int32_t numOfTasks = 0;
+ int64_t el = 0;
+ int32_t code = 0;
- tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
-
- // check all tasks
- int32_t code = doScanWalForAllTasks(pMeta);
- if (code) {
- tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
+ int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
+ if (old == 0) {
+ tqDebug("vgId:%d try to scan wal to extract data", vgId);
+ } else {
+ tqDebug("vgId:%d already in wal scan, abort", vgId);
return code;
}
- streamMetaWLock(pMeta);
- int32_t times = (--pMeta->scanInfo.scanCounter);
- if (times < 0) {
- tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
- times = 0;
+ // the scan wal interval less than 200, not scan, actually.
+ if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) {
+ tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
+ atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
+ return code;
}
- numOfTasks = taosArrayGetSize(pMeta->pTaskList);
- streamMetaWUnLock(pMeta);
+ // check all tasks
+ code = doScanWalForAllTasks(pMeta, &numOfTasks);
- int64_t el = (taosGetTimestampMs() - st);
- tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el);
+ pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
+ el = (pMeta->scanInfo.lastScanTs - st);
- if (times > 0) {
- tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
- code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
- if (code) {
- tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
- }
+ if (code) {
+ tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
+ tstrerror(code));
+ } else {
+ tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
}
+ atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
return code;
}
-static void doStartScanWal(void* param, void* tmrId) {
- int32_t vgId = 0;
- int32_t code = 0;
+static bool waitEnoughDuration(SStreamMeta* pMeta) {
+ if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) {
+ pMeta->scanInfo.tickCounter = 0;
+ return true;
+ }
+ return false;
+}
+
+static void doStartScanWal(void* param, void* tmrId) {
+ int32_t vgId = 0;
+ int32_t code = 0;
+ int32_t numOfTasks = 0;
+ tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
@@ -87,10 +94,18 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
+ vgId = pMeta->vgId;
+ code = streamTimerGetInstance(&pTimer);
+ if (code) {
+ tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
+ taosMemoryFree(pParam);
+ return;
+ }
+
if (pMeta->closeFlag) {
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code == TSDB_CODE_SUCCESS) {
- tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
+ tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
} else {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
@@ -100,71 +115,100 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
- vgId = pMeta->vgId;
+ if (pMeta->role != NODE_ROLE_LEADER) {
+ tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
- tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
- pParam->restored);
-#if 0
- // wait for the vnode is freed, and invalid read may occur.
+ code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
+ if (code == TSDB_CODE_SUCCESS) {
+ tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
+ } else {
+ tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
+ tstrerror(code));
+ }
+
+ taosMemFree(pParam);
+ return;
+ }
+
+ if (pMeta->startInfo.startAllTasks) {
+ tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId);
+ goto _end;
+ }
+
+ if (!waitEnoughDuration(pMeta)) {
+ streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
+ "scan-wal");
+ code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
+ if (code) {
+ tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
+ tstrerror(code));
+ }
+ return;
+ }
+
+ streamMetaRLock(pMeta);
+ numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+ streamMetaRUnLock(pMeta);
+
+ if (numOfTasks == 0) {
+ goto _end;
+ }
+
+ tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
+
+ #if 0
+ // wait for the vnode is freed, and invalid read may occur.
taosMsleep(10000);
-#endif
+ #endif
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
+_end:
+
+ streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
+ tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
+
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
-
- taosMemoryFree(pParam);
}
-int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
+void tqScanWalAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = NULL;
+ // 1. the vnode should be the leader.
+ // 2. the stream isn't disabled
+ if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
+ tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
+ return;
+ }
+
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
if (pParam == NULL) {
- return terrno;
+ tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
+ return;
}
pParam->metaId = pMeta->rid;
- pParam->numOfTasks = numOfTasks;
- pParam->restored = pTq->pVnode->restored;
pParam->msgCb = pTq->pVnode->msgCb;
code = streamTimerGetInstance(&pTimer);
if (code) {
- tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
+ tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
taosMemoryFree(pParam);
} else {
- streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
+ streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
+ "scan-wal");
}
-
- return code;
-}
-
-int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
- SStreamMeta* pMeta = pTq->pStreamMeta;
- bool alreadyRestored = pTq->pVnode->restored;
- int32_t code = 0;
-
- // do not launch the stream tasks, if it is a follower or not restored vnode.
- if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
- return TSDB_CODE_SUCCESS;
- }
-
- streamMetaWLock(pMeta);
- code = doScanWalAsync(pTq, ckPause);
- streamMetaWUnLock(pMeta);
- return code;
}
int32_t tqStopStreamTasksAsync(STQ* pTq) {
@@ -347,13 +391,10 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
return code;
}
-int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
+int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
int32_t vgId = pStreamMeta->vgId;
SArray* pTaskList = NULL;
- int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
- if (numOfTasks == 0) {
- return TSDB_CODE_SUCCESS;
- }
+ int32_t numOfTasks = 0;
// clone the task list, to avoid the task update during scan wal files
streamMetaWLock(pStreamMeta);
@@ -364,10 +405,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
return terrno;
}
- tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
-
// update the new task number
numOfTasks = taosArrayGetSize(pTaskList);
+ if (pNumOfTasks != NULL) {
+ *pNumOfTasks = numOfTasks;
+ }
+
+ tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i);
@@ -426,51 +470,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
return TSDB_CODE_SUCCESS;
}
-int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
- SStreamMeta* pMeta = pTq->pStreamMeta;
- bool alreadyRestored = pTq->pVnode->restored;
- int32_t vgId = pMeta->vgId;
- int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
-
- if (numOfTasks == 0) {
- tqDebug("vgId:%d no stream tasks existed to run", vgId);
- return 0;
- }
-
- if (pMeta->startInfo.startAllTasks) {
- tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
- return 0;
- }
-
- pMeta->scanInfo.scanCounter += 1;
- if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
- pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
- }
-
- if (pMeta->scanInfo.scanCounter > 1) {
- tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
- return 0;
- }
-
- int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
- if (ckPause && numOfTasks == numOfPauseTasks) {
- tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
-
- // reset the counter value, since we do not launch the scan wal operation.
- pMeta->scanInfo.scanCounter = 0;
- return 0;
- }
-
- tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
- numOfTasks, alreadyRestored);
-
- return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
-}
-
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
p->metaId = pTq->pStreamMeta->rid;
- p->numOfTasks = 0;
doStartScanWal(p, 0);
}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c
index 197a45cdb9..6f001981fb 100644
--- a/source/dnode/vnode/src/tq/tqUtil.c
+++ b/source/dnode/vnode/src/tq/tqUtil.c
@@ -47,6 +47,10 @@ END:
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
SSyncState state = syncGetState(pTq->pVnode->sync);
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
+
+ if (isLeader) {
+ tqScanWalAsync(pTq);
+ }
}
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index 77632d328e..2c48ada0fa 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -260,13 +260,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
// stream do update the nodeEp info, write it into stream meta.
if (updated) {
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
- code = streamMetaSaveTask(pMeta, pTask);
+ code = streamMetaSaveTaskInMeta(pMeta, pTask);
if (code) {
tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
}
if (pHTask != NULL) {
- code = streamMetaSaveTask(pMeta, pHTask);
+ code = streamMetaSaveTaskInMeta(pMeta, pHTask);
if (code) {
tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
}
@@ -743,6 +743,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
}
streamMetaWUnLock(pMeta);
+ tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);
+
return 0; // always return success
}
@@ -857,6 +859,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return code;
+ } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
+ code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
+ return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
@@ -938,10 +943,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
streamMetaWUnLock(pMeta);
- if (scanWal && (vgId != SNODE_HANDLE)) {
- tqDebug("vgId:%d start scan wal for executing tasks", vgId);
- code = tqScanWalAsync(pMeta->ahandle, true);
- }
+// if (scanWal && (vgId != SNODE_HANDLE)) {
+// tqDebug("vgId:%d start scan wal for executing tasks", vgId);
+// code = tqScanWalAsync(pMeta->ahandle, true);
+// }
return code;
}
@@ -1170,7 +1175,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
pTask->hTaskInfo.operatorOpen = false;
code = streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
- code = tqScanWalAsync((STQ*)handle, false);
+// code = tqScanWalAsync((STQ*)handle, false);
} else {
code = streamTrySchedExec(pTask);
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 6190f4b0a7..6562504988 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -945,8 +945,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
- case TDMT_VND_STREAM_SCAN_HISTORY:
- return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
default:
@@ -993,6 +991,22 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
}
}
+int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
+ vTrace("vgId:%d, msg:%p in stream long exec queue is processing", pVnode->config.vgId, pMsg);
+ if (!syncIsReadyForRead(pVnode->sync)) {
+ vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
+ return 0;
+ }
+
+ switch (pMsg->msgType) {
+ case TDMT_VND_STREAM_SCAN_HISTORY:
+ return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
+ default:
+ vError("unknown msg type:%d in stream long exec queue", pMsg->msgType);
+ return TSDB_CODE_APP_ERROR;
+ }
+}
+
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
if (code) {
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 364ab02c80..b6044eaacf 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -972,26 +972,43 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
return TSDB_CODE_SUCCESS;
}
-int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
+int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
+ int64_t st = taosGetTimestampMs();
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
- qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
+ if (waitDuration > 0) {
+ qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
+ } else {
+ qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
+ }
+
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
- while (1) {
- taosWLockLatch(&pTaskInfo->lock);
- if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
- taosWUnLockLatch(&pTaskInfo->lock);
- taosMsleep(100);
- } else { // not running now
- pTaskInfo->code = rspCode;
- taosWUnLockLatch(&pTaskInfo->lock);
- return TSDB_CODE_SUCCESS;
+ if (waitDuration > 0) {
+ while (1) {
+ taosWLockLatch(&pTaskInfo->lock);
+ if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
+ taosWUnLockLatch(&pTaskInfo->lock);
+
+ taosMsleep(200);
+
+ int64_t d = taosGetTimestampMs() - st;
+ if (d >= waitDuration && waitDuration >= 0) {
+ qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
+ return TSDB_CODE_SUCCESS;
+ }
+ } else { // not running now
+ pTaskInfo->code = rspCode;
+ taosWUnLockLatch(&pTaskInfo->lock);
+ return TSDB_CODE_SUCCESS;
+ }
}
}
+
+ return TSDB_CODE_SUCCESS;
}
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c
index eb448a13f5..b0f78c1aad 100644
--- a/source/libs/executor/src/projectoperator.c
+++ b/source/libs/executor/src/projectoperator.c
@@ -42,9 +42,7 @@ typedef struct SIndefOperatorInfo {
} SIndefOperatorInfo;
static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
-static SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator);
static int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
-static SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator);
static int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
static int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
@@ -557,12 +555,6 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
}
}
-SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
- SSDataBlock* pResBlock = NULL;
- pOperator->pTaskInfo->code = doApplyIndefinitFunction(pOperator, &pResBlock);
- return pResBlock;
-}
-
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
SIndefOperatorInfo* pIndefInfo = pOperator->info;
diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c
index fa6008eba7..a2304a2e6c 100644
--- a/source/libs/executor/src/streameventwindowoperator.c
+++ b/source/libs/executor/src/streameventwindowoperator.c
@@ -234,6 +234,11 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
pWin->ekey = pTsData[i];
pWinInfo->pWinFlag->endFlag = ends[i];
} else if (pWin->ekey == pTsData[i]) {
+ if (pWinInfo->pWinFlag->endFlag == true && ends[i] == false) {
+ (*pWinRow) = i + 1 - start;
+ *pRebuild = true;
+ goto _end;
+ }
pWinInfo->pWinFlag->endFlag |= ends[i];
} else if (ends[i] && !pWinInfo->pWinFlag->endFlag) {
*pRebuild = true;
diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c
index 5cab26b9d3..3b6c1963dc 100644
--- a/source/libs/executor/src/streamtimewindowoperator.c
+++ b/source/libs/executor/src/streamtimewindowoperator.c
@@ -479,14 +479,12 @@ void destroyFlusedppPos(void* ppRes) {
}
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
- if (pGroupResInfo->freeItem) {
- int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
- for (int32_t i = pGroupResInfo->index; i < size; i++) {
- void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
- destroyFlusedPos(pPos);
- }
- pGroupResInfo->freeItem = false;
+ int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
+ for (int32_t i = pGroupResInfo->index; i < size; i++) {
+ void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
+ destroyFlusedPos(pPos);
}
+ pGroupResInfo->freeItem = false;
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0;
diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c
index 41e3d40542..387010d0a6 100644
--- a/source/libs/stream/src/streamCheckpoint.c
+++ b/source/libs/stream/src/streamCheckpoint.c
@@ -734,7 +734,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pTask->status.taskStatus = TASK_STATUS__READY;
- code = streamMetaSaveTask(pMeta, pTask);
+ code = streamMetaSaveTaskInMeta(pMeta, pTask);
streamMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) {
@@ -1584,6 +1584,11 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
streamFreeTaskState(pTask, p);
pTask->pBackend = NULL;
}
+
+ if (pTask->exec.pExecutor != NULL) {
+ qDestroyTask(pTask->exec.pExecutor);
+ pTask->exec.pExecutor = NULL;
+ }
return 0;
}
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 999a6b5a4c..054f88ec3d 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -880,7 +880,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
double el = (taosGetTimestampMs() - st) / 1000.0;
- if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore
+ if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
streamTaskSetIdleInfo(pTask, 500);
return code;
diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c
index d973d7c05f..312ed85764 100644
--- a/source/libs/stream/src/streamHb.c
+++ b/source/libs/stream/src/streamHb.c
@@ -327,7 +327,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaRefPool, rid);
if (code == TSDB_CODE_SUCCESS) {
- stDebug("vgId:%d jump out of meta timer", vgId);
+ stInfo("vgId:%d jump out of meta timer since closed", vgId);
} else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
@@ -341,7 +341,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
if (code == TSDB_CODE_SUCCESS) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
} else {
- stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid);
+ stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid);
}
// taosMemoryFree(param);
return;
@@ -417,7 +417,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
- taosMsleep(2 * META_HB_CHECK_INTERVAL);
+ taosMsleep(3 * META_HB_CHECK_INTERVAL);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index ddd0201460..d5561ebe76 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -427,7 +427,10 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
- pMeta->scanInfo.scanCounter = 0;
+ pMeta->scanInfo.scanSentinel = 0;
+ pMeta->scanInfo.lastScanTs = 0;
+ pMeta->scanInfo.tickCounter = 0;
+
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->buildTaskFn = buildTaskFn;
@@ -632,7 +635,7 @@ void streamMetaCloseImpl(void* arg) {
}
// todo let's check the status for each task
-int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
+int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t vgId = pTask->pMeta->vgId;
void* buf = NULL;
int32_t len;
@@ -682,7 +685,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return code;
}
-int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
+int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) {
@@ -705,7 +708,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p != NULL) {
- stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId);
+ stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
tFreeStreamTask(pTask);
return code;
}
@@ -735,7 +738,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return code;
}
- if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
+ if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
void* pUnused = taosArrayPop(pMeta->pTaskList);
@@ -885,6 +888,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
int32_t code = 0;
+
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
if (code) {
@@ -895,7 +899,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
// let's kill the query procedure within stream, to end it ASAP.
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
- code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
+ code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
}
@@ -932,7 +936,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
- code = streamMetaRemoveTask(pMeta, &id);
+ code = streamMetaRemoveTaskInMeta(pMeta, &id);
if (code) {
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
}
@@ -963,6 +967,32 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
return 0;
}
+int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
+ SStreamTask* pTask = NULL;
+ int32_t code = 0;
+ int32_t vgId = pMeta->vgId;
+ int32_t numOfTasks = 0;
+
+ streamMetaWLock(pMeta);
+
+// code = streamMetaUnregisterTask(pMeta, streamId, taskId);
+// numOfTasks = streamMetaGetNumOfTasks(pMeta);
+// if (code) {
+// stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
+// }
+//
+// code = streamMetaCommit(pMeta);
+// if (code) {
+// stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
+// } else {
+// stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
+// }
+
+ streamMetaWUnLock(pMeta);
+
+ return code;
+}
+
int32_t streamMetaBegin(SStreamMeta* pMeta) {
streamMetaWLock(pMeta);
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
@@ -1185,7 +1215,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosArrayGetSize(pRecycleList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
STaskId* pId = taosArrayGet(pRecycleList, i);
- code = streamMetaRemoveTask(pMeta, pId);
+ code = streamMetaRemoveTaskInMeta(pMeta, pId);
if (code) {
stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
}
@@ -1213,8 +1243,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
// wait for the stream meta hb function stopping
- streamMetaWaitForHbTmrQuit(pMeta);
pMeta->closeFlag = true;
+ streamMetaWaitForHbTmrQuit(pMeta);
stDebug("vgId:%d start to check all tasks for closing", vgId);
int64_t st = taosGetTimestampMs();
@@ -1253,6 +1283,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
+
+ if (pMeta->scanInfo.scanTimer != NULL) {
+ streamTmrStop(pMeta->scanInfo.scanTimer);
+ pMeta->scanInfo.scanTimer = NULL;
+ }
+
streamMetaRUnLock(pMeta);
}
@@ -1320,7 +1356,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
// mark the sign to send msg before close all tasks
// 1. for leader vnode, always send msg before closing
- // 2. for follower vnode, if it's is changed from leader, also sending msg before closing.
+ // 2. for follower vnode, if it's changed from leader, also sending msg before closing.
if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->sendMsgBeforeClosing = true;
}
@@ -1330,11 +1366,11 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
if (isLeader) {
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
- pMeta->vgId, prevStage, stage, isLeader, pMeta->rid);
+ pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
streamMetaStartHb(pMeta);
} else {
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
- prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
+ stage, prevStage, isLeader, pMeta->sendMsgBeforeClosing);
}
}
diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c
index 54a8929123..f8b1b5ecbc 100644
--- a/source/libs/stream/src/streamStartHistory.c
+++ b/source/libs/stream/src/streamStartHistory.c
@@ -76,7 +76,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
- return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg);
+ return tmsgPutToQueue(pTask->pMsgCb, STREAM_LONG_EXEC_QUEUE, &rpcMsg);
}
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c
index c40d5ef928..60c1694dda 100644
--- a/source/libs/stream/src/streamStartTask.c
+++ b/source/libs/stream/src/streamStartTask.c
@@ -451,7 +451,6 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
continue;
}
- int64_t refId = pTask->id.refId;
int32_t ret = streamTaskStop(pTask);
if (ret) {
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index d27ed520c6..e4e8a37b37 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -703,7 +703,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
- code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
+ code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
}
@@ -862,7 +862,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
pStreamTask->status.taskStatus = TASK_STATUS__READY;
}
- code = streamMetaSaveTask(pMeta, pStreamTask);
+ code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
streamMutexUnlock(&(pStreamTask->lock));
streamMetaReleaseTask(pMeta, pStreamTask);
@@ -1025,7 +1025,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor;
- code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
+ code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000);
}
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
@@ -1287,6 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) {
return "resume-task-from-idle";
case STREAM_EXEC_T_ADD_FAILED_TASK:
return "record-start-failed-task";
+ case STREAM_EXEC_T_STOP_ONE_TASK:
+ return "stop-one-task";
case 0:
return "exec-all-tasks";
default:
diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c
index dbd8cb159e..469f98fcf0 100644
--- a/source/util/src/tworker.c
+++ b/source/util/src/tworker.c
@@ -256,7 +256,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
return NULL;
}
-STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
+STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
int32_t code;
STaosQueue *queue;
@@ -280,7 +280,10 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
- if (dstWorkerNum < 2) dstWorkerNum = 2;
+
+ if (dstWorkerNum < minNum) {
+ dstWorkerNum = minNum;
+ }
// spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) {
diff --git a/tests/ci/func.txt b/tests/ci/func.txt
index 45d4fb1c11..c724568537 100644
--- a/tests/ci/func.txt
+++ b/tests/ci/func.txt
@@ -79,7 +79,7 @@
(void)streamMetaAddFailedTask
(void)streamMetaAddTaskLaunchResult
(void)streamMetaCommit
-(void)streamMetaRemoveTask
+(void)streamMetaRemoveTaskInMeta
(void)streamMetaSendHbHelper
(void)streamMetaStartAllTasks
(void)streamMetaStartOneTask
diff --git a/tests/script/tsim/stream/tag.sim b/tests/script/tsim/stream/tag.sim
index f293f4ac05..9f4c62e747 100644
--- a/tests/script/tsim/stream/tag.sim
+++ b/tests/script/tsim/stream/tag.sim
@@ -26,7 +26,7 @@ sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);
-sleep 300
+sleep 1000
sql select * from streamt;
if $data01 != 3 then