From f74fad7e60d696363a52cb923e6b402cda826f05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 May 2024 17:17:40 +0800 Subject: [PATCH] fix(query): fix error in refactor. --- source/libs/stream/src/streamData.c | 1 - source/libs/stream/src/streamDispatch.c | 11 ++++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 5f3c6f671b..fa4efc3c6e 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -33,7 +33,6 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe } ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); - for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e688762aee..f9bd0bbfc0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -130,9 +130,9 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){ SRetrieveTableRsp* pRetrieve = NULL; - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; + int32_t len = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; - pRetrieve = taosMemoryCalloc(1, dataStrLen); + pRetrieve = taosMemoryCalloc(1, len); if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY; int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -148,7 +148,6 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl pRetrieve->version = htobe64(pBlock->info.version); int32_t actualLen = blockEncode(pBlock, pRetrieve->data+ PAYLOAD_PREFIX_LEN, numOfCols); - SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen); int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN; @@ -160,7 +159,7 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl req->srcNodeId = pTask->info.nodeId; req->srcTaskId = pTask->id.taskId; req->pRetrieve = pRetrieve; - req->retrieveLen = dataStrLen; + req->retrieveLen = len; return 0; } @@ -700,7 +699,9 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch pRetrieve->payloadLen = htonl(payloadLen); pRetrieve->compLen = htonl(payloadLen); - taosArrayPush(pReq->dataLen, &actualLen); + payloadLen += sizeof(SRetrieveTableRsp); + + taosArrayPush(pReq->dataLen, &payloadLen); taosArrayPush(pReq->data, &buf); pReq->totalLen += dataStrLen;