From dcb91082f9aa1c047b36909412ae980ca3e05b4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 23:37:40 +0800 Subject: [PATCH] fix(stream): fix error. --- source/dnode/snode/src/snode.c | 6 ++---- source/libs/stream/src/stream.c | 4 ++-- source/libs/stream/src/streamDispatch.c | 20 ++++++++------------ source/libs/stream/src/streamExec.c | 7 ++----- 4 files changed, 14 insertions(+), 23 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ec4500aee6..e9225f3d6e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -223,11 +223,9 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0}; streamProcessRetrieveReq(pTask, &req, &rsp); streamMetaReleaseTask(pSnode->pMeta, pTask); tDeleteStreamRetrieveReq(&req); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index e33e7845bb..8cc1ef1dd3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -161,7 +161,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId, pReq->srcTaskId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; @@ -271,7 +271,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); + qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 722013115f..401a8b9e74 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -138,7 +138,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); req.dstNodeId = pEpInfo->nodeId; req.dstTaskId = pEpInfo->taskId; - int32_t code; int32_t len; tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); if (code < 0) { @@ -158,23 +157,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) tEncodeStreamRetrieveReq(&encoder, &req); tEncoderClear(&encoder); - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_STREAM_RETRIEVE, - .pCont = buf, - .contLen = sizeof(SMsgHead) + len, - }; - + SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len }; if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { ASSERT(0); goto CLEAR; } - buf = NULL; - qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr, + buf = NULL; + qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); } code = 0; + CLEAR: taosMemoryFree(pRetrieve); rpcFreeCont(buf); @@ -400,12 +394,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (req.data == NULL || req.dataLen == NULL) { taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); - return code; + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - if (streamAddBlockIntoDispatchMsg(pDataBlock, &req) < 0) { + code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); + + if (code != TSDB_CODE_SUCCESS) { taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); return code; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c7420b0bef..55474541ed 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -108,7 +108,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i block.info.type = STREAM_PULL_OVER; block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); - + numOfBlocks += 1; qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pRetrieveBlock->reqId); } @@ -152,14 +152,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (numOfBlocks > 0) { ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } } else { taosArrayDestroy(pRes); } - return 0; + return code; } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {