fix(stream): fix error in checkpointing.

This commit is contained in:
Haojun Liao 2023-07-10 11:43:37 +08:00
parent 5940bbfb33
commit f6515e2a2c
3 changed files with 8 additions and 4 deletions

View File

@ -637,6 +637,12 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return code; return code;
} }
SArray* pSinkTaskList = taosArrayGetP(pStream->tasks, 0);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setEpToDownstreamTask(pAggTask, pSinkTask);
}
// source level // source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
} else if (numOfPlanLevel == 1) { } else if (numOfPlanLevel == 1) {

View File

@ -158,8 +158,6 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s (vgId:%d) checkpoint to task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr,
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i);
streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else { // no need to dispatch msg to downstream task } else { // no need to dispatch msg to downstream task

View File

@ -444,8 +444,8 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointR
tEncoderClear(&encoder); tEncoderClear(&encoder);
initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, qDebug("s-task:%s (level:%d, vgId:5d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
return 0; return 0;