From 0fe2686b635412f93b4fa44e73069356a9c7653a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 May 2024 14:55:51 +0800 Subject: [PATCH] fix(query): fix bugs caused by refactor. --- source/libs/qworker/src/qworker.c | 6 +++--- source/libs/stream/src/streamData.c | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 8efe42e9bc..93c5628b71 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -343,13 +343,13 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, // Got data from sink QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len); - *dataLen += len + sizeof(int32_t) * 2; - *pRawDataLen += rawLen + sizeof(int32_t) * 2; + *dataLen += len + PAYLOAD_PREFIX_LEN; + *pRawDataLen += rawLen + PAYLOAD_PREFIX_LEN; QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp)); // set the serialize start position - output.pData = pRsp->data + *dataLen - (len + sizeof(int32_t) * 2); + output.pData = pRsp->data + *dataLen - (len + PAYLOAD_PREFIX_LEN); ((int32_t*) output.pData)[0] = len; ((int32_t*) output.pData)[1] = rawLen; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 309234179c..5f3c6f671b 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -37,7 +37,23 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); - blockDecode(pDataBlock, pRetrieve->data); + + int32_t compLen = *(int32_t*)pRetrieve->data; + int32_t fullLen = *(int32_t*)(pRetrieve->data + sizeof(int32_t)); + + char* pInput = pRetrieve->data + PAYLOAD_PREFIX_LEN; + if (pRetrieve->compressed && compLen < fullLen) { + char* p = taosMemoryMalloc(fullLen); + int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0); + ASSERT(len == fullLen); + pInput = p; + } + + blockDecode(pDataBlock, pInput); + + if (pRetrieve->compressed && compLen < fullLen) { + taosMemoryFree(pInput); + } // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey);