diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 51784b345b..297c87ab40 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -49,6 +49,8 @@ typedef struct SDataDispatchHandle { bool queryEnd; uint64_t useconds; uint64_t cachedSize; + void* pCompressBuf; + int32_t bufSize; TdThreadMutex mutex; } SDataDispatchHandle; @@ -83,10 +85,26 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn { if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) { - char* p = taosMemoryMalloc(pBuf->allocSize); + if (pHandle->pCompressBuf == NULL) { + // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size + pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8); + pHandle->bufSize = pBuf->allocSize + 8; + } else { + if (pHandle->bufSize < pBuf->allocSize + 8) { + pHandle->bufSize = pBuf->allocSize + 8; + void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize); + if (p != NULL) { + pHandle->pCompressBuf = p; + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; + qError("failed to prepare compress buf:%d, code: out of memory", pHandle->bufSize); + return; + } + } + } - int32_t dataLen = blockEncode(pInput->pData, p, numOfCols); - int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0); + int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols); + int32_t len = tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0); if (len < dataLen) { pEntry->compressed = 1; pEntry->dataLen = len; @@ -95,9 +113,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->compressed = 0; pEntry->dataLen = dataLen; pEntry->rawLen = dataLen; - memcpy(pEntry->data, p, dataLen); + memcpy(pEntry->data, pHandle->pCompressBuf, dataLen); } - taosMemoryFree(p); } else { pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); pEntry->rawLen = pEntry->dataLen; @@ -247,6 +264,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); taosMemoryFreeClear(pDispatcher->nextOutput.pData); + while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); @@ -255,7 +273,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { taosFreeQitem(pBuf); } } + taosCloseQueue(pDispatcher->pDataBlocks); + taosMemoryFreeClear(pDispatcher->pCompressBuf); + pDispatcher->bufSize = 0; + taosThreadMutexDestroy(&pDispatcher->mutex); taosMemoryFree(pDispatcher->pManager); return TSDB_CODE_SUCCESS;