From d63758a1c54f5a94a6345dff9bb0f376d4e89545 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Aug 2023 15:24:24 +0800 Subject: [PATCH] fix(stream): fix error in execute task on a tranferred node. --- include/libs/stream/tstream.h | 2 ++ source/dnode/vnode/src/vnd/vnodeSync.c | 20 +++++++---- source/libs/stream/src/streamMeta.c | 4 +++ source/libs/stream/src/streamTask.c | 46 +++++++++++++++++++++----- 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a9feeeeb95..b08832ed46 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -638,6 +638,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); +void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); + int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 3fe74989fb..30539534cc 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -530,6 +530,7 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { SVnode *pVnode = pFsm->data; + int32_t vgId = pVnode->config.vgId; SyncIndex appliedIdx = -1; do { @@ -541,7 +542,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64 ", applied-index:%" PRId64, - pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx); + vgId, commitIdx - appliedIdx, commitIdx, appliedIdx); taosMsleep(10); } } while (true); @@ -550,14 +551,19 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId); - // start to restore all stream tasks - if (tsDisableStream) { - vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId); + if (vnodeIsRoleLeader(pVnode)) { + vInfo("vgId:%d, sync restore finished, start to launch stream tasks", vgId); + + // start to restore all stream tasks + if (tsDisableStream) { + vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); + } else { + vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); + tqCheckStreamStatus(pVnode->pTq); + } } else { - vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); - tqCheckStreamStatus(pVnode->pTq); + vInfo("vgId:%d, sync restore finished, no launch stream tasks since not leader", vgId); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 6408a92195..799dc96ec7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -143,6 +143,10 @@ _err: void streamMetaClose(SStreamMeta* pMeta) { qDebug("start to close stream meta"); + if (pMeta == NULL) { + return; + } + tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pCheckpointDb); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2f957e7dea..6899573427 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -405,11 +405,15 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre } void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { + char buf[512] = {0}; + EPSET_TO_STR(pEpSet, buf); + int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); for(int32_t i = 0; i < numOfUpstream; ++i) { SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); + qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf); break; } } @@ -426,6 +430,9 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo } void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { + char buf[512] = {0}; + EPSET_TO_STR(pEpSet, buf); + int8_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -435,16 +442,16 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); if (pVgInfo->vgId == nodeId) { - pVgInfo->epSet = *pEpSet; - qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId); + epsetAssign(&pVgInfo->epSet, pEpSet); + qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf); break; } } } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; if (pDispatcher->nodeId == nodeId) { - pDispatcher->epSet = *pEpSet; - qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId); + epsetAssign(&pDispatcher->epSet, pEpSet); + qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpSet:%s", pTask->id.taskId, nodeId, buf); } } else { // do nothing @@ -486,7 +493,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) taosArrayClear(pTask->checkReqIds); taosArrayClear(pTask->pRspMsgList); + // reset the upstream task stage info + streamTaskResetUpstreamStageInfo(pTask); + pTask->status.downstreamReady = 0; + + // todo: handle the case when the task is in fill-history (step 1) phase streamSetStatusNormal(pTask); taosWLockLatch(&pTask->pMeta->lock); @@ -494,20 +506,22 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) streamMetaCommit(pTask->pMeta); taosWUnLockLatch(&pTask->pMeta->lock); -// qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, -// pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus)); + qDebug("s-task:%s vgId:%d restart completed", pTask->id.idStr, vgId); // 3. start to check the downstream status if (startTask) { streamTaskCheckDownstreamTasks(pTask); } - return 0; } -int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { +int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { + char buf[512] = {0}; + if (pTask->info.nodeId == nodeId) { // execution task should be moved away epsetAssign(&pTask->info.epSet, pEpSet); + EPSET_TO_STR(pEpSet, buf) + qDebug("s-task:0x%x (vgId:%d) epset is updated %s", pTask->id.taskId, nodeId, buf); } // check for the dispath info and the upstream task info @@ -527,7 +541,21 @@ int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); - doUpdateEpsetInfo(pTask, pInfo->nodeId, &pInfo->newEp); + doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); } return 0; +} + +void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + return; + } + + int32_t size = taosArrayGetSize(pTask->pUpstreamInfoList); + for(int32_t i = 0; i < size; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + pInfo->stage = -1; + } + + qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); } \ No newline at end of file