From 385a5c96519ee2b8930fe2b7459220e662b4a179 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 10 Jul 2023 18:20:50 +0800 Subject: [PATCH] fix(stream): fix error in generating checkpoint. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 26 ++++++++++------------- source/libs/stream/src/streamDispatch.c | 7 +++--- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a3d28832fa..97b0d2c5de 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1607,7 +1607,7 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); goto FAIL; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9233b608b5..bd798d206a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -67,8 +67,8 @@ int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointRe if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -93,8 +93,8 @@ int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRs if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -115,7 +115,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) { int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); - int64_t old = atomic_val_compare_exchange_64(&pTask->checkpointingId, 0, num); + int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num); if (old == 0) { qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); pTask->checkpointingId = checkpointId; @@ -140,10 +140,6 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; - - qDebug("s-task:%s dispatch checkpoint msg to task:0x%x(vgId:%d)", pTask->id.idStr, req.downstreamTaskId, - req.downstreamNodeId); - streamDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -195,8 +191,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, int32_t code = 0; int64_t checkpointId = pReq->checkpointId; - qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode", pTask->id.idStr, - pTask->info.taskLevel, checkpointId); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. @@ -218,9 +212,9 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); appendCheckpointIntoInputQ(pTask); streamSchedExec(pTask); + qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); } else { // todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ // anymore @@ -230,11 +224,13 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); if (notReady > 0) { int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s %d upstream tasks not send checkpoint info yet, total:%d", pTask->id.idStr, notReady, num); + qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d", + pTask->id.idStr, notReady, num); return 0; } - qDebug("s-task:%s all upstream send checkpoint msg now, dispatch checkpoint msg to downstream", pTask->id.idStr); + qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream", + pTask->id.idStr); pTask->checkpointNotReadyTasks = (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) ? 1 : taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); @@ -257,7 +253,7 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) { // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1); if (notReady == 0) { - qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for this task", + qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task", pTask->id.idStr); appendCheckpointIntoInputQ(pTask); streamSchedExec(pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5f43326ddc..e8220a787d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -231,7 +231,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR tEncoderClear(&encoder); initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, + qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); @@ -444,7 +444,7 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointR tEncoderClear(&encoder); initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, + qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); @@ -766,6 +766,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; taosArrayPush(pTask->pRpcMsgList, &rspMsg); + qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t) taosArrayGetSize(pTask->pRpcMsgList)); return TSDB_CODE_SUCCESS; } @@ -793,7 +794,7 @@ int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pR return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamTaskId); + ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));