enh(query): allocate the compress buffer before compress data.

This commit is contained in:
Haojun Liao 2024-06-04 17:47:50 +08:00
parent 6d7e1eb576
commit 2ff79c81d7
1 changed files with 27 additions and 5 deletions

View File

@ -49,6 +49,8 @@ typedef struct SDataDispatchHandle {
bool queryEnd;
uint64_t useconds;
uint64_t cachedSize;
void* pCompressBuf;
int32_t bufSize;
TdThreadMutex mutex;
} SDataDispatchHandle;
@ -83,10 +85,26 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
{
if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
char* p = taosMemoryMalloc(pBuf->allocSize);
if (pHandle->pCompressBuf == NULL) {
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8);
pHandle->bufSize = pBuf->allocSize + 8;
} else {
if (pHandle->bufSize < pBuf->allocSize + 8) {
pHandle->bufSize = pBuf->allocSize + 8;
void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
if (p != NULL) {
pHandle->pCompressBuf = p;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to prepare compress buf:%d, code: out of memory", pHandle->bufSize);
return;
}
}
}
int32_t dataLen = blockEncode(pInput->pData, p, numOfCols);
int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols);
int32_t len = tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
if (len < dataLen) {
pEntry->compressed = 1;
pEntry->dataLen = len;
@ -95,9 +113,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->compressed = 0;
pEntry->dataLen = dataLen;
pEntry->rawLen = dataLen;
memcpy(pEntry->data, p, dataLen);
memcpy(pEntry->data, pHandle->pCompressBuf, dataLen);
}
taosMemoryFree(p);
} else {
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
pEntry->rawLen = pEntry->dataLen;
@ -247,6 +264,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
taosMemoryFreeClear(pDispatcher->nextOutput.pData);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
@ -255,7 +273,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
taosFreeQitem(pBuf);
}
}
taosCloseQueue(pDispatcher->pDataBlocks);
taosMemoryFreeClear(pDispatcher->pCompressBuf);
pDispatcher->bufSize = 0;
taosThreadMutexDestroy(&pDispatcher->mutex);
taosMemoryFree(pDispatcher->pManager);
return TSDB_CODE_SUCCESS;