From f4bac239064f146a41ced20982ab93b893f6a30a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 01:47:30 +0800 Subject: [PATCH] refactor: extract checkforOrphanTask, rename checkpoint helpers, remove tqStreamTaskRestoreCheckpoint --- source/dnode/mnode/impl/src/mndStreamHb.c | 46 ++++++++++++++++------ source/dnode/vnode/src/tqCommon/tqCommon.c | 21 ---------- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamHb.c | 2 +- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamTask.c | 2 + 6 files changed, 39 insertions(+), 36 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 59f07ce977..f515e9565d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -26,12 +26,13 @@ static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode); static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); -static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo); +static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); +static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks); void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); @@ -52,7 +53,7 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } -void
addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { +void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { SFailedCheckpointInfo *p = taosArrayGet(pList, i); @@ -401,13 +402,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { - mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId); - - SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; - void* px = taosArrayPush(pOrphanTasks, &oTask); - if (px == NULL) { - mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId); - } + checkforOrphanTask(pMnode, p, pOrphanTasks); continue; } @@ -423,7 +418,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); if (code) { - mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code)); + mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s", + p->id.streamId, (int32_t)p->id.taskId, tstrerror(code)); continue; } @@ -434,7 +430,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (code == 0) { mndAddConsensusTasks(pInfo, &cp); } else { - mError("failed to get consensus checkpoint-info"); + mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId); } mndReleaseStream(pMnode, pStream); @@ -454,7 +450,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pFailedChkpt, &info); + addIntoFailedChkptList(pFailedChkpt, &info); // remove failed trans from pChkptStreams code = 
mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); @@ -516,6 +512,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pMnode != NULL) { // make sure that the unit test case can work code = mndStreamSendUpdateChkptInfoMsg(pMnode); + if (code) { + mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code)); + } } streamMutexUnlock(&execInfo.lock); @@ -554,3 +553,26 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_ tmsgSendRsp(&rsp); pRpcInfo->handle = NULL; // disable auto rsp } + +void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) { /* Decides what to do with task entry p: append it to pOrphanTasks when its stream cannot be fetched from pMnode, otherwise only log the mismatch and keep the task. */ + SStreamObj *pStream = NULL; + + int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); + if (code) { /* stream lookup failed: record the task as an orphan */ + mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list", + p->id.streamId, p->id.taskId); + + SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; + void *px = taosArrayPush(pOrphanTasks, &oTask); + if (px == NULL) { /* push failed: only logged, nothing is retried or returned to the caller */ + mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno)); + } + } else { /* stream lookup succeeded: hand the stream object back and leave the task in place */ + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet", + p->id.taskId); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7037eb5199..422ca16e50 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -131,27 +131,6 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); } -int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t vgId = pMeta->vgId; - int32_t
numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId); - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); - if (pTask == NULL) { - tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - code = streamTaskSendRestoreChkptMsg(pTask); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - // this is to process request from transaction, always return true. int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9be8f5ffaa..0ef7c2312a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1354,7 +1354,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { return code; } -int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { +int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index d2c5cb05b7..73392fade0 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -200,7 +200,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { if ((*pTask)->status.requireConsensusChkptId) { entry.checkpointInfo.consensusChkptId = 1; (*pTask)->status.requireConsensusChkptId = false; - stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); } if ((*pTask)->exec.pWalReader != 
NULL) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 07c67ba007..5bec930455 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1424,7 +1424,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { } // negotiate the consensus checkpoint id for current task - code = streamTaskSendRestoreChkptMsg(pTask); + code = streamTaskSendNegotiateChkptIdMsg(pTask); // this task may has no checkpoint, but others tasks may generate checkpoint already? streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f190673430..5628095973 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1182,6 +1182,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; + case 0: + return "exec-all-tasks"; default: return "invalid-exec-type"; }