From 315b86cf666139c54f80513a0539239c824d597b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Apr 2024 23:52:45 +0800 Subject: [PATCH] fix(stream): save the epset if task epset updated. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 24 ++++----- source/libs/stream/src/streamTask.c | 58 ++++++++++++++-------- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index de7c743b7d..64adf53bc8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -806,7 +806,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void streamTaskCheckDownstream(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); -int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); +bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4667cd73b1..7deebf3b0f 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -161,6 +161,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t len = pMsg->contLen - sizeof(SMsgHead); SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; int64_t st = taosGetTimestampMs(); + bool updated = false; SStreamTaskNodeUpdateMsg req = {0}; @@ -214,7 +215,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM return rsp.code; } - streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); @@ -230,21 +231,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); - streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + if (!updated) { + updated = updateEpSet; + } + streamTaskResetStatus(*ppHTask); streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); } } - if (restored) { - tqDebug("s-task:%s vgId:%d start to save task", idstr, vgId); + if (updated) { + tqDebug("s-task:%s vgId:%d save task after update epset", idstr, vgId); streamMetaSaveTask(pMeta, pTask); if (ppHTask != NULL) { streamMetaSaveTask(pMeta, *ppHTask); } - - } else { - tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId); } tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); @@ -253,15 +255,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // keep the already updated info taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); + int64_t now = taosGetTimestampMs(); if (ppHTask != NULL) { streamTaskStop(*ppHTask); - - int64_t now = taosGetTimestampMs(); tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms", idstr, vgId, now - st); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { - int64_t now = taosGetTimestampMs(); tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr, vgId, now - st); } @@ -277,7 +277,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); - streamMetaWUnLock(pMeta); } else { if (streamMetaCommit(pMeta) < 0) { // persist to disk @@ -286,7 +285,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM if (!restored) { tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.tasksWillRestart = 0; - streamMetaWUnLock(pMeta); } else { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 @@ -295,10 +293,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM #endif tqStreamTaskStartAsync(pMeta, cb, true); - streamMetaWUnLock(pMeta); } } + streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); return rsp.code; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 72611f4c14..b87c19b08e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -24,8 +24,8 @@ #define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); -static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); -static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); +static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate); static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -34,10 +34,14 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { return 0; } -static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { +static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; if (pTask->info.nodeId == nodeId) { // execution task should be moved away + if (!(*pUpdated)) { + *pUpdated = isEpsetEqual(&pTask->info.epSet, pEpSet); + } + epsetAssign(&pTask->info.epSet, pEpSet); epsetToStr(pEpSet, buf, tListLen(buf)); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); @@ -46,12 +50,12 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp // check for the dispatch info and the upstream task info int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated); } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated); } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated); } return 0; @@ -608,7 +612,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre return TSDB_CODE_SUCCESS; } -void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { +void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; epsetToStr(pEpSet, buf, tListLen(buf)); @@ -616,6 +620,10 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS for (int32_t i = 0; i < numOfUpstream; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->nodeId == nodeId) { + if (!(*pUpdated)) { + *pUpdated = isEpsetEqual(&pInfo->epSet, pEpSet); + } + epsetAssign(&pInfo->epSet, pEpSet); stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId, nodeId, buf); @@ -642,12 +650,14 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { +void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool *pUpdated) { char buf[512] = {0}; epsetToStr(pEpSet, buf, tListLen(buf)); - int32_t id = pTask->id.taskId; + *pUpdated = false; + + int32_t id = pTask->id.taskId; + int8_t type = pTask->outputInfo.type; - int8_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; @@ -656,18 +666,24 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); if (pVgInfo->vgId == nodeId) { + if (!(*pUpdated)) { + (*pUpdated) = isEpsetEqual(&pVgInfo->epSet, pEpSet); + } + epsetAssign(&pVgInfo->epSet, pEpSet); - stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, - buf); + stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, buf); break; } } } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; if (pDispatcher->nodeId == nodeId) { + if (!(*pUpdated)) { + *pUpdated = isEpsetEqual(&pDispatcher->epSet, pEpSet); + } + epsetAssign(&pDispatcher->epSet, pEpSet); - stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, - buf); + stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, buf); } } } @@ -690,7 +706,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { return 0; } -int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { +bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->execInfo; int32_t numOfNodes = taosArrayGetSize(pNodeList); @@ -701,11 +717,13 @@ int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId, numOfNodes, p->updateCount, prevTs); + bool updated = false; for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); - doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); + doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated); } - return 0; + + return updated; } void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { @@ -1033,7 +1051,7 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) stWarn("s-task:%s already not in-check-procedure", id); } - int64_t el = taosGetTimestampMs() - pInfo->startTs; + int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; @@ -1075,7 +1093,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); break;