diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 654ac890d5..a3b0d9c18e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -622,7 +622,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq); -int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp); +int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 586e692d77..38dbd079fb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -220,8 +220,8 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); -int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); +int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckStreamStatus(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9f8c0551de..e2df35784b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -819,6 +819,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } + pTask->pRpcMsgList = taosArrayInit(4, POINTER_BYTES); + // sink if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; @@ -1253,7 +1255,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // even in halt status, the data in inputQ must be processed int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1546,14 +1548,14 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { return code; } -int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { +int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); - int32_t len = msgLen - sizeof(SMsgHead); - int32_t code = 0; - SStreamTaskCheckpointReq req= {0}; + SStreamTaskCheckpointReq req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1574,21 +1576,22 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int streamMetaReleaseTask(pMeta, pTask); return code; - FAIL: +FAIL: return code; } // downstream task has complete the stream task checkpoint procedure -int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { +int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { // if this task is an agg task, rsp this message to upstream directly. // if this task is an source task, send source rsp to mnode int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); - int32_t len = msgLen - sizeof(SMsgHead); + + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; - SStreamTaskCheckpointRsp req= {0}; + SStreamTaskCheckpointRsp req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1605,7 +1608,9 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* pMsg, int goto FAIL; } - streamProcessCheckpointRsp(pMeta, pTask, &req); + tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr); + + streamProcessCheckpointRsp(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask); return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 398fa6d166..ef24e85446 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -497,17 +497,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; - - case TDMT_STREAM_TASK_CHECKPOINT: { - if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { - goto _err; - } - } break; - case TDMT_STREAM_TASK_CHECKPOINT_RSP: { - if (tqProcessStreamCheckPointRsp(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { - goto _err; - } - } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { @@ -684,6 +673,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_CHECK_POINT_SOURCE: return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT: + return tqProcessStreamCheckPointReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_RSP: + return tqProcessStreamCheckPointRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index eb7081b998..a31e0a5676 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -160,6 +160,9 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i); streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } + } else { // no need to dispatch msg to downstream task + qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr); + streamProcessCheckpointRsp(NULL, pTask); } return 0; @@ -215,7 +218,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - qDebug("s-task:%s sink task set to checkpoint ready", pTask->id.idStr); + qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); appendCheckpointIntoInputQ(pTask); streamSchedExec(pTask); } else { @@ -248,7 +251,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre * All down stream tasks have successfully completed the check point task. * Current stream task is allowed to start to do checkpoint things in ASYNC model. */ -int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp) { +int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task @@ -258,6 +261,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre pTask->id.idStr); appendCheckpointIntoInputQ(pTask); streamSchedExec(pTask); + } else { + qDebug("s-task:%s %d downstream tasks are not ready, wait", pTask->id.idStr, notReady); } return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1af442182e..0a3b5c15b2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -470,7 +470,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_CHECKPOINT) { + } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput; qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); } else { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 1cf1be4ba2..45accde7f0 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -45,6 +45,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; case TASK_STATUS__CK: return "check-point"; + case TASK_STATUS__CK_READY: return "check-point-ready"; default:return ""; } }