From 2806fe1c563b13a554147b9c6b21f219a1892fde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 10:58:24 +0800 Subject: [PATCH 1/7] fix(stream): ignore the related stream task destroy msg in transfer state. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamDispatch.c | 6 ++--- source/libs/stream/src/streamExec.c | 32 ++++++++++++++----------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9dfde0fed7..4b666ec54a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1527,7 +1527,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; + return TSDB_CODE_SUCCESS; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); return TSDB_CODE_INVALID_MSG; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 94e005b790..06861454d1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -758,10 +758,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - return code; + if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens +// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } + return TSDB_CODE_SUCCESS; } pTask->msgInfo.retryCount = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3b954793de..a3ff752bc5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -292,9 +292,20 @@ int32_t 
streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo: destroy the fill-history task here - qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, - pTask->streamTaskId.taskId); + qError( + "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " + "fill-history task", + pTask->id.idStr, pTask->streamTaskId.taskId); + + // 1. free it and remove fill-history task from disk meta-store + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + + // 2. save to disk + taosWLockLatch(&pMeta->lock); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pMeta->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, @@ -334,9 +345,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } - // todo check the output queue for fill-history task, and wait for it complete - - // 1. expand the query time window for stream task of WAL scanner pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); @@ -390,15 +398,10 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { streamTaskFillHistoryFinished(pTask); + } + + if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return code; - } - } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. 
- code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return code; - } } return code; @@ -522,6 +525,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); } + // agg task should dispatch trans-state msg to sink task, to flush all data to sink task. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { pBlock->srcVgId = pTask->pMeta->vgId; code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); From e8c9a019a4938671fbc0a77fe4e523139c987dd8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 11:41:24 +0800 Subject: [PATCH 2/7] fix(stream): remove the invalid set of scheduler status. --- include/libs/stream/tstream.h | 1 - source/libs/stream/src/streamExec.c | 10 ---------- source/libs/stream/src/streamRecover.c | 2 -- 3 files changed, 13 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d3b670d0ec..02bb65b762 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -593,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); -int32_t streamTaskEndScanWAL(SStreamTask* pTask); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a3ff752bc5..37c5808e02 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -646,16 +646,6 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { pTask->status.taskStatus == TASK_STATUS__DROPPING); } -int32_t streamTaskEndScanWAL(SStreamTask* pTask) { - const char* id 
= pTask->id.idStr; - double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - - // 1. notify all downstream tasks to transfer executor state after handle all history blocks. - appendTranstateIntoInputQ(pTask); - return TSDB_CODE_SUCCESS; -} - int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 72dae735e1..42ff9b9b4e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -400,9 +400,7 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { } pTask->status.appendTranstateBlock = true; - qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); - pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; streamSchedExec(pTask); return TSDB_CODE_SUCCESS; From 5ec6b64aab0f37930dcb9c4c45b1ece6663f664a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 13:12:41 +0800 Subject: [PATCH 3/7] fix(stream): add logs. 
--- source/dnode/vnode/src/tq/tqRead.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9b8f1781cb..a875febd09 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -339,8 +339,12 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); - extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); - tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); + code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); + } else { + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); + } } else { ASSERT(0); } From 91710b0c0f243e8c8f00a65b637e231d36801f33 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 14:56:17 +0800 Subject: [PATCH 4/7] fix(stream): --- source/dnode/vnode/src/tq/tq.c | 2 ++ source/libs/stream/src/streamRecover.c | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 64715122f2..ddd0c49649 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1274,6 +1274,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->tsInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); appendTranstateIntoInputQ(pTask); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + streamSchedExec(pTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" 
PRId64 "-%" PRId64 diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 42ff9b9b4e..c3d4d4c7ae 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -401,8 +401,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { pTask->status.appendTranstateBlock = true; qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); - streamSchedExec(pTask); - return TSDB_CODE_SUCCESS; } From a1e554fbf32b68496f63a5a9c9a0d5fe41d25dfd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 15:05:13 +0800 Subject: [PATCH 5/7] refactor: exec directly, not asynchronously. --- source/dnode/vnode/src/tq/tq.c | 3 +-- source/libs/stream/src/streamExec.c | 36 ++++++----------------------- 2 files changed, 8 insertions(+), 31 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ddd0c49649..815e9647b5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1274,8 +1274,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->tsInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); appendTranstateIntoInputQ(pTask); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - streamSchedExec(pTask); + streamTryExec(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 37c5808e02..ccfa331661 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -661,36 +661,14 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here -// if (taosQueueEmpty(pTask->inputQueue->queue)) { - // 
fill-history WAL scan has completed -// if (pTask->status.transferState) { -// code = streamTransferStateToStreamTask(pTask); -// if (code != TSDB_CODE_SUCCESS) { -// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); -// return code; -// } + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); - // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by - // call this function (streamExecForAll) directly. - // code = streamExecForAll(pTask); - // if (code < 0) { - // do nothing - // } -// } - -// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); -// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, -// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); -// } else { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); - - if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || - streamTaskShouldPause(&pTask->status))) { - streamSchedExec(pTask); - } -// } + if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || + streamTaskShouldPause(&pTask->status))) { + streamSchedExec(pTask); + } } else { qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); From 2e6263b43f5102f108fd38ce3a14ab590d3580d8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 17:16:11 +0800 Subject: [PATCH 6/7] fix(stream): fix memory leak. 
--- source/dnode/vnode/src/tq/tqRestore.c | 5 +++-- source/libs/stream/src/streamDispatch.c | 4 ++-- source/libs/stream/src/streamExec.c | 17 +++++++++++------ source/libs/stream/src/streamRecover.c | 1 - 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3054179416..d363031db1 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -211,12 +211,13 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; + int64_t maxVer = pTask->dataRange.range.maxVer; if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if (!pTask->status.appendTranstateBlock) { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", - id, ver, pTask->dataRange.range.maxVer); + id, ver, maxVer); double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); @@ -224,7 +225,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { /*int32_t code = */streamSchedExec(pTask); } else { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", - id, ver, pTask->dataRange.range.maxVer); + id, ver, maxVer); } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 06861454d1..557b92baf9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -757,10 +757,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); 
ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens -// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } + + streamFreeQitem(pTask->msgInfo.pData); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ccfa331661..cebae0801d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -534,16 +534,21 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } else { streamFreeQitem((SStreamQueueItem*)pBlock); } + } else { // level == TASK_LEVEL__SINK + streamFreeQitem((SStreamQueueItem*)pBlock); } } else { // non-dispatch task, do task state transfer directly - qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); - streamFreeQitem((SStreamQueueItem*)pBlock); - ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + if (level != TASK_LEVEL__SINK) { + qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + } + } else { + qDebug("s-task:%d sink task does not transfer state", id); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index c3d4d4c7ae..0a1a15259c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -400,7 +400,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { } pTask->status.appendTranstateBlock = true; - qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, 
pTask->status.schedStatus); return TSDB_CODE_SUCCESS; } From 5d3232d275f97ecf63205dc04c211b0887f5e39e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 18:14:32 +0800 Subject: [PATCH 7/7] fix(stream): fix a syntax error. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cebae0801d..b513d7c13e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -548,7 +548,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } } else { - qDebug("s-task:%d sink task does not transfer state", id); + qDebug("s-task:%s sink task does not transfer state", id); } }