fix(stream): fix error in generating checkpoint.

This commit is contained in:
Haojun Liao 2023-07-10 18:20:50 +08:00
parent d6e7e6e174
commit 385a5c9651
3 changed files with 16 additions and 19 deletions

View File

@ -1607,7 +1607,7 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
goto FAIL;

View File

@ -67,8 +67,8 @@ int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointRe
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
@ -93,8 +93,8 @@ int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRs
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
@ -115,7 +115,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
int64_t old = atomic_val_compare_exchange_64(&pTask->checkpointingId, 0, num);
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
if (old == 0) {
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
pTask->checkpointingId = checkpointId;
@ -140,10 +140,6 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
qDebug("s-task:%s dispatch checkpoint msg to task:0x%x(vgId:%d)", pTask->id.idStr, req.downstreamTaskId,
req.downstreamNodeId);
streamDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
@ -195,8 +191,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
int32_t code = 0;
int64_t checkpointId = pReq->checkpointId;
qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode", pTask->id.idStr,
pTask->info.taskLevel, checkpointId);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
@ -218,9 +212,9 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask);
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
} else {
// todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ
// anymore
@ -230,11 +224,13 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
if (notReady > 0) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s %d upstream tasks not send checkpoint info yet, total:%d", pTask->id.idStr, notReady, num);
qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d",
pTask->id.idStr, notReady, num);
return 0;
}
qDebug("s-task:%s all upstream send checkpoint msg now, dispatch checkpoint msg to downstream", pTask->id.idStr);
qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream",
pTask->id.idStr);
pTask->checkpointNotReadyTasks = (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH)
? 1
: taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
@ -257,7 +253,7 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1);
if (notReady == 0) {
qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for this task",
qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
pTask->id.idStr);
appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask);

View File

@ -231,7 +231,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
tEncoderClear(&encoder);
initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
@ -444,7 +444,7 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointR
tEncoderClear(&encoder);
initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
@ -766,6 +766,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
taosArrayPush(pTask->pRpcMsgList, &rspMsg);
qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t) taosArrayGetSize(pTask->pRpcMsgList));
return TSDB_CODE_SUCCESS;
}
@ -793,7 +794,7 @@ int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pR
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamTaskId);
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));