diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index e06c7e63ac..21dd930c8f 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -196,7 +196,8 @@ typedef struct SReqResultInfo { bool localResultFetched; bool completed; bool convertUcs4; - char* decompressBuf; + char* decompBuf; + int32_t decompBufSize; int32_t precision; int32_t payloadLen; char* convertJson; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 38a895e4fd..1ee771b619 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -370,7 +370,7 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) { taosMemoryFreeClear(pResInfo->fields); taosMemoryFreeClear(pResInfo->userFields); taosMemoryFreeClear(pResInfo->convertJson); - taosMemoryFreeClear(pResInfo->decompressBuf); + taosMemoryFreeClear(pResInfo->decompBuf); if (pResInfo->convertBuf != NULL) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6f1c3fccd7..7c5aeeb375 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2202,24 +2202,28 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR if (pRsp->compressed) { int32_t payloadLen = htonl(pRsp->payloadLen); - if (pResultInfo->decompressBuf == NULL) { - pResultInfo->decompressBuf = taosMemoryMalloc(payloadLen); + if (pResultInfo->decompBuf == NULL) { + pResultInfo->decompBuf = taosMemoryMalloc(payloadLen); + pResultInfo->decompBufSize = payloadLen; } else { - char* p = taosMemoryRealloc(pResultInfo->decompressBuf, payloadLen); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("failed to prepare the decompress buffer, size:%d", payloadLen); - return terrno; - } + if (pResultInfo->decompBufSize < payloadLen) { + char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tscError("failed to prepare the decompress buffer, size:%d", payloadLen); + return terrno; + } - pResultInfo->decompressBuf = p; + pResultInfo->decompBuf = p; + pResultInfo->decompBufSize = payloadLen; + } } - int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen, + int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompBuf, payloadLen, ONE_STAGE_COMP, NULL, 0); ASSERT(len == payloadLen); - pResultInfo->pData = pResultInfo->decompressBuf; + pResultInfo->pData = pResultInfo->decompBuf; pResultInfo->payloadLen = payloadLen; } else { pResultInfo->pData = (void*)pRsp->data; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 72d0af1726..dff92eb52d 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -41,6 +41,8 @@ typedef struct SSourceDataInfo { SArray* pSrcUidList; int32_t srcOpType; bool tableSeq; + char* decompBuf; + int32_t decompBufSize; } SSourceDataInfo; static void destroyExchangeOperatorInfo(void* param); @@ -370,7 +372,10 @@ void freeBlock(void* pParam) { void freeSourceDataInfo(void* p) { SSourceDataInfo* pInfo = (SSourceDataInfo*)p; + taosMemoryFreeClear(pInfo->decompBuf); taosMemoryFreeClear(pInfo->pRsp); + + pInfo->decompBufSize = 0; } void doDestroyExchangeOperatorInfo(void* param) { @@ -675,14 +680,24 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa char* pStart = pRetrieveRsp->data; int32_t index = 0; int32_t code = 0; - char* p = NULL; if (pRetrieveRsp->compressed) { // decompress the data - p = taosMemoryMalloc(pRetrieveRsp->payloadLen); - int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen, - ONE_STAGE_COMP, NULL, 0); + if (pDataInfo->decompBuf == NULL) { + pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen); + pDataInfo->decompBufSize = pRetrieveRsp->payloadLen; + } else { + if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) { + char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen); + if (p != NULL) { + pDataInfo->decompBuf = p; + pDataInfo->decompBufSize = pRetrieveRsp->payloadLen; + } + } + } + int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, pDataInfo->decompBuf, + pRetrieveRsp->payloadLen, ONE_STAGE_COMP, NULL, 0); ASSERT(t == pRetrieveRsp->payloadLen); - pStart = p; + pStart = pDataInfo->decompBuf; } while (index++ < pRetrieveRsp->numOfBlocks) { @@ -703,7 +718,6 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } - taosMemoryFreeClear(p); return code; }