From 653f7a1a434403882a2dc94f6ea59875f9665097 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Jul 2024 13:55:50 +0800 Subject: [PATCH] fix(stream): refactor the checkpoint consensus policy. --- include/common/tmsgdef.h | 1 + include/libs/stream/streamMsg.h | 4 +- include/libs/stream/tstream.h | 5 +- source/dnode/mnode/impl/inc/mndStream.h | 10 +- source/dnode/mnode/impl/src/mndMain.c | 14 +- source/dnode/mnode/impl/src/mndStream.c | 195 +++++++++++++++----- source/dnode/mnode/impl/src/mndStreamUtil.c | 132 ++++--------- source/dnode/snode/src/snode.c | 4 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +- source/libs/stream/src/streamCheckStatus.c | 6 +- source/libs/stream/src/streamCheckpoint.c | 8 - source/libs/stream/src/streamHb.c | 2 + 12 files changed, 223 insertions(+), 166 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c92649f1f7..19fe34fe01 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -251,6 +251,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index b69032330d..8b6ca2c5cd 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -242,8 +242,8 @@ typedef struct { typedef struct SCheckpointConsensusEntry { SRestoreCheckpointInfo req; - SRpcMsg rsp; - int64_t ts; + SRpcHandleInfo rspInfo; + int64_t ts; } SCheckpointConsensusEntry; #ifdef __cplusplus diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e98039d2fe..66b9db47e2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -616,8 +616,9 @@ typedef struct SStreamTaskState { typedef struct SCheckpointConsensusInfo { SArray* pTaskList; - int64_t checkpointId; - int64_t genTs; +// int64_t checkpointId; +// int64_t genTs; + int32_t numOfTasks; } SCheckpointConsensusInfo; int32_t streamSetupScheduleTrigger(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b261f89057..a86e06b486 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -133,7 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); -int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId); +int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId, + int64_t checkpointId, SRpcHandleInfo *pRpcInfo); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); @@ -146,10 +147,9 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId); -void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg); -int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream); -bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal); +SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks); +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, + SRpcHandleInfo *pRpcInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..3acb727910 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -177,6 +177,15 @@ static void mndStreamCheckNode(SMnode *pMnode) { } } +static void mndStreamConsensusChkpt(SMnode *pMnode) { + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} + static void mndPullupTelem(SMnode *pMnode) { mTrace("pullup telem msg"); int32_t contLen = 0; @@ -308,7 +317,6 @@ static int32_t minCronTime() { min = TMIN(min, tsCompactPullupInterval); min = TMIN(min, tsMqRebalanceInterval); min = TMIN(min, tsStreamCheckpointInterval); - min = TMIN(min, 6); // checkpointRemain min = TMIN(min, tsStreamNodeCheckInterval); min = TMIN(min, tsArbHeartBeatIntervalSec); min = TMIN(min, tsArbCheckSyncIntervalSec); @@ -353,6 +361,10 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndStreamCheckNode(pMnode); } + if (sec % 5 == 0) { + mndStreamConsensusChkpt(pMnode); + } + if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 54b24fe0b4..a4c03ab3e0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); -static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq); +static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); +static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -123,6 +124,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); @@ -803,7 +805,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name); saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); - mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); +// mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); taosThreadMutexUnlock(&execInfo.lock); // execute creation @@ -2625,12 +2627,42 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { return 0; } -static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; +static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEqual) { + int32_t num = 0; + int64_t chkId = INT64_MAX; + *pAllEqual = true; + + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { + STaskId* p = taosArrayGet(execInfo.pTaskList, i); + if (p->streamId != streamId) { + continue; + } + + num += 1; + STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + + if (chkId != INT64_MAX && chkId != pe->checkpointInfo.latestId) { + *pAllEqual = false; + } + + if (chkId > pe->checkpointInfo.latestId) { + chkId = pe->checkpointInfo.latestId; + } + } + + if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id + return -1; + } + + return chkId; +} + +static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; SDecoder decoder = {0}; SRestoreCheckpointInfo req = {0}; - tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen); if (tDecodeRestoreCheckpointInfo(&decoder, &req)) { tDecoderClear(&decoder); @@ -2647,80 +2679,155 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + + // mnode handle the create stream transaction too slow may cause this problem if (pStream == NULL) { - mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId); // not in meta-store yet, try to acquire the task in exec buffer // the checkpoint req arrives too soon before the completion of the create stream trans. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { - mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + + mndSendQuickConsensusChkptIdRsp(&req, terrno, req.streamId, 0, &pMsg->info); + taosThreadMutexUnlock(&execInfo.lock); + pMsg->info.handle = NULL; // disable auto rsp return -1; } else { mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", req.streamId, req.taskId); + // todo wait for stream is created } } + mInfo("vgId:%d meta-stored checkpointId for stream:0x%" PRIx64 " %s is:%" PRId64, req.nodeId, req.streamId, + pStream->name, pStream->checkpointId); + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); - - SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId); - - int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream); - if (ckId != -1) { // consensus checkpoint id already exist - SRpcMsg rsp = {0}; - rsp.code = 0; - rsp.info = pReq->info; - rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); - rsp.pCont = rpcMallocCont(rsp.contLen); - - SMsgHead *pHead = rsp.pCont; - pHead->vgId = htonl(req.nodeId); - - mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId); - doSendConsensusCheckpointRsp(&req, &rsp, ckId); + if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly + mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, 0, &pMsg->info); taosThreadMutexUnlock(&execInfo.lock); - pReq->info.handle = NULL; // disable auto rsp - + pMsg->info.handle = NULL; // disable auto rsp return TSDB_CODE_SUCCESS; } - mndAddConsensusTasks(pInfo, &req, pReq); + bool allEqual = true; + int64_t chkId = getConsensusId(req.streamId, numOfTasks, &allEqual); - int32_t total = 0; - if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) { // all tasks has send the reqs - // start transaction to set the checkpoint id - int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream); - mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64 - " will be issued soon", - req.streamId, pStream->name, numOfTasks, checkpointId); + // some tasks not send hbMsg to mnode yet, wait for 5s. + if (chkId == -1) { + mDebug( + "not all task send hbMsg yet, add into list and wait for 10s to check the consensus-checkpointId again, " + "s-task:0x%x", req.taskId); + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); + mndAddConsensusTasks(pInfo, &req, &pMsg->info); + taosThreadMutexUnlock(&execInfo.lock); - // start the checkpoint consensus trans - int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId); - if (code == TSDB_CODE_SUCCESS) { - mndClearConsensusRspEntry(pInfo); - mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId); - } else { - mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId); - } - } else { - mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks); + pMsg->info.handle = NULL; // disable auto rsp + return 0; } + if (chkId == req.checkpointId) { + mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64, req.nodeId, req.streamId, + pStream->name, pStream->checkpointId); + mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info); + + taosThreadMutexUnlock(&execInfo.lock); + pMsg->info.handle = NULL; // disable auto rsp + return 0; + } + + // wait for 5s and check again + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); + mndAddConsensusTasks(pInfo, &req, &pMsg->info); + if (pStream != NULL) { mndReleaseStream(pMnode, pStream); } taosThreadMutexUnlock(&execInfo.lock); - pReq->info.handle = NULL; // disable auto rsp + pMsg->info.handle = NULL; // disable auto rsp return 0; } +int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { + int64_t now = taosGetTimestampMs(); + int64_t streamId = -1; // todo: fix only one + + mDebug("start to process consensus-checkpointId in tmr"); + taosThreadMutexLock(&execInfo.lock); + + void *pIter = NULL; + while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) { + SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter; + + int32_t j = 0; + int32_t num = taosArrayGetSize(pInfo->pTaskList); + + SArray *pList = taosArrayInit(4, sizeof(int32_t)); + + for (; j < num; ++j) { + SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j); + + if ((now - pe->ts) > 10 * 1000) { + bool allEqual = true; + int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &allEqual); + if (chkId == -1) { + mDebug("tasks send hbMsg for stream:0x%" PRIx64 ", wait for next round", pe->req.streamId); + break; + } + + if (allEqual) { + mDebug("all has identical checkpointId for stream:0x%"PRIx64" send checkpointId to s-task:0x%x", + pe->req.streamId, pe->req.taskId); + + mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo); + } else { + ASSERT(chkId <= pe->req.checkpointId); + mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo); + } + + taosArrayPush(pList, &pe->req.taskId); + streamId = pe->req.streamId; + } else { + mDebug("s-task:0x%x sendTs:%" PRId64 " wait %2.fs already, wait for next round to check", pe->req.taskId, + (now - pe->ts)/ 1000.0, pe->ts); + } + } + + if (taosArrayGetSize(pList) > 0) { + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + int32_t *taskId = taosArrayGet(pList, i); + for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) { + SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k); + if (pe->req.taskId == *taskId) { + taosArrayRemove(pInfo->pTaskList, k); + break; + } + } + } + } + + taosArrayDestroy(pList); + + if (taosArrayGetSize(pInfo->pTaskList) == 0) { + mndClearConsensusRspEntry(pInfo); + mndClearConsensusCheckpointId(execInfo.pStreamConsensus, streamId); + } + } + + taosThreadMutexUnlock(&execInfo.lock); + + mDebug("end to process consensus-checkpointId in tmr"); + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessCreateStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a15b817784..9ee820925c 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -848,7 +848,7 @@ int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMs return -1; } - int32_t tlen = sizeof(SMsgHead) + blen; + int32_t tlen = sizeof(SMsgHead) + blen; void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); @@ -863,22 +863,30 @@ int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMs return code; } -int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) { - for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) { - SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i); - doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId); - } - return 0; +int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId, + int64_t checkpointId, SRpcHandleInfo *pRpcInfo) { + SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(pReq->nodeId); + + mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, s-task:0x%x send to vnode", + streamId, checkpointId, pReq->taskId); + return doSendConsensusCheckpointRsp(pReq, &rsp, checkpointId); } -SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) { +SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) { void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); if (pInfo != NULL) { return (SCheckpointConsensusInfo*)pInfo; } SCheckpointConsensusInfo p = { - .genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))}; + .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)), + .numOfTasks = numOfTasks, + }; + taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); @@ -887,87 +895,15 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) // no matter existed or not, add the request into info list anyway, since we need to send rsp mannually // discard the msg may lead to the lost of connections. -void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) { - SCheckpointConsensusEntry info = {0}; +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcHandleInfo* pRpcInfo) { + SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs(), .rspInfo = *pRpcInfo}; memcpy(&info.req, pRestoreInfo, sizeof(info.req)); - info.rsp.code = 0; - info.rsp.info = pMsg->info; - info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); - info.rsp.pCont = rpcMallocCont(info.rsp.contLen); - - SMsgHead *pHead = info.rsp.pCont; - pHead->vgId = htonl(pRestoreInfo->nodeId); - taosArrayPush(pInfo->pTaskList, &info); -} -static int32_t entryComparFn(const void* p1, const void* p2) { - const SCheckpointConsensusEntry* pe1 = p1; - const SCheckpointConsensusEntry* pe2 = p2; - - if (pe1->req.taskId == pe2->req.taskId) { - return 0; - } - - return pe1->req.taskId < pe2->req.taskId? -1:1; -} - -bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) { - int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList); - if (numOfExisted < numOfTasks) { - if (pTotal != NULL) { - *pTotal = numOfExisted; - } - return false; - } - - taosArraySort(pInfo->pTaskList, entryComparFn); - - int32_t num = 1; - int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId; - for(int32_t i = 1; i < taosArrayGetSize(pInfo->pTaskList); ++i) { - SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i); - if (pe->req.taskId != taskId) { - num += 1; - taskId = pe->req.taskId; - } - } - - if (pTotal != NULL) { - *pTotal = num; - } - - ASSERT(num <= numOfTasks); - return num == numOfTasks; -} - -int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) { - if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0. - mDebug("existed consensus-checkpointId:%" PRId64 " for stream:0x%" PRIx64 " %s exist, and return", - pInfo->checkpointId, pStream->uid, pStream->name); - return pInfo->checkpointId; - } - - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - if (!mndAllTaskSendCheckpointId(pInfo, numOfTasks, NULL)) { - return -1; - } - - int64_t checkpointId = INT64_MAX; - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { - SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, i); - if (pEntry->req.checkpointId < checkpointId) { - checkpointId = pEntry->req.checkpointId; - mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name, - pEntry->req.taskId, pEntry->req.nodeId, pEntry->req.checkpointId); - } - } - - pInfo->checkpointId = checkpointId; - pInfo->genTs = taosGetTimestampMs(); - return checkpointId; + int32_t num = taosArrayGetSize(pInfo->pTaskList); + mDebug("s-task:0x%x added into consensus-checkpointId list, stream:0x%" PRIx64 " total waiting:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, num); } void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { @@ -982,15 +918,15 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { return TSDB_CODE_SUCCESS; } -int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { - void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); - ASSERT(pInfo == NULL); - - SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL}; - taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); - - SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); - ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0); - mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId); - return TSDB_CODE_SUCCESS; -} \ No newline at end of file +//int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { +// void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); +// ASSERT(pInfo == NULL); +// +// SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL}; +// taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); +// +// SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); +// ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0); +// mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId); +// return TSDB_CODE_SUCCESS; +//} \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 9686fd3789..69a7bc7ba4 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -43,14 +43,14 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce char *p = streamTaskGetStatus(pTask)->name; if (pTask->info.fillHistory) { - sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam); } else { - sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 668e178d2d..04a658a30c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -279,7 +279,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.tasksWillRestart = 1; + if (restored) { + tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId); + pMeta->startInfo.tasksWillRestart = 1; + } if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -292,8 +295,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaClearUpdateTaskList(pMeta); if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.tasksWillRestart = 0; + tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); } else { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 8778e3314a..22d336a549 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -666,7 +666,11 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); - addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); + + // not record the failed of the current task if try to close current vnode + if (!pMeta->closeFlag) { + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); + } streamMetaReleaseTask(pMeta, pTask); return; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index bc973f17d7..1195362ab3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -447,14 +447,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (restored && (pStatus->state != TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) { - stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 - " failed", - pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); - taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index d6411e25f2..691ec44672 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -168,7 +168,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { continue; } + taosThreadMutexLock(&(*pTask)->lock); STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask); + taosThreadMutexUnlock(&(*pTask)->lock); entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {