From 1f6cef26e8f09ea0dc7a16777acec7ac54f85608 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 01:44:58 +0800 Subject: [PATCH 1/4] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 27 +++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 9938c073ff..4729b912a7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -828,16 +828,19 @@ static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) { return code; } -bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { +int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) { int32_t step = pIter->backward ? -1 : 1; int32_t code = 0; int32_t iBlockL = pIter->iSttBlk; SBlockData *pBlockData = NULL; + int32_t lino = 0; + + *hasNext = false; terrno = 0; // no qualified last file block in current file, no need to fetch row if (pIter->pSttBlk == NULL) { - return false; + return code; } code = loadLastBlock(pIter, idStr, &pBlockData); @@ -850,9 +853,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { while (1) { bool skipBlock = false; code = findNextValidRow(pIter, idStr); - if (code) { - goto _exit; - } + TSDB_CHECK_CODE(code, lino, _exit); if (pIter->pBlockLoadInfo->checkRemainingRow) { skipBlock = true; @@ -902,7 +903,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: - return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); + *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); + return code; } // SMergeTree ================================================= @@ -1005,7 +1007,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF goto _end; } - 
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); + bool hasVal = NULL; + code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); + if (code) { + goto _end; + } + if (hasVal) { tMergeTreeAddIter(pMTree, pIter); @@ -1018,7 +1025,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF pSttDataInfo->numOfRows += numOfRows; } } else { - TAOS_CHECK_GOTO(terrno, NULL, _end); if (!pMTree->ignoreEarlierTs) { pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; } @@ -1100,8 +1106,9 @@ bool tMergeTreeNext(SMergeTree *pMTree) { if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; - bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); - if (!hasVal) { + bool hasVal = false; + int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); + if (!hasVal || (code != 0)) { pMTree->pIter = NULL; } From f4bac239064f146a41ced20982ab93b893f6a30a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 01:47:30 +0800 Subject: [PATCH 2/4] refactor: do some internal refactor --- source/dnode/mnode/impl/src/mndStreamHb.c | 46 ++++++++++++++++------ source/dnode/vnode/src/tqCommon/tqCommon.c | 21 ---------- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamHb.c | 2 +- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamTask.c | 2 + 6 files changed, 39 insertions(+), 36 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 59f07ce977..f515e9565d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -26,12 +26,13 @@ static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode); static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); -static void addIntoCheckpointList(SArray *pList, const 
SFailedCheckpointInfo *pInfo); +static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); +static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks); void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); @@ -52,7 +53,7 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } -void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { +void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { SFailedCheckpointInfo *p = taosArrayGet(pList, i); @@ -401,13 +402,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { - mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId); - - SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; - void* px = taosArrayPush(pOrphanTasks, &oTask); - if (px == NULL) { - mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId); - } + checkforOrphanTask(pMnode, p, pOrphanTasks); continue; } @@ -423,7 +418,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); if (code) { - mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code)); + 
mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s", + p->id.streamId, (int32_t)p->id.taskId, tstrerror(code)); continue; } @@ -434,7 +430,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (code == 0) { mndAddConsensusTasks(pInfo, &cp); } else { - mError("failed to get consensus checkpoint-info"); + mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId); } mndReleaseStream(pMnode, pStream); @@ -454,7 +450,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pFailedChkpt, &info); + addIntoFailedChkptList(pFailedChkpt, &info); // remove failed trans from pChkptStreams code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); @@ -516,6 +512,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pMnode != NULL) { // make sure that the unit test case can work code = mndStreamSendUpdateChkptInfoMsg(pMnode); + if (code) { + mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code)); + } } streamMutexUnlock(&execInfo.lock); @@ -554,3 +553,26 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_ tmsgSendRsp(&rsp); pRpcInfo->handle = NULL; // disable auto rsp } + +void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) { + SStreamObj *pStream = NULL; + + int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); + if (code) { + mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list", + p->id.streamId, p->id.taskId); + + SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; + void *px = taosArrayPush(pOrphanTasks, &oTask); + if (px == NULL) { + mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, 
tstrerror(terrno)); + } + } else { + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet", + p->id.taskId); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7037eb5199..422ca16e50 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -131,27 +131,6 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); } -int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId); - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); - if (pTask == NULL) { - tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - code = streamTaskSendRestoreChkptMsg(pTask); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - // this is to process request from transaction, always return true. 
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9be8f5ffaa..0ef7c2312a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1354,7 +1354,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { return code; } -int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { +int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index d2c5cb05b7..73392fade0 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -200,7 +200,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { if ((*pTask)->status.requireConsensusChkptId) { entry.checkpointInfo.consensusChkptId = 1; (*pTask)->status.requireConsensusChkptId = false; - stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 07c67ba007..5bec930455 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1424,7 +1424,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { } // negotiate the consensus checkpoint id for current task - code = streamTaskSendRestoreChkptMsg(pTask); + code = streamTaskSendNegotiateChkptIdMsg(pTask); // this task may has no checkpoint, but others tasks may generate checkpoint already? 
streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f190673430..5628095973 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1182,6 +1182,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; + case 0: + return "exec-all-tasks"; default: return "invalid-exec-type"; } From 0d3d0730d4cf697abdb90999e30330dc4f848eba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 15:30:10 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 5 +- source/libs/stream/src/streamMeta.c | 402 +------------------- source/libs/stream/src/streamStartTask.c | 444 +++++++++++++++++++++++ 3 files changed, 452 insertions(+), 399 deletions(-) create mode 100644 source/libs/stream/src/streamStartTask.c diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e7f2bf0a6..8d6184cab6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -750,6 +750,9 @@ void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); +int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo); +void streamMetaClearStartInfo(STaskStartInfo* pStartInfo); + int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); @@ -770,7 +773,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* 
pMeta); -int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask); +int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5bec930455..a9976760b6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -37,12 +37,6 @@ typedef struct { SHashObj* pTable; } SMetaRefMgt; -typedef struct STaskInitTs { - int64_t start; - int64_t end; - bool success; -} STaskInitTs; - SMetaRefMgt gMetaRefMgt; int32_t metaRefMgtInit(); @@ -405,15 +399,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, goto _err; } - pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); - if (pMeta->startInfo.pReadyTaskSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK); - if (pMeta->startInfo.pFailedTaskSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = streamMetaInitStartInfo(&pMeta->startInfo); + if (code) { goto _err; } @@ -609,8 +596,8 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->updateInfo.pTasks); - taosHashCleanup(pMeta->startInfo.pReadyTaskSet); - taosHashCleanup(pMeta->startInfo.pFailedTaskSet); + + streamMetaClearStartInfo(&pMeta->startInfo); destroyMetaHbInfo(pMeta->pHbInfo); pMeta->pHbInfo = NULL; @@ -1191,18 +1178,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) { streamMetaHbToMnode(pRid, NULL); } -void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { - taosHashClear(pStartInfo->pReadyTaskSet); - taosHashClear(pStartInfo->pFailedTaskSet); - pStartInfo->tasksWillRestart = 0; - pStartInfo->readyTs = 0; - pStartInfo->elapsedTime = 0; - - // reset the sentinel flag value to be 0 - pStartInfo->startAllTasks = 0; - stDebug("vgId:%d clear 
start-all-task info", vgId); -} - void streamMetaRLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-rlock", pMeta->vgId); (void)taosThreadRwlockRdlock(&pMeta->lock); @@ -1302,185 +1277,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } } -static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) { - streamMetaWLock(pMeta); - - if (pMeta->closeFlag) { - streamMetaWUnLock(pMeta); - stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); - return TSDB_CODE_FAILED; - } - - *pList = taosArrayDup(pMeta->pTaskList, NULL); - if (*pList == NULL) { - return terrno; - } - - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = now; - - int32_t code = streamMetaResetTaskStatus(pMeta); - streamMetaWUnLock(pMeta); - - return code; -} - -// restore the checkpoint id by negotiating the latest consensus checkpoint id -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pMeta->vgId; - int64_t now = taosGetTimestampMs(); - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); - - if (numOfTasks == 0) { - stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); - return TSDB_CODE_SUCCESS; - } - - SArray* pTaskList = NULL; - code = prepareBeforeStartTasks(pMeta, &pTaskList, now); - if (code != TSDB_CODE_SUCCESS) { - ASSERT(pTaskList == NULL); - return TSDB_CODE_SUCCESS; - } - - // broadcast the check downstream tasks msg only for tasks with related fill-history tasks. 
- numOfTasks = taosArrayGetSize(pTaskList); - - // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without - // initialization, when the operation of check downstream tasks status is executed far quickly. - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); - continue; - } - - if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) { - code = pMeta->expandTaskFn(pTask); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); - streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); - } - } - - streamMetaReleaseTask(pMeta, pTask); - } - - // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here. - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - - SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); - continue; - } - - STaskExecStatisInfo* pInfo = &pTask->execInfo; - - // fill-history task can only be launched by related stream tasks. 
- if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - // ready now, start the related fill-history task - if (pTask->status.downstreamReady == 1) { - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); - (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? - } - - (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, - true); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (ret != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - code = ret; - - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - } - } - - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - // negotiate the consensus checkpoint id for current task - code = streamTaskSendNegotiateChkptIdMsg(pTask); - - // this task may has no checkpoint, but others tasks may generate checkpoint already? - streamMetaReleaseTask(pMeta, pTask); - } - - // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without - // initialization, when the operation of check downstream tasks status is executed far quickly. 
- stInfo("vgId:%d start all task(s) completed", pMeta->vgId); - taosArrayDestroy(pTaskList); - return code; -} - -int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { - streamMetaRLock(pMeta); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); - if (num == 0) { - stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num); - streamMetaRUnLock(pMeta); - return TSDB_CODE_SUCCESS; - } - - int64_t st = taosGetTimestampMs(); - - // send hb msg to mnode before closing all tasks. - SArray* pTaskList = NULL; - int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - int32_t numOfTasks = taosArrayGetSize(pTaskList); - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = NULL; - - code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - (void)streamTaskStop(pTask); - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el); - - streamMetaRUnLock(pMeta); - return 0; -} - bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < num; ++i) { @@ -1499,196 +1295,6 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { return true; } -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = 0; - int32_t vgId = pMeta->vgId; - SStreamTask* pTask = NULL; - bool continueExec = true; - - stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); - - code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d 
failed to acquire task:0x%x when starting task", vgId, taskId); - (void)streamMetaAddFailedTask(pMeta, streamId, taskId); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - // fill-history task can only be launched by related stream tasks. - STaskExecStatisInfo* pInfo = &pTask->execInfo; - if (pTask->info.fillHistory == 1) { - stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId); - streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; - } - - // the start all tasks procedure may happen to start the newly deployed stream task, and results in the - // concurrently start this task by two threads. - streamMutexLock(&pTask->lock); - SStreamTaskState status = streamTaskGetStatus(pTask); - if (status.state != TASK_STATUS__UNINIT) { - stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); - continueExec = false; - } else { - continueExec = true; - } - streamMutexUnlock(&pTask->lock); - - if (!continueExec) { - streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - ASSERT(pTask->status.downstreamReady == 0); - - // avoid initialization and destroy running concurrently. - streamMutexLock(&pTask->lock); - if (pTask->pBackend == NULL) { - code = pMeta->expandTaskFn(pTask); - streamMutexUnlock(&pTask->lock); - - if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - } - } else { - streamMutexUnlock(&pTask->lock); - } - - // concurrently start task may cause the later started task be failed, and also failed to added into meta result. - if (code == TSDB_CODE_SUCCESS) { - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, - tstrerror(code)); - - // do no added into result hashmap if it is failed due to concurrently starting of this stream task. 
- if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - } - } - } - - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { - int32_t vgId = pMeta->vgId; - void* pIter = NULL; - size_t keyLen = 0; - - stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet), - succ ? "success" : "failed"); - - while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { - STaskInitTs* pInfo = pIter; - void* key = taosHashGetKey(pIter, &keyLen); - - SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); - if (pTask1 == NULL) { - stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); - } else { - stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, - (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? 
"success" : "failed"); - } - } -} - -// check all existed tasks are received rsp -static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) { - for (int32_t i = 0; i < numOfTotal; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - if (pTaskId == NULL) { - continue; - } - - STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx)); - if (px == NULL) { - px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx)); - if (px == NULL) { - return false; - } - } - } - - return true; -} - -int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, - int64_t endTs, bool ready) { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - STaskId id = {.streamId = streamId, .taskId = taskId}; - int32_t vgId = pMeta->vgId; - bool allRsp = true; - - streamMetaWLock(pMeta); - SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (p == NULL) { // task does not exists in current vnode, not record the complete info - stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); - streamMetaWUnLock(pMeta); - return 0; - } - - // clear the send consensus-checkpointId flag - streamMutexLock(&(*p)->lock); - (*p)->status.sendConsensusChkptId = false; - streamMutexUnlock(&(*p)->lock); - - if (pStartInfo->startAllTasks != 1) { - int64_t el = endTs - startTs; - stDebug( - "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " - "time:%" PRId64 "ms", - vgId, taskId, ready, el); - streamMetaWUnLock(pMeta); - return 0; - } - - STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; - SHashObj* pDst = ready ? 
pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; - int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); - if (code) { - if (code == TSDB_CODE_DUP_KEY) { - stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 - " already exist start results in meta start task result hashmap", - vgId, id.taskId); - } else { - stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId); - } - streamMetaWUnLock(pMeta); - return code; - } - - int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - - allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal); - if (allRsp) { - pStartInfo->readyTs = taosGetTimestampMs(); - pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", - vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, - pStartInfo->elapsedTime / 1000.0); - - // print the initialization elapsed time and info - displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); - displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo, vgId); - streamMetaWUnLock(pMeta); - - code = pStartInfo->completeFn(pMeta); - } else { - streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready, - numOfRecv, numOfTotal); - } - - return code; -} - int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c new file mode 100644 index 0000000000..3cf06fd04a --- /dev/null +++ 
b/source/libs/stream/src/streamStartTask.c
@@ -0,0 +1,492 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "executor.h"
+#include "streamBackendRocksdb.h"
+#include "streamInt.h"
+#include "tmisce.h"
+#include "tref.h"
+#include "tsched.h"
+#include "tstream.h"
+#include "ttimer.h"
+#include "wal.h"
+
+// launch record of one task: start/end timestamps and whether the launch succeeded. It is the value type stored in
+// STaskStartInfo.pReadyTaskSet and STaskStartInfo.pFailedTaskSet, keyed by STaskId.
+typedef struct STaskInitTs {
+  int64_t start;
+  int64_t end;
+  bool    success;
+} STaskInitTs;
+
+static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
+static bool    allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
+static void    displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
+
+// Start all tasks listed in pMeta->pTaskList:
+//  1. snapshot the task list and reset the start bookkeeping, see prepareBeforeStartTasks();
+//  2. expand the backend of every fill-history task and of every task that has a related fill-history task;
+//  3. for each task that is not a fill-history task: record it as launched if its downstream is ready already, send
+//     the TASK_EVENT_INIT event if it has a related fill-history task, otherwise negotiate the consensus checkpoint id.
+// A failure of step 1 is swallowed and TSDB_CODE_SUCCESS is returned; otherwise the value left in code by the last
+// per-task operation is returned.
+int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
+  int32_t code = TSDB_CODE_SUCCESS;
+  int32_t vgId = pMeta->vgId;
+  int64_t now = taosGetTimestampMs();
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
+
+  if (numOfTasks == 0) {
+    stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SArray* pTaskList = NULL;
+  code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
+  if (code != TSDB_CODE_SUCCESS) {
+    ASSERT(pTaskList == NULL);  // prepareBeforeStartTasks() never hands out a list when it fails
+    return TSDB_CODE_SUCCESS;
+  }
+
+  // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
+  numOfTasks = taosArrayGetSize(pTaskList);
+
+  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
+  // initialization, when the operation of check downstream tasks status is executed far quickly.
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
+    SStreamTask*   pTask = NULL;
+    code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
+    if (pTask == NULL) {
+      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
+      (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
+      continue;
+    }
+
+    if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
+      code = pMeta->expandTaskFn(pTask);
+      if (code != TSDB_CODE_SUCCESS) {
+        stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
+        streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
+      }
+    }
+
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
+
+    SStreamTask* pTask = NULL;
+    code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
+    if (pTask == NULL) {
+      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
+      (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
+      continue;
+    }
+
+    STaskExecStatisInfo* pInfo = &pTask->execInfo;
+
+    // fill-history task can only be launched by related stream tasks.
+    if (pTask->info.fillHistory == 1) {
+      stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    // ready now, start the related fill-history task
+    if (pTask->status.downstreamReady == 1) {
+      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+        stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
+                pTask->id.idStr);
+        (void)streamLaunchFillHistoryTask(pTask);  // todo: how about retry launch fill-history task?
+      }
+
+      (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
+                                          true);
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
+      if (ret != TSDB_CODE_SUCCESS) {
+        stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
+        code = ret;
+
+        if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
+          streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
+        }
+      }
+
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    // negotiate the consensus checkpoint id for current task
+    code = streamTaskSendNegotiateChkptIdMsg(pTask);
+
+    // this task may has no checkpoint, but others tasks may generate checkpoint already?
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
+  taosArrayDestroy(pTaskList);
+  return code;
+}
+
+// Duplicate the task list into *pList and reset the start bookkeeping of pMeta, all under the meta write lock.
+// On success the caller owns *pList and releases it with taosArrayDestroy(). On any failure the write lock is released,
+// *pList is NULL and the failure code is returned.
+int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
+  int32_t code = TSDB_CODE_SUCCESS;
+
+  *pList = NULL;
+  streamMetaWLock(pMeta);
+
+  if (pMeta->closeFlag) {
+    streamMetaWUnLock(pMeta);
+    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
+    return TSDB_CODE_FAILED;
+  }
+
+  *pList = taosArrayDup(pMeta->pTaskList, NULL);
+  if (*pList == NULL) {
+    code = terrno;  // save it first, the calls below may overwrite terrno
+    streamMetaWUnLock(pMeta);
+    stError("vgId:%d failed to dup the task list before starting tasks, code:%s", pMeta->vgId, tstrerror(code));
+    return code;
+  }
+
+  taosHashClear(pMeta->startInfo.pReadyTaskSet);
+  taosHashClear(pMeta->startInfo.pFailedTaskSet);
+  pMeta->startInfo.startTs = now;
+
+  code = streamMetaResetTaskStatus(pMeta);
+  streamMetaWUnLock(pMeta);
+
+  if (code != TSDB_CODE_SUCCESS) {
+    // streamMetaStartAllTasks() does not free the list when this function fails, release it here
+    taosArrayDestroy(*pList);
+    *pList = NULL;
+  }
+
+  return code;
+}
+
+// Reset the start-all-tasks bookkeeping once a round is finished: empty both result sets and clear tasksWillRestart,
+// readyTs, elapsedTime and the startAllTasks sentinel. startTs and restartCount are left untouched.
+void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
+  taosHashClear(pStartInfo->pReadyTaskSet);
+  taosHashClear(pStartInfo->pFailedTaskSet);
+  pStartInfo->tasksWillRestart = 0;
+  pStartInfo->readyTs = 0;
+  pStartInfo->elapsedTime = 0;
+
+  // reset the sentinel flag value to be 0
+  pStartInfo->startAllTasks = 0;
+  stDebug("vgId:%d clear start-all-task info", vgId);
+}
+
+// Record the launch result of one task in the ready/failed set of pMeta->startInfo. When every task listed in the
+// meta has a recorded result, print the summary, reset the start info and invoke pStartInfo->completeFn.
+// Nothing is recorded, and 0 is returned, if the task is not found in pMeta->pTasksMap or if no start-all-tasks
+// procedure is running (startAllTasks != 1). Otherwise returns the taosHashPut() error or the completeFn result.
+int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
+                                      int64_t endTs, bool ready) {
+  STaskStartInfo* pStartInfo = &pMeta->startInfo;
+  STaskId         id = {.streamId = streamId, .taskId = taskId};
+  int32_t         vgId = pMeta->vgId;
+  bool            allRsp = true;
+
+  streamMetaWLock(pMeta);
+  SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+  if (p == NULL) {  // task does not exists in current vnode, not record the complete info
+    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
+    streamMetaWUnLock(pMeta);
+    return 0;
+  }
+
+  // clear the send consensus-checkpointId flag
+  streamMutexLock(&(*p)->lock);
+  (*p)->status.sendConsensusChkptId = false;
+  streamMutexUnlock(&(*p)->lock);
+
+  if (pStartInfo->startAllTasks != 1) {
+    int64_t el = endTs - startTs;
+    stDebug(
+        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
+        "time:%" PRId64 "ms",
+        vgId, taskId, ready, el);
+    streamMetaWUnLock(pMeta);
+    return 0;
+  }
+
+  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
+  SHashObj*   pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
+  int32_t     code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
+  if (code) {
+    if (code == TSDB_CODE_DUP_KEY) {
+      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
+              " already exist start results in meta start task result hashmap",
+              vgId, id.taskId);
+    } else {
+      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
+    }
+    streamMetaWUnLock(pMeta);
+    return code;
+  }
+
+  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
+  int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
+
+  allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal);
+  if (allRsp) {
+    pStartInfo->readyTs = taosGetTimestampMs();
+    pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
+
+    stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
+            ", readyTs:%" PRId64 " total elapsed time:%.2fs",
+            vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
+            pStartInfo->elapsedTime / 1000.0);
+
+    // print the initialization elapsed time and info
+    displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
+    displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
+    streamMetaResetStartInfo(pStartInfo, vgId);
+    streamMetaWUnLock(pMeta);
+
+    code = pStartInfo->completeFn(pMeta);
+  } else {
+    streamMetaWUnLock(pMeta);
+    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
+            numOfRecv, numOfTotal);
+  }
+
+  return code;
+}
+
+// Return true if each of the first numOfTotal tasks in pMeta->pTaskList has a launch result recorded in either the
+// ready set or the failed set of pStartInfo. Slots for which taosArrayGet() yields NULL are skipped.
+bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
+  for (int32_t i = 0; i < numOfTotal; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
+    if (pTaskId == NULL) {
+      continue;
+    }
+
+    STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
+    void*   px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
+    if (px == NULL) {
+      px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
+      if (px == NULL) {
+        return false;
+      }
+    }
+  }
+
+  return true;
+}
+
+// Log, at info level, the number of entries in pTaskSet followed by one line per recorded task with its launch
+// start/end timestamps. succ selects the "success"/"failed" wording of the summary line.
+void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
+  int32_t vgId = pMeta->vgId;
+  void*   pIter = NULL;
+  size_t  keyLen = 0;
+
+  stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
+         succ ? "success" : "failed");
+
+  while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
+    STaskInitTs* pInfo = pIter;
+    void*        key = taosHashGetKey(pIter, &keyLen);
+
+    SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
+    if (pTask1 == NULL) {
+      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
+    } else {
+      stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
+             (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
+    }
+  }
+}
+
+// Create the two hash sets that hold the per-task launch results. Returns 0 on success, otherwise terrno as set by
+// the failed taosHashInit(); in that case no hash set is left allocated in pStartInfo.
+int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
+  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
+
+  pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
+  if (pStartInfo->pReadyTaskSet == NULL) {
+    return terrno;
+  }
+
+  pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
+  if (pStartInfo->pFailedTaskSet == NULL) {
+    int32_t code = terrno;  // taosHashCleanup() below may overwrite terrno
+
+    // do not leak the set created above
+    taosHashCleanup(pStartInfo->pReadyTaskSet);
+    pStartInfo->pReadyTaskSet = NULL;
+    return code;
+  }
+
+  return 0;
+}
+
+void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
+  taosHashClear(pStartInfo->pReadyTaskSet);
+  taosHashClear(pStartInfo->pFailedTaskSet);
+  pStartInfo->readyTs = 0;
+  pStartInfo->elapsedTime = 0;
+  pStartInfo->startTs = 0;
+  pStartInfo->startAllTasks = 0;
+  pStartInfo->tasksWillRestart = 0;
+  pStartInfo->restartCount = 0;
+}
+
+// Start a single stream task: expand its backend if not done yet, then send TASK_EVENT_INIT to its state machine.
+// Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS if the task cannot be acquired or is not in TASK_STATUS__UNINIT,
+// TSDB_CODE_SUCCESS without doing anything for a fill-history task, otherwise the code of expandTaskFn or of
+// streamTaskHandleEvent().
+int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
+  int32_t      code = 0;
+  int32_t      vgId = pMeta->vgId;
+  SStreamTask* pTask = NULL;
+  bool         continueExec = true;
+
+  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
+
+  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
+  if (pTask == NULL) {
+    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
+    (void)streamMetaAddFailedTask(pMeta, streamId, taskId);
+    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
+  }
+
+  // fill-history task can only be launched by related stream tasks.
+  STaskExecStatisInfo* pInfo = &pTask->execInfo;
+  if (pTask->info.fillHistory == 1) {
+    stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
+    streamMetaReleaseTask(pMeta, pTask);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
+  // concurrently start this task by two threads.
+  streamMutexLock(&pTask->lock);
+  SStreamTaskState status = streamTaskGetStatus(pTask);
+  if (status.state != TASK_STATUS__UNINIT) {
+    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
+    continueExec = false;
+  } else {
+    continueExec = true;
+  }
+  streamMutexUnlock(&pTask->lock);
+
+  if (!continueExec) {
+    streamMetaReleaseTask(pMeta, pTask);
+    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
+  }
+
+  ASSERT(pTask->status.downstreamReady == 0);
+
+  // avoid initialization and destroy running concurrently.
+  streamMutexLock(&pTask->lock);
+  if (pTask->pBackend == NULL) {
+    code = pMeta->expandTaskFn(pTask);
+    streamMutexUnlock(&pTask->lock);
+
+    if (code != TSDB_CODE_SUCCESS) {
+      streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
+    }
+  } else {
+    streamMutexUnlock(&pTask->lock);
+  }
+
+  // concurrently start task may cause the later started task be failed, and also failed to added into meta result.
+  if (code == TSDB_CODE_SUCCESS) {
+    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
+    if (code != TSDB_CODE_SUCCESS) {
+      stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
+              tstrerror(code));
+
+      // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
+      if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
+        streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
+      }
+    }
+  }
+
+  streamMetaReleaseTask(pMeta, pTask);
+  return code;
+}
+
+// Stop every task of pMeta while holding the meta read lock: call streamMetaSendMsgBeforeCloseTasks() to obtain the
+// task list, then call streamTaskStop() on each task of that list that can be acquired.
+// Returns the error of streamMetaSendMsgBeforeCloseTasks() if it fails, otherwise 0; the read lock is released on
+// every path.
+int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
+  streamMetaRLock(pMeta);
+
+  int32_t num = taosArrayGetSize(pMeta->pTaskList);
+  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
+  if (num == 0) {
+    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
+    streamMetaRUnLock(pMeta);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  int64_t st = taosGetTimestampMs();
+
+  // send hb msg to mnode before closing all tasks.
+  SArray* pTaskList = NULL;
+  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
+  if (code != TSDB_CODE_SUCCESS) {
+    // the read lock taken at the top must not leak on this error path
+    // NOTE(review): assumes streamMetaSendMsgBeforeCloseTasks() returns with the read lock held on failure -- confirm
+    streamMetaRUnLock(pMeta);
+    return code;
+  }
+
+  int32_t numOfTasks = taosArrayGetSize(pTaskList);
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
+    SStreamTask*   pTask = NULL;
+
+    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
+    if (code != TSDB_CODE_SUCCESS) {
+      continue;
+    }
+
+    (void)streamTaskStop(pTask);
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  taosArrayDestroy(pTaskList);
+
+  double el = (taosGetTimestampMs() - st) / 1000.0;
+  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
+
+  streamMetaRUnLock(pMeta);
+  return 0;
+}
+
+
From 89eaf01621163256fbfa551317808772c123ecf2 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Sat, 17 Aug 2024 23:23:18 +0800
Subject: [PATCH 4/4] fix(stream):fix memory leak.
--- source/libs/stream/src/streamStartTask.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 3cf06fd04a..99f4e84951 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -314,8 +314,8 @@ int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) { - taosHashClear(pStartInfo->pReadyTaskSet); - taosHashClear(pStartInfo->pFailedTaskSet); + taosHashCleanup(pStartInfo->pReadyTaskSet); + taosHashCleanup(pStartInfo->pFailedTaskSet); pStartInfo->readyTs = 0; pStartInfo->elapsedTime = 0; pStartInfo->startTs = 0;