From 0162fb9e76a879a48dadc71ed1b3be94ceb2ec85 Mon Sep 17 00:00:00 2001
From: slzhou
Date: Fri, 12 Jan 2024 10:46:03 +0800
Subject: [PATCH 01/11] set the timeLineResMode to the subquery's timeLineResMode

---
 source/libs/parser/src/parTranslater.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index ea1a5f7d1d..e407cc1e81 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -3158,7 +3158,11 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
         ((SSelectStmt*)pTempTable->pSubquery)->isEmptyResult && isSelectStmt(pCxt->pCurrStmt)) {
       ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
     }
-
+    if (QUERY_NODE_SELECT_STMT == nodeType(pTempTable->pSubquery) && isSelectStmt(pCxt->pCurrStmt)) {
+      SSelectStmt* pSubStmt = (SSelectStmt*)pTempTable->pSubquery;
+      SSelectStmt* pCurrStmt = (SSelectStmt*)(pCxt->pCurrStmt);
+      pCurrStmt->timeLineResMode = pSubStmt->timeLineResMode;
+    }
     pTempTable->table.precision = getStmtPrecision(pTempTable->pSubquery);
     pTempTable->table.singleTable = stmtIsSingleTable(pTempTable->pSubquery);
     code = addNamespace(pCxt, pTempTable);

From bc1eacf626e601004f0fe0e62660686a69032942 Mon Sep 17 00:00:00 2001
From: wangjiaming0909 <604227650@qq.com>
Date: Thu, 11 Jan 2024 15:48:07 +0800
Subject: [PATCH 02/11] add PARTITION_FIRST hint to use partition node before agg

---
 docs/en/12-taos-sql/06-select.md              | 4 +++-
 docs/zh/12-taos-sql/06-select.md              | 4 +++-
 include/common/ttokendef.h                    | 1 +
 include/libs/nodes/querynodes.h               | 1 +
 source/libs/parser/src/parAstCreater.c        | 24 +++++++++++++++++++
 source/libs/parser/src/parTokenizer.c         | 1 +
 source/libs/planner/inc/planInt.h             | 1 +
 source/libs/planner/src/planOptimizer.c       | 3 ++-
 source/libs/planner/src/planUtil.c            | 13 ++++++++++
 tests/system-test/2-query/partition_by_col.py | 10 ++++++++
 10 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md
index 2c94787440..1fc6fb7e67 100755
--- a/docs/en/12-taos-sql/06-select.md
+++ b/docs/en/12-taos-sql/06-select.md
@@ -91,13 +91,15 @@ The list of currently supported Hints is as follows:
 | :-----------: | -------------- | -------------------------- | -----------------------------------|
 | BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
 | NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
-| SORT_FOR_GROUP| None | Use sort for partition | With normal column in partition by list |
+| SORT_FOR_GROUP| None | Use sort for partition; conflicts with PARTITION_FIRST | With normal column in partition by list |
+| PARTITION_FIRST| None | Use partition before aggregate; conflicts with SORT_FOR_GROUP | With normal column in partition by list |
 
 For example:
 
 ```sql
 SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
 SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
+SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
 ```
 
 ## Lists
diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md
index 3c6e4f3bcf..2a7dff6f7d 100755
--- a/docs/zh/12-taos-sql/06-select.md
+++ b/docs/zh/12-taos-sql/06-select.md
@@ -91,13 +91,15 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
 | :-----------: | -------------- | -------------------------- | -----------------------------|
 | BATCH_SCAN | 无 | 采用批量读表的方式 | 超级表 JOIN 语句 |
 | NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句
| -| SORT_FOR_GROUP| 无 | 采用sort方式进行分组 | partition by 列表有普通列时 | +| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 | +| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 | 举例: ```sql SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; +SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; ``` ## 列表 diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index bdee3934fe..445fe0737b 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -373,6 +373,7 @@ #define TK_BATCH_SCAN 606 #define TK_NO_BATCH_SCAN 607 #define TK_SORT_FOR_GROUP 608 +#define TK_PARTITION_FIRST 609 #define TK_NK_NIL 65535 diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index fdf598153f..331679a3d3 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -126,6 +126,7 @@ typedef enum EHintOption { HINT_NO_BATCH_SCAN = 1, HINT_BATCH_SCAN, HINT_SORT_FOR_GROUP, + HINT_PARTITION_FIRST, } EHintOption; typedef struct SHintNode { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index eb866c99aa..8e89ae1f53 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -371,6 +371,18 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* return (SNode*)val; } +static bool hasHint(SNodeList* pHintList, EHintOption hint) { + if (!pHintList) return false; + SNode* pNode; + FOREACH(pNode, pHintList) { + SHintNode* pHint = (SHintNode*)pNode; + if (pHint->option == hint) { + return true; + } + } + return false; +} + bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOption opt, SToken* paramList, int32_t paramNum) { void* value = NULL; @@ -384,6 +396,10 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt } case HINT_SORT_FOR_GROUP: if (paramNum > 0) return true; + if (hasHint(*ppHintList, HINT_PARTITION_FIRST)) return true; + break; + case HINT_PARTITION_FIRST: + if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true; break; default: return true; @@ -455,6 +471,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) { } opt = HINT_SORT_FOR_GROUP; break; + case TK_PARTITION_FIRST: + lastComma = false; + if (0 != opt || inParamList) { + quit = true; + break; + } + opt = HINT_PARTITION_FIRST; + break; case TK_NK_LP: lastComma = false; if (0 == opt || inParamList) { diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 03a5317cd3..072892fe7f 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -170,6 +170,7 @@ static SKeyword keywordTable[] = { {"PAGES", TK_PAGES}, {"PAGESIZE", TK_PAGESIZE}, {"PARTITION", TK_PARTITION}, + {"PARTITION_FIRST", TK_PARTITION_FIRST}, {"PASS", TK_PASS}, {"PORT", TK_PORT}, {"PPS", TK_PPS}, diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index e2a4ded5a9..d4074e1373 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -47,6 +47,7 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan); bool getBatchScanOptionFromHint(SNodeList* pList); bool getSortForGroupOptHint(SNodeList* pList); +bool getOptHint(SNodeList* pList, EHintOption hint); 
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr); int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); bool isPartTableAgg(SAggLogicNode* pAgg); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 035377d22e..730dcd9352 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3997,7 +3997,8 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub } } return code; - } else if (pNode->node.pParent && nodeType(pNode->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) { + } else if (pNode->node.pParent && nodeType(pNode->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG && + !getOptHint(pRootNode->pHint, HINT_PARTITION_FIRST)) { // Check if we can delete partition node SAggLogicNode* pAgg = (SAggLogicNode*)pNode->node.pParent; FOREACH(node, pNode->pPartitionKeys) { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 2da270e42d..aeef3f2487 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -432,6 +432,7 @@ bool getBatchScanOptionFromHint(SNodeList* pList) { } bool getSortForGroupOptHint(SNodeList* pList) { + if (!pList) return false; SNode* pNode; FOREACH(pNode, pList) { SHintNode* pHint = (SHintNode*)pNode; @@ -442,6 +443,18 @@ bool getSortForGroupOptHint(SNodeList* pList) { return false; } +bool getOptHint(SNodeList* pList, EHintOption hint) { + if (!pList) return false; + SNode* pNode; + FOREACH(pNode, pList) { + SHintNode* pHint = (SHintNode*)pNode; + if (pHint->option == hint) { + return true; + } + } + return false; +} + int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SLogicNode* pCurr = (SLogicNode*)pNode; diff --git a/tests/system-test/2-query/partition_by_col.py b/tests/system-test/2-query/partition_by_col.py index feb8693e3e..549e2738be 100644 --- a/tests/system-test/2-query/partition_by_col.py +++ b/tests/system-test/2-query/partition_by_col.py @@ -169,6 +169,16 @@ class TDTestCase: self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + sql = 'select count(*), c1 from meters partition by c1' + sql_hint = 'select /*+ sort_for_group() partition_first()*/ count(*), c1 from meters partition by c1' + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + sql_hint = 'select /*+ partition_first()*/ count(*), c1 from meters partition by c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql_hint)) + sql_hint = 'select /*+ partition_first() sort_for_group()*/ count(*), c1 from meters partition by c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql_hint)) + sql_hint = 'select /*+ sort_for_group() partition_first()*/ count(*), c1 from meters partition by c1' + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str: return "select %s from (%s)t order by %s" % (select_list, sql, order_by) From 00e2bdec23838ae8b3eeb8e0dd537fa5c3e849e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jan 2024 19:08:46 +0800 Subject: [PATCH 03/11] refactor: do some internal refactor. 
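Rename streamMetaUpdateTaskDownstreamStatus to streamMetaAddTaskLaunchResult: the
function records the outcome of one launch attempt rather than updating any
downstream state, so the old name was misleading. SLaunchHTaskInfo now also
carries the fill-history task id, so the launch timer can record a failure for
the related fill-history task even after the owner task has been stopped or
dropped. The renamed entry point, as declared in tstream.h below (only the
name changes):

```c
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready);
```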
--- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +- source/libs/stream/src/streamMeta.c | 14 +- source/libs/stream/src/streamStart.c | 166 ++++++++++++++------- 4 files changed, 120 insertions(+), 70 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a115783b70..04694b05fd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -872,7 +872,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7883c858f0..5a92677462 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -526,7 +526,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe streamMetaRUnLock(pMeta); if (hasHistoryTask) { - streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, @@ -539,7 +539,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; } - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); return code; } @@ -553,13 +553,13 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe streamMetaRUnLock(pMeta); if (hasHistoryTask) { - streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } } else { streamMetaRUnLock(pMeta); } - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 68348f462f..ee2b70eebe 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1466,7 +1466,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); continue; } @@ -1487,7 +1487,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - 
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1497,10 +1497,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); } } @@ -1519,7 +1519,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1538,10 +1538,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 24909a9776..27ed6af402 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -26,6 +26,7 @@ typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; STaskId id; + STaskId hTaskId; } SLaunchHTaskInfo; typedef struct STaskRecheckInfo { @@ -43,7 +44,8 @@ typedef struct STaskInitTs { static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, + int32_t hTaskId); static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); @@ -394,7 +396,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int64_t initTs = pTask->execInfo.init; int64_t startTs = pTask->execInfo.start; - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, 
true); // start the related fill-history task, when current task is ready // not invoke in success callback due to the deadlock. @@ -492,12 +494,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t startTs = pTask->execInfo.init; int64_t now = taosGetTimestampMs(); - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); + streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -731,21 +733,27 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; + int64_t now = taosGetTimestampMs(); streamMetaWLock(pMeta); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); - if (ppTask) { + if (ppTask != NULL) { ASSERT((*ppTask)->status.timerActive >= 1); if (streamTaskShouldStop(*ppTask)) { char* p = streamTaskGetStatus(*ppTask)->name; + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); + streamMetaWUnLock(pMeta); + + // record the related fill-history task failed + STaskId* pHTaskId = &(*ppTask)->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pMeta, pHTaskId->streamId, pHTaskId->taskId, 0, now, false); taosMemoryFree(pInfo); - streamMetaWUnLock(pMeta); return; } } @@ -764,8 +772,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - streamMetaReleaseTask(pMeta, pTask); + streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false); + streamMetaReleaseTask(pMeta, pTask); stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); @@ -798,91 +807,132 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, pHTaskInfo->retryTimes, ref); + streamMetaReleaseTask(pMeta, pTask); } } else { - stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task", + streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:0x%x failed to load fill-history task, it may have been destroyed, not launch fill-history task", (int32_t)pInfo->id.taskId); } taosMemoryFree(pInfo); } -SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +SLaunchHTaskInfo* 
createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, + int32_t hTaskId) { SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pInfo->id.taskId = taskId; pInfo->id.streamId = streamId; + pInfo->id.taskId = taskId; + + pInfo->hTaskId.streamId = hStreamId; + pInfo->hTaskId.taskId = hTaskId; + pInfo->pMeta = pMeta; return pInfo; } -// an fill history task needs to be started. -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - - int64_t streamId = pTask->hTaskInfo.id.streamId; - int32_t hTaskId = pTask->hTaskInfo.id.taskId; +static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; ASSERT(hTaskId != 0); - SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY) { - STaskExecStatisInfo* pInfo = &pTask->execInfo; - stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", pTask->id.idStr, - pTask->hTaskInfo.id.streamId, hTaskId, pStatus->name); + stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, hTaskId, pInfo->init, pInfo->start, false); - return -1;// todo set the correct error code - } else { - stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->hTaskInfo.id.streamId, hTaskId); + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); + if (pInfo == NULL) { + stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + return terrno; } - // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (pHTask == NULL) { - stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId, - hTaskId); + // set the launch time info + streamTaskInitForLaunchHTask(&pTask->hTaskInfo); - SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); - if (pInfo == NULL) { - stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr); + // check for the timer + if (pTask->hTaskInfo.pTimer == NULL) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); + + if (pTask->hTaskInfo.pTimer == NULL) { + ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, + pTask->status.timerActive); + + taosMemoryFree(pInfo); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); return terrno; } - streamTaskInitForLaunchHTask(&pTask->hTaskInfo); - if (pTask->hTaskInfo.pTimer == NULL) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 
1); - pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); - if (pTask->hTaskInfo.pTimer == NULL) { - atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, - pTask->status.timerActive); - taosMemoryFree(pInfo); - } else { - ASSERT(ref >= 1); - stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); + ASSERT(ref >= 1); + + stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref); + } else { // timer exists + ASSERT(pTask->status.timerActive >= 1); + stDebug("s-task:%s set timer active flag, task timer not null", idStr); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); + } + + return TSDB_CODE_SUCCESS; +} + +// an fill history task needs to be started. +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; + ASSERT(hTaskId != 0); + + // check stream task status in the first place. + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__READY) { + stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, + pStatus->name); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + return -1; // todo set the correct error code + } + + stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); + + // Set the execute conditions, including the query time window and the version range + streamMetaRLock(pMeta); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + streamMetaRUnLock(pMeta); + + if (pHTask != NULL) { + SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); + if (pHisTask == NULL) { + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped", idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + } else { + if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing + stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true); + } else { // exist, but not ready, continue check downstream task status + checkFillhistoryTaskStatus(pTask, pHisTask); } - } else { // timer exists - ASSERT(pTask->status.timerActive >= 1); - stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); + + streamMetaReleaseTask(pMeta, pHisTask); } return TSDB_CODE_SUCCESS; } - if ((*pHTask)->status.downstreamReady == 1) { - stDebug("s-task:%s fill-history task is ready, no need to check downstream", (*pHTask)->id.idStr); - } else { - checkFillhistoryTaskStatus(pTask, *pHTask); - } - - return TSDB_CODE_SUCCESS; + return handleNotBuiltFillHistoryTask(pTask); } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { @@ -1114,7 +1164,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* 
pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; From 44208925f9deb2651f0aec2828561522f5c52cab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Jan 2024 00:10:47 +0800 Subject: [PATCH 04/11] refactor: do some internal refactor. --- source/libs/stream/src/streamStart.c | 111 +++++++++++++++++---------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 27ed6af402..c8879e59c7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -730,6 +730,42 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST); } +static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { + SStreamMeta* pMeta = pTask->pMeta; + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); + + pHTaskInfo->id.taskId = 0; + pHTaskInfo->id.streamId = 0; +} + +static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { + SStreamMeta* pMeta = pTask->pMeta; + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + + if (streamTaskShouldStop(pTask)) { // record the failure + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64 ", ref:%d", pInfo->id.taskId, + pInfo->hTaskId.taskId, ref); + + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + taosMemoryFree(pInfo); + } else { + char* p = streamTaskGetStatus(pTask)->name; + int32_t hTaskId = pHTaskInfo->id.taskId; + + stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", + pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); + + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); + } +} + static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; @@ -738,28 +774,35 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { streamMetaWLock(pMeta); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); - if (ppTask != NULL) { + if (ppTask == NULL || *ppTask == NULL) { + stError("s-task:0x%x and rel fill-history task:0x%" PRIx64 " all have been destroyed, not launch", + (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId); + streamMetaWUnLock(pMeta); + + // already dropped, no need to set the failure info into the stream task meta. 
+ taosMemoryFree(pInfo); + return; + } + + if (streamTaskShouldStop(*ppTask)) { ASSERT((*ppTask)->status.timerActive >= 1); - if (streamTaskShouldStop(*ppTask)) { - char* p = streamTaskGetStatus(*ppTask)->name; + char* p = streamTaskGetStatus(*ppTask)->name; + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); + stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", + (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); - int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); - stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", - (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); - streamMetaWUnLock(pMeta); + streamMetaWUnLock(pMeta); - // record the related fill-history task failed - STaskId* pHTaskId = &(*ppTask)->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pHTaskId->streamId, pHTaskId->taskId, 0, now, false); - - taosMemoryFree(pInfo); - return; - } + // record the related fill-history task failed + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + taosMemoryFree(pInfo); + return; } + + SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId); streamMetaWUnLock(pMeta); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; @@ -771,34 +814,18 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false); - - streamMetaReleaseTask(pMeta, pTask); - stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", - pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); - - pHTaskInfo->id.taskId = 0; - pHTaskInfo->id.streamId = 0; + noRetryLaunchFillHistoryTask(pTask, pInfo, now); } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. 
streamTaskSetRetryInfoForLaunch(pHTaskInfo); ASSERT(pTask->status.timerActive >= 1); // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(pTask))) { - char* p = streamTaskGetStatus(pTask)->name; - int32_t hTaskId = pHTaskInfo->id.taskId; - - stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", - pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); + if (pHTask == NULL) { + doRetryLaunchFillHistoryTask(pTask, pInfo, now); streamMetaReleaseTask(pMeta, pTask); return; - } - - if (pHTask != NULL) { + } else { checkFillhistoryTaskStatus(pTask, pHTask); streamMetaReleaseTask(pMeta, pHTask); } @@ -807,15 +834,15 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, pHTaskInfo->retryTimes, ref); - - streamMetaReleaseTask(pMeta, pTask); } - } else { - streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:0x%x failed to load fill-history task, it may have been destroyed, not launch fill-history task", - (int32_t)pInfo->id.taskId); + streamMetaReleaseTask(pMeta, pTask); + } else { + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); + stError("s-task:0x%x rel fill-history task:0x%" PRIx64 " may have been destroyed, not launch, ref:%d", + (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId, ref); } taosMemoryFree(pInfo); From 98758862cf42d403b9f8bcc29729fc594d9acbcf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Jan 2024 00:17:29 +0800 Subject: [PATCH 05/11] refactor: do some internal refactor. 
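Rename handleNotBuiltFillHistoryTask to launchNotBuiltFillHistoryTask so the
name matches what the helper does, and log the reference count that was
actually fetched: the old code printed the live counter, which another thread
may change between the atomic decrement and the log call. The corrected
pattern, as used in the hunk below:

```c
// log the value returned by the atomic op itself, not a later re-read
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
```

The tail of streamLaunchFillHistoryTask also becomes an explicit if/else, so
the already-built and not-yet-built paths read symmetrically.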
--- source/libs/stream/src/streamStart.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index c8879e59c7..53c40e5b5c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -866,7 +866,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, in return pInfo; } -static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) { +static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; STaskExecStatisInfo* pExecInfo = &pTask->execInfo; const char* idStr = pTask->id.idStr; @@ -894,8 +894,7 @@ static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) { if (pTask->hTaskInfo.pTimer == NULL) { ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, - pTask->status.timerActive); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); taosMemoryFree(pInfo); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); @@ -940,10 +939,10 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); streamMetaRUnLock(pMeta); - if (pHTask != NULL) { + if (pHTask != NULL) { // it is already added into stream meta store. SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); if (pHisTask == NULL) { - stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped", idStr); + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); } else { if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing @@ -957,9 +956,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } return TSDB_CODE_SUCCESS; + } else { + return launchNotBuiltFillHistoryTask(pTask); } - - return handleNotBuiltFillHistoryTask(pTask); } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { From dc1a0821a7c06495e1dc91db5f1b095c8623cad3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Jan 2024 00:19:46 +0800 Subject: [PATCH 06/11] refactor: do some internal refactor. 
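Scope the timer-reference release in tryLaunchHistoryTask to the branch that
actually completes the launch. The retry branch returns early and keeps its
timer reference, and noRetryLaunchFillHistoryTask already balances the
reference itself, so the decrement now sits next to the completed-launch log:

```c
      } else {
        checkFillhistoryTaskStatus(pTask, pHTask);
        streamMetaReleaseTask(pMeta, pHTask);

        // not in timer anymore: balance the reference taken when the timer was armed
        int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
        stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
                pHTaskInfo->retryTimes, ref);
      }
```

No behavior change: this only makes the ownership of the timerActive reference
explicit in the control flow.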
---
 source/libs/stream/src/streamStart.c | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c
index 53c40e5b5c..84f1dbb4d7 100644
--- a/source/libs/stream/src/streamStart.c
+++ b/source/libs/stream/src/streamStart.c
@@ -828,12 +828,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
       } else {
         checkFillhistoryTaskStatus(pTask, pHTask);
         streamMetaReleaseTask(pMeta, pHTask);
-      }
 
-      // not in timer anymore
-      int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
-      stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
-              pHTaskInfo->retryTimes, ref);
+
+        // not in timer anymore
+        int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+        stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
+                pHTaskInfo->retryTimes, ref);
+      }
     }
 
     streamMetaReleaseTask(pMeta, pTask);

From 8a2760e4d00c81adbaf7e42a187d2bc9a6fe713a Mon Sep 17 00:00:00 2001
From: Alex Duan <417921451@qq.com>
Date: Sun, 14 Jan 2024 15:39:10 +0800
Subject: [PATCH 07/11] feat: s3 supports stream check

---
 tests/army/enterprise/s3/s3_basic.py | 30 +++++++++++++++++++++++++++-
 tests/army/frame/caseBase.py         |  3 +++
 tests/army/frame/sql.py              |  5 +++++
 3 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py
index f22d482502..39ab64e85d 100644
--- a/tests/army/enterprise/s3/s3_basic.py
+++ b/tests/army/enterprise/s3/s3_basic.py
@@ -57,13 +57,18 @@ class TDTestCase(TBase):
         etool.benchMark(json=json)
 
         tdSql.execute(f"use {self.db}")
-        # set insert data information
+        # these values come from s3_basic.json
         self.childtable_count = 4
         self.insert_rows = 1000000
         self.timestamp_step = 1000
 
+    def createStream(self, sname):
+        sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
+        tdSql.execute(sql)
+
     def doAction(self):
         tdLog.info(f"do action.")
+
         self.flushDb()
         self.compactDb()
 
@@ -80,16 +85,33 @@ class TDTestCase(TBase):
             time.sleep(5)
             self.trimDb(True)
             loop += 1
+            tdLog.info(f"loop={loop} wait 5s...")
+
+    def checkStreamCorrect(self):
+        sql = f"select count(*) from {self.db}.stm1"
+        count = 0
+        for i in range(30):
+            tdSql.query(sql)
+            count = tdSql.getData(0, 0)
+            if count == 100000 or count == 100001:
+                return True
+            time.sleep(1)
+
+        tdLog.exit(f"stream count is not expected, expect=100000 or 100001, real={count}, 
sql={sql}") # run def run(self): tdLog.debug(f"start to excute {__file__}") + self.sname = "stream1" if eos.isArm64Cpu(): tdLog.success(f"{__file__} arm64 ignore executed") else: # insert data self.insertData() + # creat stream + self.createStream(self.sname) + # check insert data correct self.checkInsertCorrect() @@ -105,6 +127,12 @@ class TDTestCase(TBase): # check insert correct again self.checkInsertCorrect() + # check stream correct and drop stream + self.checkStreamCorrect() + + # drop stream + self.dropStream(self.sname) + # drop database and free s3 file self.dropDb() diff --git a/tests/army/frame/caseBase.py b/tests/army/frame/caseBase.py index c9f3aa1880..ec6b36aa1b 100644 --- a/tests/army/frame/caseBase.py +++ b/tests/army/frame/caseBase.py @@ -72,6 +72,9 @@ class TBase: def dropDb(self, show = False): tdSql.execute(f"drop database {self.db}", show = show) + def dropStream(self, sname, show = False): + tdSql.execute(f"drop stream {sname}", show = show) + def splitVGroups(self): vgids = self.getVGroup(self.db) selid = random.choice(vgids) diff --git a/tests/army/frame/sql.py b/tests/army/frame/sql.py index 2e14f0c2f0..eafae9be2d 100644 --- a/tests/army/frame/sql.py +++ b/tests/army/frame/sql.py @@ -482,6 +482,11 @@ class TDSql: time.sleep(1) pass + # execute many sql + def executes(self, sqls, queryTimes=30, show=False): + for sql in sqls: + self.execute(sql, queryTimes, show) + def checkAffectedRows(self, expectAffectedRows): if self.affectedRows != expectAffectedRows: caller = inspect.getframeinfo(inspect.stack()[1][0]) From 3573eddd8709e50998a5b260ab612a59fec410b1 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sun, 14 Jan 2024 16:42:59 +0800 Subject: [PATCH 08/11] fix: change delay 120s to wait stream result --- tests/army/enterprise/s3/s3_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py index 39ab64e85d..976ad85747 100644 --- a/tests/army/enterprise/s3/s3_basic.py +++ b/tests/army/enterprise/s3/s3_basic.py @@ -90,7 +90,7 @@ class TDTestCase(TBase): def checkStreamCorrect(self): sql = f"select count(*) from {self.db}.stm1" count = 0 - for i in range(30): + for i in range(120): tdSql.query(sql) count = tdSql.getData(0, 0) if count == 100000 or count == 100001: From ce855bc493dbf27fd07ffaf5a20da5f24d2ad0f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jan 2024 09:21:31 +0800 Subject: [PATCH 09/11] refactor: do some internal refactor. 
--- source/libs/stream/src/streamCheckpoint.c | 47 +++++++++++------------ 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1c9361fa61..fb9d111124 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -294,11 +294,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { int32_t code = 0; SCheckpointInfo* pCKInfo = &p->chkInfo; - if (p->info.fillHistory == 1) { - return code; - } - - if (p->info.taskLevel > TASK_LEVEL__SINK) { + // fill-history task, rsma task, and sink task will not generate the checkpoint + if ((p->info.fillHistory == 1) || (p->info.taskLevel >= TASK_LEVEL__SINK)) { return code; } @@ -333,7 +330,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); // save the task if not sink task - if (p->info.taskLevel != TASK_LEVEL__SINK) { + if (p->info.taskLevel < TASK_LEVEL__SINK) { streamMetaWLock(pMeta); code = streamMetaSaveTask(pMeta, p); @@ -451,16 +448,17 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; + int32_t code = TSDB_CODE_SUCCESS; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; + const char* id = pTask->id.idStr; // sink task do not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel); code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno)); + stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); } } @@ -474,39 +472,38 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, - ckId, tstrerror(code)); + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId, + tstrerror(code)); } } // clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, - tstrerror(code)); - } else { - code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr); - if (code != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId); + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskUploadChkp(pTask, ckId, (char*)id); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } + } else { + stError("s-task:%s commit taskInfo failed, 
checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); } } - if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed + // clear the checkpoint info if failed + if (code != TSDB_CODE_SUCCESS) { taosThreadMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, false); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); streamTaskSetCheckpointFailedId(pTask); - stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr, - ckId); + stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } double el = (taosGetTimestampMs() - startTs) / 1000.0; - stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", - pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, + stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id, + pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, (code == TSDB_CODE_SUCCESS) ? "succ" : "failed"); return code; From e058653e1d1b1820eaedeec254699c4502a61a6f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jan 2024 09:45:04 +0800 Subject: [PATCH 10/11] fix(stream): adjust test cases. --- tests/system-test/8-stream/at_once_session.py | 4 ++++ tests/system-test/8-stream/max_delay_interval_ext.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/tests/system-test/8-stream/at_once_session.py b/tests/system-test/8-stream/at_once_session.py index 9a253a187f..3462561c4e 100644 --- a/tests/system-test/8-stream/at_once_session.py +++ b/tests/system-test/8-stream/at_once_session.py @@ -59,10 +59,14 @@ class TDTestCase: ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + time.sleep(1) # create stb/ctb/tb stream self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} partition by {partition} {partition_elm_alias} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="at_once", ignore_expired=ignore_expired, ignore_update=ignore_update, subtable_value=ctb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="at_once", ignore_expired=ignore_expired, ignore_update=ignore_update, subtable_value=tb_subtable_value, fill_history_value=fill_history_value) + + time.sleep(1) + for i in range(self.tdCom.range_count): ctb_name = self.tdCom.get_long_name() self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name) diff --git a/tests/system-test/8-stream/max_delay_interval_ext.py b/tests/system-test/8-stream/max_delay_interval_ext.py index 
653fcd997c..6536309a25 100644 --- a/tests/system-test/8-stream/max_delay_interval_ext.py +++ b/tests/system-test/8-stream/max_delay_interval_ext.py @@ -50,6 +50,8 @@ class TDTestCase: # create stb/ctb/tb stream self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb) + time.sleep(1) + init_num = 0 for i in range(self.tdCom.range_count): if i == 0: From d46b5e6c9a6be8676eeb48bb92c7abc72673869b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jan 2024 09:53:15 +0800 Subject: [PATCH 11/11] fix(stream): enable sink tasks handle the checkpoint. --- source/libs/stream/src/streamCheckpoint.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fb9d111124..cb3f7a3504 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -295,7 +295,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { SCheckpointInfo* pCKInfo = &p->chkInfo; // fill-history task, rsma task, and sink task will not generate the checkpoint - if ((p->info.fillHistory == 1) || (p->info.taskLevel >= TASK_LEVEL__SINK)) { + if ((p->info.fillHistory == 1) || (p->info.taskLevel > TASK_LEVEL__SINK)) { return code; }
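Net effect of patches 09 and 11 on the checkpoint guard in
streamSaveTaskCheckpointInfo (a before/after sketch quoting the hunks above;
only the comparison operator differs):

```c
// patch 09: sink tasks (taskLevel == TASK_LEVEL__SINK) still skipped checkpoint generation
if ((p->info.fillHistory == 1) || (p->info.taskLevel >= TASK_LEVEL__SINK)) return code;

// patch 11: only levels strictly above SINK are skipped, so sink tasks now
// persist their checkpoint info as well
if ((p->info.fillHistory == 1) || (p->info.taskLevel > TASK_LEVEL__SINK)) return code;
```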