From cd8baa7ba00b59b361b22370114494034944ca69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 16:39:11 +0800 Subject: [PATCH] fix(stream): add log for update task epset, set correct update flag if epset updated. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 9 +-- source/libs/stream/src/streamTask.c | 76 +++++++++++++++------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 8b2e9693eb..254e7e5856 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -211,7 +211,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); if (pReqTask != NULL) { - tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", idstr, vgId, req.transId); + tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); @@ -235,7 +235,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); - if (!updated) { + if (updateEpSet) { updated = updateEpSet; } @@ -245,14 +245,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } if (updated) { - tqDebug("s-task:%s vgId:%d save task after update epset", idstr, vgId); + tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId); streamMetaSaveTask(pMeta, pTask); if (ppHTask != NULL) { streamMetaSaveTask(pMeta, *ppHTask); } + } else { + tqDebug("s-task:%s vgId:%s not save task since not update epset actually, stop task", idstr); } - tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); streamTaskStop(pTask); // keep the already updated info diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2528e03593..dff1a1505f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -36,15 +36,20 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; - if (pTask->info.nodeId == nodeId) { // execution task should be moved away - if (!(*pUpdated)) { - *pUpdated = isEpsetEqual(&pTask->info.epSet, pEpSet); - } - - epsetAssign(&pTask->info.epSet, pEpSet); + bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet); epsetToStr(pEpSet, buf, tListLen(buf)); - stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); + + if (!isEqual) { + (*pUpdated) = true; + char tmp[512] = {0}; + epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); + + epsetAssign(&pTask->info.epSet, pEpSet); + stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", buf); + } } // check for the dispatch info and the upstream task info @@ -620,13 +625,21 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS for (int32_t i = 0; i < numOfUpstream; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->nodeId == nodeId) { - if (!(*pUpdated)) { - *pUpdated = isEpsetEqual(&pInfo->epSet, pEpSet); + bool equal = isEpsetEqual(&pInfo->epSet, pEpSet); + if (!equal) { + *pUpdated = true; + + char tmp[512] = {0}; + epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); + + epsetAssign(&pInfo->epSet, pEpSet); + stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, + pInfo->taskId, nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId, + pInfo->taskId, nodeId, buf); } - epsetAssign(&pInfo->epSet, pEpSet); - stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId, - nodeId, buf); break; } } @@ -653,7 +666,6 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool *pUpdated) { char buf[512] = {0}; epsetToStr(pEpSet, buf, tListLen(buf)); - *pUpdated = false; int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; @@ -661,29 +673,43 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgroups = taosArrayGetSize(pVgs); - for (int32_t i = 0; i < numOfVgroups; i++) { + for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); if (pVgInfo->vgId == nodeId) { - if (!(*pUpdated)) { - (*pUpdated) = isEpsetEqual(&pVgInfo->epSet, pEpSet); - } + bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet); + if (!isEqual) { + *pUpdated = true; + char tmp[512] = {0}; + epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); - epsetAssign(&pVgInfo->epSet, pEpSet); - stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, buf); + epsetAssign(&pVgInfo->epSet, pEpSet); + stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, + nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id, + pVgInfo->taskId, nodeId, buf); + } break; } } } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; if (pDispatcher->nodeId == nodeId) { - if (!(*pUpdated)) { - *pUpdated = isEpsetEqual(&pDispatcher->epSet, pEpSet); - } + bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet); + if (!equal) { + *pUpdated = true; - epsetAssign(&pDispatcher->epSet, pEpSet); - stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, buf); + char tmp[512] = {0}; + epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); + + epsetAssign(&pDispatcher->epSet, pEpSet); + stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, + nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id, + pDispatcher->taskId, nodeId, buf); + } } } }