From de8ce3f7ec4cb575da8ca1bc9cda512636db13fd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 20 Dec 2023 16:01:17 +0800 Subject: [PATCH] fix:send stream retrieve msg to source task --- include/libs/stream/tstream.h | 6 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 16 ++- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/stream.c | 24 ++-- source/libs/stream/src/streamDispatch.c | 128 ++++++++++++--------- source/libs/stream/src/streamExec.c | 2 +- 6 files changed, 101 insertions(+), 77 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f6737b4e27..090a43fdd8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -743,7 +743,6 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); -void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupScheduleTrigger(SStreamTask* pTask); @@ -751,7 +750,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); @@ -868,6 +867,9 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); void* streamDestroyStateMachine(SStreamTaskSM* pSM); + +int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req); +void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b3c4479efc..64c4d2af5d 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -331,15 +331,25 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (pTask == NULL) { tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.dstTaskId); + taosMemoryFree(req.pRetrieve); return -1; } + int32_t code = 0; + if(pTask->info.taskLevel == TASK_LEVEL__SOURCE){ + code = streamProcessRetrieveReq(pTask, &req); + }else{ + req.srcNodeId = pTask->info.nodeId; + req.srcTaskId = pTask->id.taskId; + code = broadcastRetrieveMsg(pTask, &req); + } + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessRetrieveReq(pTask, &req, &rsp); + sendRetrieveRsp(&req, &rsp); streamMetaReleaseTask(pMeta, pTask); - tDeleteStreamRetrieveReq(&req); - return 0; + taosMemoryFree(req.pRetrieve); + return code; } int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index f709741b57..0d4dbf1658 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -104,7 +104,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); -int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); +int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1bef42bf14..e50e373be0 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -172,7 +172,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp return status; } -int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { +int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); int8_t status = TASK_INPUT_STATUS__NORMAL; @@ -194,17 +194,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, /*status = TASK_INPUT_STATUS__FAILED;*/ } - // rsp by input status - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); - SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); - pCont->streamId = pReq->streamId; - pCont->rspToTaskId = pReq->srcTaskId; - pCont->rspFromTaskId = pReq->dstTaskId; - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp); - tmsgSendRsp(pRsp); - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } @@ -269,11 +258,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return 0; } -int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - streamTaskEnqueueRetrieve(pTask, pReq, pRsp); - ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK); - streamSchedExec(pTask); - return 0; +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { + int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); + if(code != 0){ + return code; + } + return streamSchedExec(pTask); } void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5fb7db233f..1e037ee879 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -162,16 +162,71 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { return 0; } -void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } +void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ + void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); + ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); + SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); + pCont->streamId = pReq->streamId; + pCont->rspToTaskId = pReq->srcTaskId; + pCont->rspFromTaskId = pReq->dstTaskId; + pRsp->pCont = buf; + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp); + tmsgSendRsp(pRsp); +} -int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) { - int32_t code = -1; +int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req){ + int32_t code = 0; + void* buf = NULL; + int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); + ASSERT(sz > 0); + for (int32_t i = 0; i < sz; i++) { + req->reqId = tGenIdPI64(); + SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + req->dstNodeId = pEpInfo->nodeId; + req->dstTaskId = pEpInfo->taskId; + int32_t len; + tEncodeSize(tEncodeStreamRetrieveReq, req, len, code); + if (code != 0) { + ASSERT(0); + return code; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + len); + if (buf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeStreamRetrieveReq(&encoder, req); + tEncoderClear(&encoder); + + SRpcMsg rpcMsg = {0}; + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead)); + + code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg); + if (code != 0) { + ASSERT(0); + rpcFreeCont(buf); + return code; + } + + buf = NULL; + stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId); + } + return code; +} + +static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){ SRetrieveTableRsp* pRetrieve = NULL; - void* buf = NULL; int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); pRetrieve = taosMemoryCalloc(1, dataStrLen); - if (pRetrieve == NULL) return -1; + if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY; int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); pRetrieve->useconds = 0; @@ -187,57 +242,24 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); - SStreamRetrieveReq req = { - .streamId = pTask->id.streamId, - .srcNodeId = pTask->info.nodeId, - .srcTaskId = pTask->id.taskId, - .pRetrieve = pRetrieve, - .retrieveLen = dataStrLen, - }; + req->streamId = pTask->id.streamId; + req->srcNodeId = pTask->info.nodeId; + req->srcTaskId = pTask->id.taskId; + req->pRetrieve = pRetrieve; + req->retrieveLen = dataStrLen; + return 0; +} - int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); - ASSERT(sz > 0); - for (int32_t i = 0; i < sz; i++) { - req.reqId = tGenIdPI64(); - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - req.dstNodeId = pEpInfo->nodeId; - req.dstTaskId = pEpInfo->taskId; - int32_t len; - tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); - if (code < 0) { - ASSERT(0); - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + len); - if (buf == NULL) { - goto CLEAR; - } - - ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, len); - tEncodeStreamRetrieveReq(&encoder, &req); - tEncoderClear(&encoder); - - SRpcMsg rpcMsg = {0}; - initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead)); - - if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { - ASSERT(0); - goto CLEAR; - } - - buf = NULL; - stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, - pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); +int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock) { + SStreamRetrieveReq req; + int32_t code = buildStreamRetrieveReq(pTask, pBlock, &req); + if(code != 0){ + return code; } - code = 0; -CLEAR: - taosMemoryFree(pRetrieve); - rpcFreeCont(buf); + code = broadcastRetrieveMsg(pTask, &req); + taosMemoryFree(req.pRetrieve); + return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 25f32195be..f7e4b7856d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -144,7 +144,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (output->info.type == STREAM_RETRIEVE) { - if (streamBroadcastToChildren(pTask, output) < 0) { + if (streamBroadcastToUpTasks(pTask, output) < 0) { // TODO } continue;