fix(stream): add need update node into update node list.

This commit is contained in:
Haojun Liao 2023-11-05 01:24:23 +08:00
parent 680eb5674e
commit 1f756f58f6
1 changed files with 33 additions and 27 deletions

View File

@ -318,6 +318,31 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
}
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == nodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
@ -372,35 +397,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
"not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else {
if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == pRsp->downstreamNodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
return 0;
}
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
} else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);