diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 98ca414466..387df52f16 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3473,8 +3473,6 @@ typedef struct SVUpdateCheckpointInfoReq { int64_t checkpointTs; int32_t transId; int8_t dropRelHTask; - int64_t hStreamId; - int64_t hTaskId; } SVUpdateCheckpointInfoReq; typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1996c2ed63..bcf081dbfb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -734,6 +734,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32 void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); +void streamMetaClearUpdateTaskList(SStreamMeta* pMeta); +void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); + void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7c8ad159bc..44aef4a91e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -185,7 +185,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if (pMeta->updateInfo.transId != req.transId) { + if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) { if (req.transId < pMeta->updateInfo.transId) { tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, pMeta->updateInfo.transId, req.transId); @@ -197,13 +197,12 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, vgId, req.transId, pMeta->updateInfo.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = req.transId; + streamMetaInitUpdateTaskList(pMeta, req.transId); } } else { tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); + streamMetaInitUpdateTaskList(pMeta, req.transId); } // duplicate update epset msg received, discard this redundant message @@ -280,6 +279,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // persist to disk } + streamMetaClearUpdateTaskList(pMeta); + if (!restored) { tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.tasksWillRestart = 0; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1583734a99..6696c9f8c2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -418,6 +418,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; + STaskId hTaskId = {0}; taosThreadMutexLock(&pTask->lock); @@ -433,12 +434,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } - streamMetaWLock(pMeta); } @@ -476,8 +476,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin } if (pReq->dropRelHTask) { + hTaskId = pTask->hTaskInfo.id; stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", - pReq->taskId, vgId, pReq->hTaskId); + pReq->taskId, vgId, hTaskId.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } @@ -498,9 +499,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, + (int32_t)hTaskId.taskId, numOfTasks); } streamMetaWLock(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f6449829a3..da1fec5565 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -372,6 +372,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; + pMeta->updateInfo.transId = -1; pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1740,4 +1741,14 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", id, vgId, transId, el); } +} + +void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) { + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = -1; +} + +void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = transId; } \ No newline at end of file