From 54fdea3173b7132ea98b8c77f6a7ac611bce5de5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 18:01:28 +0800 Subject: [PATCH 1/3] fix(stream): perform node update when mnode leader/follower switches. --- include/common/tmsg.h | 1 + include/libs/stream/tstream.h | 8 +- source/dnode/mnode/impl/inc/mndStream.h | 3 + source/dnode/mnode/impl/src/mndStream.c | 117 ++++++++++++++++--- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 3 + source/dnode/mnode/impl/src/mndSync.c | 4 + source/dnode/vnode/src/tqCommon/tqCommon.c | 41 +++---- source/libs/stream/src/streamExec.c | 8 +- source/libs/stream/src/streamMeta.c | 59 ++++++++-- source/libs/stream/src/streamTask.c | 2 +- 11 files changed, 194 insertions(+), 54 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70cf9c8b58..cfacb5cfd6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3696,6 +3696,7 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; + int32_t transId; } SVPauseStreamTaskReq, SVResetStreamTaskReq; typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9c59e3f3ec..5e7f2bf0a6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -473,7 +473,9 @@ typedef struct STaskStartInfo { typedef struct STaskUpdateInfo { SHashObj* pTasks; - int32_t transId; + int32_t activeTransId; + int32_t completeTransId; + int64_t completeTs; } STaskUpdateInfo; typedef struct SScanWalInfo { @@ -753,8 +755,8 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); -void streamMetaClearUpdateTaskList(SStreamMeta* pMeta); -void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); +void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta); +bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 88b8e98afb..a87a01c5b6 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -64,6 +64,8 @@ typedef struct SChkptReportInfo { } SChkptReportInfo; typedef struct SStreamExecInfo { + int32_t role; + bool switchFromFollower; bool initTaskList; SArray *pNodeList; int64_t ts; // snapshot ts @@ -153,6 +155,7 @@ int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); int32_t mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); void mndInitStreamExecInfoForLeader(SMnode *pMnode); +void mndInitStreamExecInfoUpdateRole(SMnode *pMnode, int32_t role); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a01bc92a97..a85b5c733b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -144,7 +144,7 @@ int32_t mndInitStream(SMnode *pMnode) { code = sdbSetTable(pMnode->pSdb, table); if (code) { - return terrno; + return code; } code = sdbSetTable(pMnode->pSdb, tableSeq); @@ -2024,7 +2024,7 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2069,12 +2069,14 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } } - void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); - void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); - if (p == NULL && p1 == NULL) { - mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); - sdbRelease(pSdb, pStream); - continue; + if (!includeAllNodes) { + void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); + void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); + if (p1 == NULL && p2 == NULL) { + mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); + sdbRelease(pSdb, pStream); + continue; + } } mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, @@ -2192,11 +2194,36 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi return code; } +static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { + void *pIter = NULL; + int32_t code = 0; + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) { + break; + } + + code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); + sdbRelease(pSdb, pVgroup); + + if (code == 0) { + int32_t size = taosHashGetSize(pDBMap); + mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size); + } + } + + return code; +} + // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; bool allReady = true; SArray *pNodeSnapshot = NULL; + SMnode *pMnode = pMsg->info.node; + int64_t ts = taosGetTimestampSec(); + bool updateAllVgroups = false; int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { @@ -2204,10 +2231,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - mDebug("start to do node change checking"); - int64_t ts = taosGetTimestampSec(); - - SMnode *pMnode = pMsg->info.node; + mDebug("start to do node changing check"); streamMutexLock(&execInfo.lock); int32_t numOfNodes = extractStreamNodeList(pMnode); @@ -2240,10 +2264,20 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); - if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { + + { + if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) { + mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans"); + updateAllVgroups = true; + execInfo.switchFromFollower = false; // reset the flag + (void) addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); + } + } + + if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) { // kill current active checkpoint transaction, since the transaction is vnode wide. killAllCheckpointTrans(pMnode, &changeInfo); - code = mndProcessVgroupChange(pMnode, &changeInfo); + code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups); // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { @@ -2284,6 +2318,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { int32_t size = sizeof(SMStreamNodeCheckMsg); SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size); + if (pMsg == NULL) { + return terrno; + } SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size}; return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -2459,6 +2496,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { { SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; rsp.pCont = rpcMallocCont(rsp.contLen); + if (rsp.pCont == NULL) { + return terrno; + } + SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); @@ -2663,11 +2704,13 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) { SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize}; rsp.pCont = rpcMallocCont(rsp.contLen); - SMsgHead *pHead = rsp.pCont; - pHead->vgId = htonl(vgId); + if (rsp.pCont != NULL) { + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(vgId); - tmsgSendRsp(&rsp); - pInfo->handle = NULL; // disable auto rsp + tmsgSendRsp(&rsp); + pInfo->handle = NULL; // disable auto rsp + } } int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { @@ -2804,6 +2847,10 @@ static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessCreateStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { pReq->info.rsp = rpcMallocCont(1); + if (pReq->info.rsp == NULL) { + return terrno; + } + pReq->info.rspLen = 1; pReq->info.noResp = false; pReq->code = code; @@ -2815,6 +2862,10 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessDropStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { pReq->info.rsp = rpcMallocCont(1); + if (pReq->info.rsp == NULL) { + return terrno; + } + pReq->info.rspLen = 1; pReq->info.noResp = false; pReq->code = code; @@ -2837,6 +2888,36 @@ void mndInitStreamExecInfoForLeader(SMnode* pMnode) { mndInitStreamExecInfo(pMnode, &execInfo); } +void mndInitStreamExecInfoUpdateRole(SMnode* pMnode, int32_t role) { + execInfo.switchFromFollower = false; + + if (execInfo.role == NODE_ROLE_UNINIT) { + execInfo.role = role; + if (role == NODE_ROLE_LEADER) { + mInfo("init mnode is set to leader"); + } else { + mInfo("init mnode is set to follower"); + } + } else { + if (role == NODE_ROLE_LEADER) { + if (execInfo.role == NODE_ROLE_FOLLOWER) { + execInfo.role = role; + execInfo.switchFromFollower = true; + mInfo("mnode switch to be leader from follower"); + } else { + mInfo("mnode remain to be leader, do nothing"); + } + } else { // follower's + if (execInfo.role == NODE_ROLE_LEADER) { + execInfo.role = role; + mInfo("mnode switch to be follower from leader"); + } else { + mInfo("mnode remain to be follower, do nothing"); + } + } + } +} + void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 414cd402ec..494771e65e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -334,5 +334,5 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { taosMemoryFree(p); } - mDebug("complete clear checkpoints in Dbs"); + mDebug("complete clear checkpoints in all Dbs"); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 383ffe16da..07bba4e1b3 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -785,6 +785,9 @@ int32_t mndInitExecInfo() { return terrno; } + execInfo.role = NODE_ROLE_UNINIT; + execInfo.switchFromFollower = false; + taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 1094a17f6b..f5704be371 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -360,6 +360,8 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { (void)tsem_post(&pMgmt->syncSem); } (void)taosThreadMutexUnlock(&pMgmt->lock); + + mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_FOLLOWER); } static void mndBecomeLearner(const SSyncFSM *pFsm) { @@ -382,6 +384,8 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; + + mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_LEADER); mndInitStreamExecInfoForLeader(pMnode); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index dc58bfd8c4..7037eb5199 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -193,28 +193,23 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived. - streamMetaInitUpdateTaskList(pMeta, req.transId); + if (req.transId <= 0) { + tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId); + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); + + taosArrayDestroy(req.pNodeList); + return rsp.code; } - if (pMeta->updateInfo.transId != req.transId) { - if (req.transId < pMeta->updateInfo.transId) { - tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, - pMeta->updateInfo.transId, req.transId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); + // info needs to be kept till the new trans to update the nodeEp arrived. + bool update = streamMetaInitUpdateTaskList(pMeta, req.transId); + if (!update) { + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return rsp.code; - } else { - tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, - vgId, req.transId, pMeta->updateInfo.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. - streamMetaInitUpdateTaskList(pMeta, req.transId); - } - } else { - tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d, recorded update transId:%d", idstr, - vgId, req.transId, pMeta->updateInfo.transId); + taosArrayDestroy(req.pNodeList); + return rsp.code; } // duplicate update epset msg received, discard this redundant message @@ -311,7 +306,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // persist to disk } - streamMetaClearUpdateTaskList(pMeta); + streamMetaClearSetUpdateTaskListComplete(pMeta); if (!restored) { tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); @@ -775,8 +770,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaWUnLock(pMeta); terrno = 0; - tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, - pMeta->updateInfo.transId); + tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId, + pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs); while (streamMetaTaskInTimer(pMeta)) { tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); @@ -902,7 +897,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); } else { if (pStartInfo->restartCount == 0) { - tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId); + tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId); } else if (allReady) { pStartInfo->restartCount = 0; tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d222004fb7..cd69c9168c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -631,7 +631,13 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB (void) streamTaskReloadState(pTask); stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, streamTaskGetStatus(pHTask).name); - + // todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query. + /* + * while(1) { + * qExecTask() + * } + * // put into the output queue. + */ streamMetaReleaseTask(pTask->pMeta, pHTask); } else { stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ed9f274a2..8379d904c2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -431,7 +431,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->expandTaskFn = expandTaskFn; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; - pMeta->updateInfo.transId = -1; + pMeta->updateInfo.activeTransId = -1; + pMeta->updateInfo.completeTransId = -1; pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1759,12 +1760,56 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt } } -void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) { - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = -1; +void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) { + STaskUpdateInfo* pInfo = &pMeta->updateInfo; + + taosHashClear(pInfo->pTasks); + + int32_t prev = pInfo->completeTransId; + pInfo->completeTransId = pInfo->activeTransId; + pInfo->activeTransId = -1; + pInfo->completeTs = taosGetTimestampMs(); + + stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId", + pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId); } -void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = transId; +bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { + STaskUpdateInfo* pInfo = &pMeta->updateInfo; + + if (transId > pInfo->completeTransId) { + if (pInfo->activeTransId == -1) { + taosHashClear(pInfo->pTasks); + pInfo->activeTransId = transId; + + stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId, + pInfo->completeTransId); + return true; + } else { + if (pInfo->activeTransId == transId) { + // do nothing + return true; + } else if (transId < pInfo->activeTransId) { + stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard", + pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId); + return false; + } else { // transId > pInfo->activeTransId + taosHashClear(pInfo->pTasks); + int32_t prev = pInfo->activeTransId; + pInfo->activeTransId = transId; + + stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId, + transId, prev, pInfo->completeTransId); + return true; + } + } + } else if (transId == pInfo->completeTransId) { + stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId, + pInfo->completeTs); + return false; + } else { // pInfo->completeTransId > transId + stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard", + pMeta->vgId, pInfo->activeTransId, transId); + return false; + } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c0b2b16d30..f190673430 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -637,7 +637,7 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { numOfNodes, p->updateCount, prevTs); bool updated = false; - for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { + for (int32_t i = 0; i < numOfNodes; ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); if (pInfo == NULL) { continue; From 5322b60a31086f37da347ce101e6243e8d5e4776 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 18:08:16 +0800 Subject: [PATCH 2/3] refactor: remove unused attributes in msg. --- include/common/tmsg.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cfacb5cfd6..70cf9c8b58 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3696,7 +3696,6 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; - int32_t transId; } SVPauseStreamTaskReq, SVResetStreamTaskReq; typedef struct { From c94cd245931f713e3e980eb23f34b5e3013355ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 18:29:36 +0800 Subject: [PATCH 3/3] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 14 +++++++++----- source/libs/stream/src/streamMeta.c | 20 ++++++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4bf74d8d4f..9be8f5ffaa 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -561,12 +561,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } + streamMetaWLock(pMeta); - if (streamMetaCommit(pMeta) < 0) { - // persist to disk + if (pReq->dropRelHTask) { + code = streamMetaCommit(pMeta); } } + // always return true return TSDB_CODE_SUCCESS; } @@ -594,13 +596,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && pInfo->processedVer <= pReq->checkpointVer); - // update only it is in checkpoint status. - if (pStatus.state == TASK_STATUS__CK) { + // update only it is in checkpoint status, or during restore procedure. + if (pStatus.state == TASK_STATUS__CK || (!restored)) { pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + if (restored) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + } } streamTaskClearCheckInfo(pTask, true); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8379d904c2..7c6461b1c8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -891,24 +891,28 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { } int32_t streamMetaCommit(SStreamMeta* pMeta) { - if (tdbCommit(pMeta->db, pMeta->txn) < 0) { + int32_t code = 0; + code = tdbCommit(pMeta->db, pMeta->txn); + if (code != 0) { stError("vgId:%d failed to commit stream meta", pMeta->vgId); - return -1; + return code; } - if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { + code = tdbPostCommit(pMeta->db, pMeta->txn); + if (code != 0) { stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); - return -1; + return code; } - if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code != 0) { stError("vgId:%d failed to begin trans", pMeta->vgId); - return -1; + return code; } stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); - return 0; + return code; } int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {