diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a3b0d9c18e..25f90961a3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -474,6 +474,7 @@ typedef struct { int64_t checkpointId; int32_t taskId; int32_t nodeId; + int32_t mnodeId; int64_t expireTime; } SStreamCheckpointSourceReq; @@ -482,6 +483,7 @@ typedef struct { int64_t checkpointId; int32_t taskId; int32_t nodeId; + int32_t mnodeId; int64_t expireTime; } SStreamCheckpointSourceRsp; @@ -500,7 +502,7 @@ typedef struct { int32_t upstreamTaskId; int32_t upstreamNodeId; int32_t childId; -} SStreamTaskCheckpointReq; +} SStreamCheckpointReq; typedef struct { SMsgHead msgHead; @@ -511,13 +513,13 @@ typedef struct { int32_t upstreamTaskId; int32_t upstreamNodeId; int32_t childId; -} SStreamTaskCheckpointRsp; +} SStreamCheckpointRsp; -int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); -int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); +int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq); +int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq); -int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp); -int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp); +int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); +int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); typedef struct { int64_t streamId; @@ -613,20 +615,22 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamDoCheckpoint(SStreamMeta* streamMeta); - int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); -int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq); +int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, + SStreamTask* pTask); +int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e2df35784b..84d2da4f97 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } - pTask->pRpcMsgList = taosArrayInit(4, POINTER_BYTES); + pTask->pRpcMsgList = taosArrayInit(4, sizeof(SRpcMsg)); // sink if (pTask->outputType == TASK_OUTPUT__SMA) { @@ -1516,16 +1516,18 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // todo handle this bug: task not in ready state. SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, req.taskId); + tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, + req.taskId); goto FAIL; } - // backup the rpchandle for rsp - SRpcMsg* pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg)); - taosArrayPush(pTask->pRpcMsgList, &pRpcMsg); + code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); + if (code != TSDB_CODE_SUCCESS) { + goto FAIL; + } // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode + // todo: when generating checkpoint, leader of mnode has transfer to other DNode? // set the initial value for generating check point int32_t total = 0; @@ -1544,7 +1546,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return code; - FAIL: +FAIL: return code; } @@ -1555,7 +1557,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamTaskCheckpointReq req = {0}; + SStreamCheckpointReq req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1572,6 +1574,11 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { goto FAIL; } + code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask); + if (code != TSDB_CODE_SUCCESS) { + goto FAIL; + } + streamProcessCheckpointReq(pMeta, pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1591,7 +1598,7 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; - SStreamTaskCheckpointRsp req = {0}; + SStreamCheckpointRsp req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4f9880cd23..af82baa6d9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -165,6 +165,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; + } else if (type == STREAM_INPUT__CHECKPOINT) { + taosArrayPush(pInfo->pBlockLists, input); + pInfo->blockType = STREAM_INPUT__CHECKPOINT; } else { ASSERT(0); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 47b03914b8..5de5bea7fd 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -50,9 +50,9 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); -int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId); -int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId); +int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a31e0a5676..5ba8beb467 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -21,6 +21,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -32,6 +33,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; tEndDecode(pDecoder); return 0; @@ -59,7 +61,7 @@ int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSo return 0; } -int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { +int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; @@ -72,7 +74,7 @@ int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpoi return pEncoder->pos; } -int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { +int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; @@ -85,7 +87,7 @@ int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* return 0; } -int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp) { +int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; @@ -98,7 +100,7 @@ int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpoi return pEncoder->pos; } -int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp) { +int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; @@ -124,7 +126,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i } static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { - SStreamTaskCheckpointReq req = { + SStreamCheckpointReq req = { .streamId = pTask->id.streamId, .upstreamTaskId = pTask->id.taskId, .upstreamNodeId = pTask->info.nodeId, @@ -208,7 +210,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, return code; } -int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq) { +int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) { int32_t code; int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index dbbb6b0db3..939737ee2d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -415,7 +415,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, +int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -476,70 +476,36 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) { -// int32_t code = 0; -// int32_t len; -// -// // todo set upstreamTaskId Info -// const SStreamTaskCheckpointRsp rsp = { -// .streamId = pTask->id.streamId, .downstreamTaskId = pTask->id.taskId, .downstreamNodeId = vgId}; -// -// SEncoder encoder; -// tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code); -// if (code < 0) { -// qError("vgId:%d failed to encode checkpoint rsp, task:0x%x", vgId, rsp.downstreamTaskId); -// return -1; -// } -// -// void* buf = rpcMallocCont(sizeof(SMsgHead) + len); -// ((SMsgHead*)buf)->vgId = htonl(rsp.upstreamNodeId); -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); -// tEncoderInit(&encoder, (uint8_t*)abuf, len); -// tEncodeStreamCheckpointRsp(&encoder, &rsp); -// tEncoderClear(&encoder); - int32_t num = taosArrayGetSize(pTask->pRpcMsgList); ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num); qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel, num); + for(int32_t i = 0; i < num; ++i) { - SRpcMsg* pMsg = taosArrayGetP(pTask->pRpcMsgList, 0); + SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i); tmsgSendRsp(pMsg); } + int8_t prev = pTask->status.taskStatus; + pTask->status.taskStatus = TASK_STATUS__NORMAL; + qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr, + pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev)); + return TSDB_CODE_SUCCESS; } // this function is only invoked by source task, and send rsp to mnode -int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); -// int32_t code = 0; -// int32_t len; -// SEncoder encoder; -// SStreamMeta* pMeta = pTask->pMeta; -// const SStreamCheckpointSourceRsp rsp = { -// .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pMeta->checkpointId}; -// -// tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code); -// if (code < 0) { -// qError("vgId:%d failed to encode source checkpoint rsp, task:0x%x", vgId, pTask->id.taskId); -// return -1; -// } -// -// void* buf = rpcMallocCont(sizeof(SMsgHead) + len); -// ((SMsgHead*)buf)->vgId = htonl(rsp.nodeId); -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); -// tEncoderInit(&encoder, (uint8_t*)abuf, len); -// tEncodeStreamCheckpointSourceRsp(&encoder, &rsp); -// tEncoderClear(&encoder); -// SRpcMsg rspMsg = *pTask->rpcMsg; +int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pRpcMsgList) == 1); + SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, 0); - ASSERT(taosArrayGetSize(pTask->pRpcMsgList) == 0); + tmsgSendRsp(pMsg); - qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); - tmsgSendRsp(taosArrayGetP(pTask->pRpcMsgList, 0)); + int8_t prev = pTask->status.taskStatus; + pTask->status.taskStatus = TASK_STATUS__NORMAL; + qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr, + pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev)); return TSDB_CODE_SUCCESS; } @@ -598,7 +564,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch } int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { + SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -759,3 +725,80 @@ void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } } + +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, + SStreamTask* pTask) { + int32_t len = 0; + int32_t code = 0; + SEncoder encoder; + + SStreamCheckpointSourceRsp rsp = { + .checkpointId = pReq->checkpointId, + .taskId = pReq->taskId, + .nodeId = pReq->nodeId, + .streamId = pReq->streamId, + .expireTime = pReq->expireTime, + .mnodeId = pReq->mnodeId, + }; + + tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code); + if (code < 0) { + return code; + } + + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)pBuf)->vgId = htonl(pReq->mnodeId); + + void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeStreamCheckpointSourceRsp(&encoder, &rsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; + taosArrayPush(pTask->pRpcMsgList, &rspMsg); + + return TSDB_CODE_SUCCESS; +} + +int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { + int32_t len = 0; + int32_t code = 0; + SEncoder encoder; + + SStreamCheckpointRsp rsp = { + .checkpointId = pReq->checkpointId, + .downstreamTaskId = pReq->downstreamTaskId, + .downstreamNodeId = pReq->downstreamNodeId, + .streamId = pReq->streamId, + .upstreamTaskId = pReq->upstreamTaskId, + .upstreamNodeId = pReq->upstreamNodeId, + }; + + tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code); + if (code < 0) { + return code; + } + + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamTaskId); + + void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeStreamCheckpointRsp(&encoder, &rsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; + taosArrayPush(pTask->pRpcMsgList, &rspMsg); + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0a3b5c15b2..dc4437eb97 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -546,7 +546,7 @@ int32_t streamTryExec(SStreamTask* pTask) { // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamTaskSendCheckpointSourceRsp(pTask, pMeta->vgId); + streamTaskSendCheckpointSourceRsp(pTask); } else { streamTaskSendCheckpointRsp(pTask, pMeta->vgId); }