From 1e5098f676713ce50daa80733cd722aec5b66031 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 May 2024 13:32:30 +0800 Subject: [PATCH] fix(query): fix bugs caused by refactor. --- include/common/tmsg.h | 8 ++++++++ include/libs/command/command.h | 8 -------- source/libs/command/src/explain.c | 2 +- source/libs/stream/src/streamDispatch.c | 12 ++++++++++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e583920357..b5c0a84063 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2129,6 +2129,14 @@ typedef struct { char data[]; } SRetrieveTableRsp; +#define PAYLOAD_PREFIX_LEN ((sizeof(int32_t)) << 1) + +#define SET_PAYLOAD_LEN(_p, _compLen, _fullLen) \ + do { \ + ((int32_t*)(_p))[0] = (_compLen); \ + ((int32_t*)(_p))[1] = (_fullLen); \ + } while (0); + typedef struct { int64_t version; int64_t numOfRows; diff --git a/include/libs/command/command.h b/include/libs/command/command.h index 47c106dc51..b788b03386 100644 --- a/include/libs/command/command.h +++ b/include/libs/command/command.h @@ -20,14 +20,6 @@ #include "plannodes.h" #include "tmsg.h" -#define PAYLOAD_PREFIX_LEN ((sizeof(int32_t)) << 1) - -#define SET_PAYLOAD_LEN(_p, _compLen, _fullLen) \ - do { \ - ((int32_t*)(_p))[0] = (_compLen); \ - ((int32_t*)(_p))[1] = (_fullLen); \ - } while (0); - typedef struct SExplainCtx SExplainCtx; int32_t qExecCommand(int64_t* pConnId, bool sysInfoUser, SNode *pStmt, SRetrieveTableRsp **pRsp, int8_t biMode); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 3333f22e10..7081901db4 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1941,7 +1941,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { pBlock->info.rows = rowNum; - int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + sizeof(int32_t)*2; + int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); if (NULL == rsp) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 58c6e19581..c9779919a4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -129,7 +129,8 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){ SRetrieveTableRsp* pRetrieve = NULL; - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; pRetrieve = taosMemoryCalloc(1, dataStrLen); if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -146,7 +147,14 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->version = htobe64(pBlock->info.version); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data+ PAYLOAD_PREFIX_LEN, numOfCols); + + SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen); + + int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN; + pRetrieve->payloadLen = htonl(payloadLen); + pRetrieve->compLen = htonl(payloadLen); + pRetrieve->compressed = 0; req->streamId = pTask->id.streamId; req->srcNodeId = pTask->info.nodeId;