From 4ea737571dd73e78808473f76eb6d2b8ad411ce6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 24 Sep 2023 01:26:51 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 38 ++++---- source/dnode/vnode/src/tq/tq.c | 18 ++-- source/dnode/vnode/src/tq/tqSink.c | 14 +-- source/dnode/vnode/src/tq/tqStreamTask.c | 6 +- source/libs/stream/inc/streamInt.h | 12 ++- source/libs/stream/src/streamCheckpoint.c | 15 ++- source/libs/stream/src/streamDispatch.c | 58 +++++------ source/libs/stream/src/streamExec.c | 4 +- source/libs/stream/src/streamRecover.c | 113 +++++++++++++++------- source/libs/stream/src/streamTask.c | 58 +++++++---- 10 files changed, 205 insertions(+), 131 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f1c6e369cb..20cf7bb110 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -209,7 +209,7 @@ typedef struct { int32_t taskId; int32_t nodeId; SEpSet epSet; -} STaskDispatcherFixedEp; +} STaskDispatcherFixed; typedef struct { char stbFullName[TSDB_TABLE_FNAME_LEN]; @@ -298,7 +298,7 @@ typedef struct SDispatchMsgInfo { int8_t dispatchMsgType; int16_t msgType; // dispatch msg type int32_t retryCount; // retry send data count - int64_t startTs; // output blocking timestamp + int64_t startTs; // dispatch start time, record total elapsed time for dispatch SArray* pRetryList; // current dispatch successfully completed node of downstream } SDispatchMsgInfo; @@ -318,24 +318,27 @@ typedef struct STaskSchedInfo { void* pTimer; } STaskSchedInfo; -typedef struct SSinkTaskRecorder { +typedef struct SSinkRecorder { int64_t numOfSubmit; int64_t numOfBlocks; int64_t numOfRows; int64_t bytes; -} SSinkTaskRecorder; +} SSinkRecorder; -typedef struct { - int64_t created; - int64_t init; - int64_t step1Start; - int64_t step2Start; - int64_t start; - int32_t updateCount; - int32_t dispatchCount; - int64_t latestUpdateTs; +typedef struct STaskExecStatisInfo { + int64_t created; + int64_t init; + int64_t step1Start; + int64_t step2Start; + int64_t start; + int32_t updateCount; + int32_t dispatch; + int64_t latestUpdateTs; + int32_t checkpoint; + SSinkRecorder sink; } STaskExecStatisInfo; +typedef struct STaskTimer STaskTimer; typedef struct STokenBucket STokenBucket; typedef struct SMetaHbInfo SMetaHbInfo; @@ -353,23 +356,22 @@ struct SStreamTask { SDataRange dataRange; STaskId historyTaskId; STaskId streamTaskId; - STaskExecStatisInfo taskExecInfo; + STaskExecStatisInfo execInfo; SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ SArray* pUpstreamInfoList; // output union { - STaskDispatcherFixedEp fixedEpDispatcher; + STaskDispatcherFixed fixedDispatcher; STaskDispatcherShuffle shuffleDispatcher; STaskSinkTb tbSink; STaskSinkSma smaSink; STaskSinkFetch fetchSink; }; - SSinkTaskRecorder sinkRecorder; - STokenBucket* pTokenBucket; - void* launchTaskTimer; + STokenBucket* pTokenBucket; + STaskTimer* pTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend SArray* pRspMsgList; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f38800ef78..60f43950ce 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1003,8 +1003,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored) { - p->taskExecInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->taskExecInfo.init); + p->execInfo.init = taosGetTimestampMs(); + tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->execInfo.init); streamTaskCheckDownstream(p); } else if (!restored) { @@ -1042,14 +1042,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); - if (pTask->taskExecInfo.step1Start == 0) { + if (pTask->execInfo.step1Start == 0) { ASSERT(pTask->status.pauseAllowed == false); - pTask->taskExecInfo.step1Start = taosGetTimestampMs(); + pTask->execInfo.step1Start = taosGetTimestampMs(); if (pTask->info.fillHistory == 1) { streamTaskEnablePause(pTask); } } else { - tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->taskExecInfo.step1Start); + tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->execInfo.step1Start); } // we have to continue retrying to successfully execute the scan history task. @@ -1069,7 +1069,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; int8_t status = streamTaskSetSchedStatusInActive(pTask); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); streamMetaReleaseTask(pMeta, pTask); @@ -1077,7 +1077,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // the following procedure should be executed, no matter status is stop/pause or not - double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { @@ -1158,7 +1158,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); if (done) { - pTask->taskExecInfo.step2Start = taosGetTimestampMs(); + pTask->execInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); streamTaskPutTranstateIntoInputQ(pTask); streamTryExec(pTask); // exec directly @@ -1170,7 +1170,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - pTask->taskExecInfo.step2Start = taosGetTimestampMs(); + pTask->execInfo.step2Start = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); int64_t dstVer = pTask->dataRange.range.minVer; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f3765f6484..81c3d3d07d 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -270,11 +270,11 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } - SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + SSinkRecorder* pRec = &pTask->execInfo.sink; pRec->numOfSubmit += 1; if ((pRec->numOfSubmit % 5000) == 0) { - double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); @@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t code = TSDB_CODE_SUCCESS; const char* id = pTask->id.idStr; - if (pTask->taskExecInfo.start == 0) { - pTask->taskExecInfo.start = taosGetTimestampMs(); + if (pTask->execInfo.start == 0) { + pTask->execInfo.start = taosGetTimestampMs(); } bool onlySubmitData = true; @@ -785,7 +785,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } else { - pTask->sinkRecorder.numOfBlocks += 1; + pTask->execInfo.sink.numOfBlocks += 1; SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; if (submitReq.aSubmitTbData == NULL) { @@ -833,7 +833,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } hasSubmit = true; - pTask->sinkRecorder.numOfBlocks += 1; + pTask->execInfo.sink.numOfBlocks += 1; uint64_t groupId = pDataBlock->info.id.groupId; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; @@ -867,7 +867,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } } - pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; + pTask->execInfo.sink.numOfRows += pDataBlock->info.rows; } taosHashCleanup(pTableIndexMap); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 8992d07879..b39132d675 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -96,8 +96,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { continue; } - pTask->taskExecInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); + pTask->execInfo.init = taosGetTimestampMs(); + tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->execInfo.init); streamSetStatusNormal(pTask); streamTaskCheckDownstream(pTask); @@ -306,7 +306,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); - double el = (taosGetTimestampMs() - pTask->taskExecInfo.step2Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */streamSchedExec(pTask); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 10a7dc7be7..6600d7dd04 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,6 +26,8 @@ extern "C" { #endif +#define CHECK_DOWNSTREAM_INTERVAL 100 + // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define stError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("STM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) @@ -53,11 +55,17 @@ struct STokenBucket { int32_t rate; // number of token per second }; +struct STaskTimer { + void* hTaskLaunchTimer; + void* dispatchTimer; + void* checkTimer; +}; + extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; -void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); +void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); int32_t getNumOfDispatchBranch(SStreamTask* pTask); @@ -75,7 +83,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); -int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 735136ba5b..997fecbba9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -141,6 +141,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); + pTask->execInfo.checkpoint += 1; + // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); @@ -200,6 +202,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); if (pTask->chkInfo.startTs == 0) { pTask->chkInfo.startTs = taosGetTimestampMs(); + pTask->execInfo.checkpoint += 1; } // update the child Id for downstream tasks @@ -321,13 +324,15 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); - stInfo("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId, - el, pTask->checkpointingId); + stInfo( + "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec " + "checkpointId:%" PRId64, + pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId); } else { stInfo( - "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not " - "ready:%d/%d", - pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks); + "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec " + "not ready:%d/%d", + pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks); } // send check point response to upstream task diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 08be06c841..bf7abc7457 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -114,7 +114,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; pReq->stage = pTask->pMeta->stage; - pReq->msgId = pTask->taskExecInfo.dispatchCount; + pReq->msgId = pTask->execInfo.dispatch; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; @@ -245,7 +245,7 @@ CLEAR: return code; } -int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { +int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -305,7 +305,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); - int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + int32_t downstreamTaskId = pTask->fixedDispatcher.taskId; code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { return code; @@ -375,19 +375,19 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.pData = pReqs; } - stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount); + stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->execInfo.dispatch); return code; } static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) { int32_t code = 0; - int32_t msgId = pTask->taskExecInfo.dispatchCount; + int32_t msgId = pTask->execInfo.dispatch; const char* id = pTask->id.idStr; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - int32_t vgId = pTask->fixedEpDispatcher.nodeId; - SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; - int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + int32_t vgId = pTask->fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->fixedDispatcher.taskId; stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); @@ -422,7 +422,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; const char* id = pTask->id.idStr; - int32_t msgId = pTask->taskExecInfo.dispatchCount; + int32_t msgId = pTask->execInfo.dispatch; if (streamTaskShouldStop(&pTask->status)) { int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); @@ -443,7 +443,6 @@ static void doRetryDispatchData(void* param, void* tmrId) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); - int32_t numOfFailed = taosArrayGetSize(pList); stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id, pTask->info.selfChildId, numOfFailed, msgId); @@ -467,9 +466,9 @@ static void doRetryDispatchData(void* param, void* tmrId) { stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId); } else { - int32_t vgId = pTask->fixedEpDispatcher.nodeId; - SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; - int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + int32_t vgId = pTask->fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->fixedDispatcher.taskId; stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); @@ -483,9 +482,9 @@ static void doRetryDispatchData(void* param, void* tmrId) { // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); // atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); if (streamTaskShouldPause(&pTask->status)) { - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); + streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); } else { - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); @@ -497,15 +496,17 @@ static void doRetryDispatchData(void* param, void* tmrId) { } } -void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { +void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { + STaskTimer* pTmr = pTask->pTimer; pTask->msgInfo.retryCount++; - stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration, - pTask->taskExecInfo.dispatchCount, pTask->msgInfo.retryCount); - if (pTask->launchTaskTimer != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, + waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); + + if (pTmr->dispatchTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTmr->dispatchTimer); } else { - pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + pTmr->dispatchTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); } } @@ -608,7 +609,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { pBlock->type == STREAM_INPUT__TRANS_STATE); int32_t retryCount = 0; - pTask->taskExecInfo.dispatchCount += 1; + pTask->execInfo.dispatch += 1; pTask->msgInfo.startTs = taosGetTimestampMs(); int32_t code = doBuildDispatchMsg(pTask, pBlock); @@ -624,7 +625,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, - pTask->taskExecInfo.dispatchCount, tstrerror(terrno), pTask->outputInfo.status, retryCount); + pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputInfo.status, retryCount); // todo deal with only partially success dispatch case atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); @@ -636,11 +637,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); - stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } } @@ -659,9 +659,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { // serialize if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + req.downstreamTaskId = pTask->fixedDispatcher.taskId; pTask->notReadyTasks = 1; - doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -1061,7 +1061,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; - int32_t msgId = pTask->taskExecInfo.dispatchCount; + int32_t msgId = pTask->execInfo.dispatch; if ((!pTask->pMeta->leader) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); @@ -1165,7 +1165,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // this message has been sent successfully, let's try next one. pTask->msgInfo.retryCount = 0; handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 712b0fe610..a87bb00972 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -202,7 +202,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { while (!finished) { if (streamTaskShouldPause(&pTask->status)) { - double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); break; } @@ -556,7 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { int32_t blockSize = streamQueueItemGetSize(pInput); - pTask->sinkRecorder.bytes += blockSize; + pTask->execInfo.sink.bytes += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index b3838677ce..7bc9898fb0 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -13,16 +13,20 @@ * along with this program. If not, see . */ -#include #include "streamInt.h" #include "trpc.h" #include "ttimer.h" #include "wal.h" -typedef struct SStreamTaskRetryInfo { +typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; STaskId id; -} SStreamTaskRetryInfo; +} SLaunchHTaskInfo; + +typedef struct STaskRecheckInfo { + SStreamTask* pTask; + SStreamTaskCheckReq req; +} STaskRecheckInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); @@ -39,9 +43,10 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->taskExecInfo.init); - stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%"PRId64"ms, task status:%s", - pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); + pTask->execInfo.start = taosGetTimestampMs(); + int64_t el = (pTask->execInfo.start - pTask->execInfo.init); + stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", + pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); } int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { @@ -126,8 +131,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); - req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; - req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + req.downstreamNodeId = pTask->fixedDispatcher.nodeId; + req.downstreamTaskId = pTask->fixedDispatcher.taskId; pTask->checkReqId = req.reqId; stDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 @@ -135,7 +140,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.stage, req.reqId); - streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -154,7 +159,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { req.downstreamTaskId = pVgInfo->taskId; stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, stage:%" PRId64, pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i, req.stage); - streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); @@ -168,8 +173,15 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { return 0; } -int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { - SStreamTaskCheckReq req = { +static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { + STaskRecheckInfo* pInfo = taosMemoryCalloc(1, sizeof(STaskRecheckInfo)); + if (pInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pInfo->pTask = pTask; + pInfo->req = (SStreamTaskCheckReq){ .reqId = pRsp->reqId, .streamId = pRsp->streamId, .upstreamTaskId = pRsp->upstreamTaskId, @@ -180,25 +192,41 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p .stage = pTask->pMeta->stage, }; + return pInfo; +} + +static void destroyRecheckInfo(STaskRecheckInfo* pInfo) { + if (pInfo != NULL) { + taosMemoryFree(pInfo); + } +} + +static void recheckDownstreamTasks(void* param, void* tmrId) { + STaskRecheckInfo* pInfo = param; + SStreamTask* pTask = pInfo->pTask; + + SStreamTaskCheckReq* pReq = &pInfo->req; + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, - pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage); - streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); + pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage); + streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - if (pVgInfo->taskId == req.downstreamTaskId) { + if (pVgInfo->taskId == pReq->downstreamTaskId) { stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, - pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage); - streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); + pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage); + streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pVgInfo->epSet); } } } - return 0; + destroyRecheckInfo(pInfo); + atomic_sub_fetch_8(&pTask->status.timerActive, 1); } int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) { @@ -265,6 +293,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; + if (streamTaskShouldStop(&pTask->status)) { + stDebug("s-task:%s should stop, do not do check downstream again", id); + return TSDB_CODE_SUCCESS; + } + if (pRsp->status == TASK_DOWNSTREAM_READY) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; @@ -293,7 +326,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } else { int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, - pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else { ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH); @@ -305,18 +338,28 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } } else { // not ready, wait for 100ms and retry if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { - stError("s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, roll-back needed not send check again", - id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + stError( + "s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, " + "roll-back needed", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } else if (pRsp->status == TASK_SELF_NEW_STAGE) { stError( - "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, roll-back needed " - "and not send check again", - id, pRsp->oldStage, (int32_t) pTask->pMeta->stage); + "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, not send check " + "again, roll-back needed", + id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); } else { - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id, + stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage); - taosMsleep(100); - streamRecheckDownstream(pTask, pRsp); + + STaskTimer* pTmr = pTask->pTimer; + STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); + + atomic_add_fetch_8(&pTask->status.timerActive, 1); + if (pTmr->checkTimer != NULL) { + taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer); + } else { + pTmr->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); + } } } @@ -547,8 +590,8 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) } static void tryLaunchHistoryTask(void* param, void* tmrId) { - SStreamTaskRetryInfo* pInfo = param; - SStreamMeta* pMeta = pInfo->pMeta; + SLaunchHTaskInfo* pInfo = param; + SStreamMeta* pMeta = pInfo->pMeta; stDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId); @@ -582,7 +625,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { "destroyed, or should stop", pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId); - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); + taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); streamMetaReleaseTask(pMeta, pTask); return; } @@ -621,14 +664,14 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); - SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); + SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); pInfo->id.taskId = pTask->id.taskId; pInfo->id.streamId = pTask->id.streamId; pInfo->pMeta = pTask->pMeta; - if (pTask->launchTaskTimer == NULL) { - pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); - if (pTask->launchTaskTimer == NULL) { + if (pTask->pTimer->hTaskLaunchTimer == NULL) { + pTask->pTimer->hTaskLaunchTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); + if (pTask->pTimer->hTaskLaunchTimer == NULL) { // todo failed to create timer taosMemoryFree(pInfo); } else { @@ -639,7 +682,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } else { // timer exists ASSERT(pTask->status.timerActive == 1); stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); + taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); } // try again in 100ms diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 56f97c565a..30189ad185 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -129,9 +129,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->fixedDispatcher.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->fixedDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->fixedDispatcher.epSet) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; @@ -211,9 +211,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->fixedDispatcher.epSet) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; @@ -289,20 +289,17 @@ static void freeUpstreamItem(void* p) { void tFreeStreamTask(SStreamTask* pTask) { int32_t taskId = pTask->id.taskId; - STaskExecStatisInfo* pStatis = &pTask->taskExecInfo; + STaskExecStatisInfo* pStatis = &pTask->execInfo; stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, streamGetTaskStatusStr(pTask->status.taskStatus)); - stDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 + stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 - " nextProcessVer:%" PRId64, + " nextProcessVer:%" PRId64", checkpointCount:%d", taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, - pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer); - - if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) { - int32_t k = 1; - } + pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer, + pStatis->checkpoint); // remove the ref by timer while (pTask->status.timerActive > 0) { @@ -315,9 +312,22 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->schedInfo.pTimer = NULL; } - if (pTask->launchTaskTimer != NULL) { - taosTmrStop(pTask->launchTaskTimer); - pTask->launchTaskTimer = NULL; + if (pTask->pTimer != NULL) { + if (pTask->pTimer->hTaskLaunchTimer != NULL) { + taosTmrStop(pTask->pTimer->hTaskLaunchTimer); + pTask->pTimer->hTaskLaunchTimer = NULL; + } + + if (pTask->pTimer->dispatchTimer != NULL) { + taosTmrStop(pTask->pTimer->dispatchTimer); + pTask->pTimer->dispatchTimer = NULL; + } + + if (pTask->pTimer->checkTimer != NULL) { + taosTmrStop(pTask->pTimer->checkTimer); + pTask->pTimer->checkTimer = NULL; + } + taosMemoryFreeClear(pTask->pTimer); } int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); @@ -402,7 +412,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } - pTask->taskExecInfo.created = taosGetTimestampMs(); + pTask->execInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; @@ -419,6 +429,12 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } + pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer)); + if (pTask->pTimer == NULL) { + stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50); TdThreadMutexAttr attr = {0}; @@ -501,7 +517,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS } void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) { - STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher; pDispatcher->taskId = pDownstreamTask->id.taskId; pDispatcher->nodeId = pDownstreamTask->info.nodeId; pDispatcher->epSet = pDownstreamTask->info.epSet; @@ -530,7 +546,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE } } } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { - STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher; if (pDispatcher->nodeId == nodeId) { epsetAssign(&pDispatcher->epSet, pEpSet); stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, @@ -598,7 +614,7 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { } int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { - STaskExecStatisInfo* p = &pTask->taskExecInfo; + STaskExecStatisInfo* p = &pTask->execInfo; int32_t numOfNodes = taosArrayGetSize(pNodeList); int64_t prevTs = p->latestUpdateTs;