fix(stream): add log for update task epset, set correct update flag if epset updated.

This commit is contained in:
Haojun Liao 2024-04-25 16:39:11 +08:00
parent 7d1e2f1f9d
commit cd8baa7ba0
2 changed files with 56 additions and 29 deletions

View File

@ -211,7 +211,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (pReqTask != NULL) {
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", idstr, vgId, req.transId);
tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId, req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
@ -235,7 +235,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
if (!updated) {
if (updateEpSet) {
updated = updateEpSet;
}
@ -245,14 +245,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
}
if (updated) {
tqDebug("s-task:%s vgId:%d save task after update epset", idstr, vgId);
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
} else {
tqDebug("s-task:%s vgId:%s not save task since not update epset actually, stop task", idstr);
}
tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
streamTaskStop(pTask);
// keep the already updated info

View File

@ -36,15 +36,20 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pTask->info.epSet, pEpSet);
}
epsetAssign(&pTask->info.epSet, pEpSet);
bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
epsetToStr(pEpSet, buf, tListLen(buf));
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
if (!isEqual) {
(*pUpdated) = true;
char tmp[512] = {0};
epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));
epsetAssign(&pTask->info.epSet, pEpSet);
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", buf);
}
}
// check for the dispatch info and the upstream task info
@ -620,13 +625,21 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
for (int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->nodeId == nodeId) {
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pInfo->epSet, pEpSet);
bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
if (!equal) {
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
pInfo->taskId, nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
pInfo->taskId, nodeId, buf);
}
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId,
nodeId, buf);
break;
}
}
@ -653,7 +666,6 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool *pUpdated) {
char buf[512] = {0};
epsetToStr(pEpSet, buf, tListLen(buf));
*pUpdated = false;
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
@ -661,29 +673,43 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) {
if (!(*pUpdated)) {
(*pUpdated) = isEpsetEqual(&pVgInfo->epSet, pEpSet);
}
bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
if (!isEqual) {
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, buf);
epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
pVgInfo->taskId, nodeId, buf);
}
break;
}
}
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
if (pDispatcher->nodeId == nodeId) {
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pDispatcher->epSet, pEpSet);
}
bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
if (!equal) {
*pUpdated = true;
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, buf);
char tmp[512] = {0};
epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
pDispatcher->taskId, nodeId, buf);
}
}
}
}