diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 8940d1be96..e6e78bd1df 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -257,7 +257,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 045d89cc22..08e4b55ffe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -123,7 +123,7 @@ typedef struct { int8_t type; int32_t srcVgId; - int32_t childId; + int32_t srcTaskId; int64_t sourceVer; int64_t reqId; @@ -312,7 +312,7 @@ struct SStreamTask { SStreamId historyTaskId; SStreamId streamTaskId; SArray* pUpstreamInfoList; // SArray, // children info - SArray* pRpcMsgList; // SArray + SArray* pReadyMsgList; // SArray // output union { @@ -368,7 +368,6 @@ typedef struct SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; -// uint64_t checkpointId; int32_t chkptNotReadyTasks; SArray* checkpointSaved; SArray* checkpointInUse; @@ -396,7 +395,8 @@ typedef struct { typedef struct { int64_t streamId; int32_t taskId; - int32_t dataSrcVgId; + int32_t type; + int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; int32_t upstreamNodeId; @@ -503,24 +503,10 @@ typedef struct { int32_t upstreamTaskId; int32_t upstreamNodeId; int32_t childId; -} SStreamCheckpointReq; +} SStreamCheckpointReadyMsg; -typedef struct { - SMsgHead msgHead; - int64_t streamId; - int64_t checkpointId; - int32_t downstreamTaskId; - int32_t downstreamNodeId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamCheckpointRsp; - -int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq); -int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq); - -int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); -int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); +int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); +int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); typedef struct { int64_t streamId; @@ -541,12 +527,12 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); + int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); -int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, - int64_t dstTaskId); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupScheduleTrigger(SStreamTask* pTask); @@ -556,8 +542,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -void streamTaskOpenUpstreamInput(SStreamTask* pTask); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); @@ -623,16 +610,16 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint -int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); -int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq); -int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); +int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); -int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); +int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 47ff491c3f..311cbdc43b 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -80,7 +80,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 70ce6f5c28..ebcb13e75b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -751,8 +751,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 198086b4b6..922bd35e23 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -221,8 +221,7 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckStreamStatus(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 84ab014597..354e210ec1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -833,8 +833,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } - pTask->pRpcMsgList = taosArrayInit(4, sizeof(SRpcMsg)); - // sink if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; @@ -865,7 +863,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } - streamTaskOpenUpstreamInput(pTask); + streamTaskOpenAllUpstreamInput(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; @@ -1276,13 +1274,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // even in halt status, the data in inputQ must be processed int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) { - tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.checkpointVer); + tqDebug("vgId:%d s-task:%s start to process block from inputQ, status:%s, checkpointVer:%" PRId64, vgId, + pTask->id.idStr, streamGetTaskStatusStr(status), pTask->chkInfo.checkpointVer); streamProcessRunReq(pTask); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, - pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + pTask->id.idStr, streamGetTaskStatusStr(status), pTask->status.schedStatus); } streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1531,7 +1529,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); tqError("vgId:%d failed to decode checkpoint source msg, code:%s", vgId, tstrerror(code)); - goto FAIL; + return code; } tDecoderClear(&decoder); @@ -1540,12 +1538,12 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, req.taskId); - goto FAIL; + return TSDB_CODE_SUCCESS; } code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { - goto FAIL; + return code; } // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode @@ -1561,67 +1559,27 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { total = taosArrayGetSize(pMeta->pTaskList); taosWUnLockLatch(&pMeta->lock); - qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode, total source checkpoint req:%d", + qDebug("s-task:%s level:%d receive checkpoint source msg from mnode id:%" PRId64 ", total source checkpoint req:%d", pTask->id.idStr, pTask->info.taskLevel, req.checkpointId, total); - streamProcessCheckpointSourceReq(pMeta, pTask, &req); - streamMetaReleaseTask(pMeta, pTask); - return code; - -FAIL: - return code; -} - -int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; - int32_t vgId = TD_VID(pTq->pVnode); - SStreamMeta* pMeta = pTq->pStreamMeta; - - SStreamCheckpointReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - return code; - } - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); - return TSDB_CODE_SUCCESS; - } - - code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask); - if (code != TSDB_CODE_SUCCESS) { - streamMetaReleaseTask(pMeta, pTask); - return code; - } - - streamProcessCheckpointReq(pTask, &req); + streamProcessCheckpointSourceReq(pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; } -// downstream task has complete the stream task checkpoint procedure -int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { - // if this task is an agg task, rsp this message to upstream directly. - // if this task is an source task, send source rsp to mnode +// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task +int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; - SStreamCheckpointRsp req = {0}; + SStreamCheckpointReadyMsg req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) { + if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); return code; @@ -1630,13 +1588,13 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId); if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); + tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); return code; } - tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s received the checkpoint ready msg, handle it", vgId, pTask->id.idStr); - streamProcessCheckpointRsp(pMeta, pTask); + streamProcessCheckpointReadyMsg(pTask); streamMetaReleaseTask(pMeta, pTask); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1bacc257d0..1c159ce534 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -673,10 +673,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_CHECK_POINT_SOURCE: return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT: - return tqProcessStreamCheckPointReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT_RSP: - return tqProcessStreamCheckPointRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY: + return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index e7f13a4338..d732ec2876 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -50,13 +50,12 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); -int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId); -int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask); +int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); -int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); +int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); #ifdef __cplusplus diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index f1a6d7423c..b53d6e9d1c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -143,10 +143,10 @@ int32_t streamSchedExec(SStreamTask* pTask) { return 0; } -int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int8_t status = 0; - SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; @@ -239,9 +239,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - // todo: if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked + // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + } - streamTaskEnqueueBlocks(pTask, pReq, pRsp); + streamTaskAppendInputBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); if (exec) { @@ -322,7 +325,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + qDebug("s-task:%s level:%d checkpoint(trigger) enqueue inputQ, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, + pTask->info.taskLevel, total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); @@ -357,25 +361,33 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } -void streamTaskOpenUpstreamInput(SStreamTask* pTask) { +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); if (num == 0) { return; } for(int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); pInfo->dataAllowed = true; } } void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = false; + } +} + +SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); for(int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (pInfo->taskId == taskId) { - pInfo->dataAllowed = false; - break; + return pInfo; } } + + return NULL; } \ No newline at end of file diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 839822c103..772500bc80 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -61,7 +61,7 @@ int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSo return 0; } -int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) { +int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; @@ -74,33 +74,7 @@ int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointRe return pEncoder->pos; } -int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) { +int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; @@ -113,7 +87,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs return 0; } -static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) { +static int32_t streamAlignCheckpoint(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num); if (old == 0) { @@ -124,19 +98,27 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i } static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { - SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock)); + SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } pChkpoint->type = checkpointType; - pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pChkpoint->pBlock == NULL) { + + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pBlock == NULL) { taosFreeQitem(pChkpoint); return TSDB_CODE_OUT_OF_MEMORY; } - pChkpoint->pBlock->info.type = STREAM_CHECKPOINT; + pBlock->info.type = STREAM_CHECKPOINT; + pBlock->info.version = pTask->checkpointingId; + pBlock->info.rows = 1; + pBlock->info.childId = pTask->info.selfChildId; + + pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; + taosArrayPush(pChkpoint->blocks, pBlock); + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { taosFreeQitem(pChkpoint); return TSDB_CODE_OUT_OF_MEMORY; @@ -146,56 +128,72 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint return TSDB_CODE_SUCCESS; } -int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { - int32_t code = 0; +int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. pTask->status.taskStatus = TASK_STATUS__CK; - pTask->checkpointNotReadyTasks = 1; pTask->checkpointingId = pReq->checkpointId; + pTask->checkpointNotReadyTasks = 1; // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. // 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already. - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); - streamSchedExec(pTask); - return code; + return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); } -int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) { - int64_t checkpointId = pReq->checkpointId; - int32_t childId = pReq->childId; +int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { + SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); + int64_t checkpointId = pDataBlock->info.version; // set the task status pTask->checkpointingId = checkpointId; pTask->status.taskStatus = TASK_STATUS__CK; //todo fix race condition: set the status and append checkpoint block + int32_t taskLevel = pTask->info.taskLevel; + if (taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + pBlock->srcTaskId = pTask->id.taskId; + pBlock->srcVgId = pTask->pMeta->vgId; - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); + qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", pTask->id.idStr, + pTask->info.selfChildId); - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + if (code != 0) { // todo failed to add it into the output queue, free it. + return code; + } + + streamFreeQitem((SStreamQueueItem*)pBlock); + streamDispatchStreamBlock(pTask); + } else { // only one task exists + streamProcessCheckpointReadyMsg(pTask); + } + } else if (taskLevel == TASK_LEVEL__SINK) { // todo: sink node needs alignment?? - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT); - streamSchedExec(pTask); - qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); + ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); + pTask->status.taskStatus = TASK_STATUS__CK_READY; + + // update the child Id for downstream tasks + streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); + qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", pTask->id.idStr); } else { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); - // close the inputQ for data from upstream task. - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + // update the child Id for downstream tasks + streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); // there are still some upstream tasks not send checkpoint request, do nothing and wait for then - int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); + int32_t notReady = streamAlignCheckpoint(pTask); int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); if (notReady > 0) { - qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d", - pTask->id.idStr, notReady, num); + qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", + pTask->id.idStr, pTask->info.selfChildId, notReady, num); return 0; } qDebug( - "s-task:%s receive one checkpoint req, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to " + "s-task:%s receive one checkpoint block, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to " "downstream", pTask->id.idStr, num); @@ -206,8 +204,20 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); - streamSchedExec(pTask); + + { + pBlock->srcTaskId = pTask->id.taskId; + pBlock->srcVgId = pTask->pMeta->vgId; + + ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); + int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + if (code != 0) { // todo failed to add it into the output queue, free it. + return code; + } + + streamFreeQitem((SStreamQueueItem*)pBlock); + streamDispatchStreamBlock(pTask); + } } return 0; @@ -217,16 +227,17 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe * All down stream tasks have successfully completed the check point task. * Current stream task is allowed to start to do checkpoint things in ASYNC model. */ -int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) { +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1); + ASSERT(notReady >= 0); + if (notReady == 0) { qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task", pTask->id.idStr); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT); - streamSchedExec(pTask); } else { int32_t total = streamTaskGetNumOfDownstream(pTask); qDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index f8eb6ef069..1080b67a63 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -23,6 +23,7 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe pData->type = blockType; pData->srcVgId = srcVg; + pData->srcTaskId = pReq->upstreamTaskId; int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); @@ -59,16 +60,15 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT return NULL; } + pStreamBlocks->srcTaskId = pTask->id.taskId; pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; pStreamBlocks->blocks = pRes; if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; - pStreamBlocks->childId = pTask->info.selfChildId; pStreamBlocks->sourceVer = pSubmit->ver; } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; - pStreamBlocks->childId = pTask->info.selfChildId; pStreamBlocks->sourceVer = pMerged->ver; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index f246a781c6..9790762c0e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -26,8 +26,13 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; +typedef struct { + int32_t taskId; + SRpcMsg msg; + SEpSet epset; +} SStreamChkptReadyInfo; + static void doRetryDispatchData(void* param, void* tmrId); -static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); @@ -46,7 +51,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; @@ -70,15 +76,16 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } -int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, - int64_t dstTaskId) { +static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, + int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; - pReq->dataSrcVgId = vgId; + pReq->srcVgId = vgId; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; pReq->blockNum = numOfBlocks; pReq->taskId = dstTaskId; + pReq->type = type; pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES); pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); @@ -238,9 +245,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR return 0; } -static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { +static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; - int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0); @@ -248,7 +254,7 @@ static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBloc SStreamDispatchReq req = {0}; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; - code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId); + code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -289,7 +295,7 @@ static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBloc for (int32_t i = 0; i < vgSz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId); + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); if (code != TSDB_CODE_SUCCESS) { goto FAIL_SHUFFLE_DISPATCH; } @@ -299,7 +305,7 @@ static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBloc SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); // TODO: do not use broadcast - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT) { for (int32_t j = 0; j < vgSz; j++) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; @@ -359,7 +365,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { - qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, + qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", pTask->id.idStr, numOfElems); } @@ -382,12 +388,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } pTask->msgInfo.pData = pBlock; - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER); int32_t retryCount = 0; while (1) { - int32_t code = streamDispatchAllBlocks(pTask, pBlock); + int32_t code = doDispatchAllBlocks(pTask, pBlock); if (code == TSDB_CODE_SUCCESS) { break; } @@ -415,81 +421,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, - SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamCheckpointReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(nodeId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamCheckpointReq(&encoder, pReq)) < 0) { - rpcFreeCont(buf); - return code; - } - tEncoderClear(&encoder); - - initRpcMsg(&msg, TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", - pTask->id.idStr, pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); - - tmsgSendReq(pEpSet, &msg); - return 0; -} - -int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { - SStreamCheckpointReq req = { - .streamId = pTask->id.streamId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->info.nodeId, - .downstreamNodeId = pTask->info.nodeId, - .downstreamTaskId = pTask->id.taskId, - .childId = pTask->info.selfChildId, - .checkpointId = checkpointId, - }; - - // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; - req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; - doDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->notReadyTasks = numOfVgs; - pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); - - qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs); - - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - doDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } else { // no need to dispatch msg to downstream task - qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr); - streamProcessCheckpointRsp(NULL, pTask); - } - - return 0; -} - int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId}; @@ -514,32 +445,31 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { } // this function is usually invoked by sink/agg task -int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->pRpcMsgList); +int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { + int32_t num = taosArrayGetSize(pTask->pReadyMsgList); ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num); - qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, - pTask->info.taskLevel, num); - for (int32_t i = 0; i < num; ++i) { - SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i); - tmsgSendRsp(pMsg); + SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i); + tmsgSendReq(&pInfo->epset, &pInfo->msg); + qDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel, + pInfo->taskId); } - taosArrayClear(pTask->pRpcMsgList); - qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr, pTask->info.taskLevel); + taosArrayClear(pTask->pReadyMsgList); + qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num); return TSDB_CODE_SUCCESS; } // this function is only invoked by source task, and send rsp to mnode int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pRpcMsgList) == 1); - SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, 0); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pReadyMsgList) == 1); + SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0); - tmsgSendRsp(pMsg); + tmsgSendRsp(&pInfo->msg); - taosArrayClear(pTask->pRpcMsgList); + taosArrayClear(pTask->pReadyMsgList); qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); return TSDB_CODE_SUCCESS; @@ -550,7 +480,8 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; @@ -753,7 +684,7 @@ void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); - int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); @@ -794,49 +725,73 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa tEncodeStreamCheckpointSourceRsp(&encoder, &rsp); tEncoderClear(&encoder); - SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; - taosArrayPush(pTask->pRpcMsgList, &rspMsg); + SStreamChkptReadyInfo info = {0}; + initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); + info.msg.info = *pRpcInfo; - qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pRpcMsgList)); + if (pTask->pReadyMsgList == NULL) { + pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); + } + + taosArrayPush(pTask->pReadyMsgList, &info); + + qDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList)); return TSDB_CODE_SUCCESS; } -int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { - int32_t len = 0; - int32_t code = 0; - SEncoder encoder; +int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { + int32_t code = 0; + int32_t tlen = 0; + void* buf = NULL; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + return TSDB_CODE_SUCCESS; + } - SStreamCheckpointRsp rsp = { - .checkpointId = pReq->checkpointId, - .downstreamTaskId = pReq->downstreamTaskId, - .downstreamNodeId = pReq->downstreamNodeId, - .streamId = pReq->streamId, - .upstreamTaskId = pReq->upstreamTaskId, - .upstreamNodeId = pReq->upstreamNodeId, - }; + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); - tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code); + SStreamCheckpointReadyMsg req = {0}; + req.downstreamNodeId = pTask->pMeta->vgId; + req.downstreamTaskId = pTask->id.taskId; + req.streamId = pTask->id.streamId; + req.checkpointId = checkpointId; + req.childId = pInfo->childId; + req.upstreamNodeId = pInfo->nodeId; + req.upstreamTaskId = pInfo->taskId; + + tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code); if (code < 0) { + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) { + rpcFreeCont(buf); return code; } - - void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); - if (pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); - - void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); - - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeStreamCheckpointRsp(&encoder, &rsp); tEncoderClear(&encoder); - SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; - taosArrayPush(pTask->pRpcMsgList, &rspMsg); + ASSERT(req.upstreamTaskId != 0); - return TSDB_CODE_SUCCESS; + SStreamChkptReadyInfo info = {.taskId = pInfo->taskId, .epset = pInfo->epSet}; + initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); + qDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d", + pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.downstreamNodeId, index); + + if (pTask->pReadyMsgList == NULL) { + pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); + } + + taosArrayPush(pTask->pReadyMsgList, &info); + return 0; } // todo record the idle time for dispatch data @@ -848,7 +803,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // happened too fast. todo handle the shuffle dispatch failure qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); - int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (ret != TSDB_CODE_SUCCESS) { } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 983edfb30d..08dc59dfd2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -57,8 +57,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return TSDB_CODE_SUCCESS; } -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, - int32_t* totalBlocks) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -413,9 +412,9 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + } else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; - qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); + qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, pItem->type); } else { ASSERT(0); @@ -434,9 +433,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. - qDebug("s-task:%s start to extract data block from inputQ", id); - - /*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &batchSize, id); + extractBlocksFromInputQ(pTask, &pInput, &batchSize); if (pInput == NULL) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { @@ -451,8 +448,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { } int32_t type = pInput->type; - if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - ASSERT(getNumOfItemsInputQ(pTask) == 1); + + // dispatch checkpoint msg to all downstream tasks + if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput); + continue; } if (pTask->info.taskLevel == TASK_LEVEL__SINK) { @@ -462,21 +462,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; - } else { // pInput->type == STREAM_INPUT__CHECKPOINT, for sink task, do nothing. - ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; - return 0; } } - // dispatch checkpoint msg to all downstream tasks - if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - qDebug("s-task:%s start to dispatch checkpoint msg to downstream", id); - - streamTaskDispatchCheckpointMsg(pTask, pTask->checkpointingId); - return 0; - } - int64_t st = taosGetTimestampMs(); const SStreamQueueItem* pItem = pInput; @@ -565,7 +553,7 @@ int32_t streamTryExec(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); } else { - code = streamTaskSendCheckpointRsp(pTask); + code = streamTaskSendCheckpointReadyMsg(pTask); } if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 49b7e81c25..74709b29e8 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -167,13 +167,14 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* } #endif -int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id) { +int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5; + const char* id = pTask->id.idStr; while (1) { if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); + qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); return TSDB_CODE_SUCCESS; } @@ -181,17 +182,24 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i if (qItem == NULL) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(10); - qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); + qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); continue; } - qDebug("===stream===break batchSize:%d", *numOfBlocks); + qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; } // do not merge blocks for sink node and check point data block if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) { + + if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + qDebug("s-task:%s checkpoint msg extracted, start to process immediately", id); + } else { + qDebug("s-task:%s sink task handle result block one-by-one", id); + } + *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index b229078b44..1446198b16 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -132,7 +132,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { } } else { pTask->status.downstreamReady = 1; - qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s", + qDebug("s-task:%s (vgId:%d) set downstream checked since no downstream, try to launch scan-history-data, status:%s", pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskLaunchScanHistory(pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fa99006210..e5135a43c7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -242,7 +242,7 @@ void tFreeStreamTask(SStreamTask* pTask) { streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } - pTask->pRpcMsgList = taosArrayDestroy(pTask->pRpcMsgList); + pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr);