refactor(query): opt memory prepare.

This commit is contained in:
Haojun Liao 2024-05-17 17:51:41 +08:00
parent 524549a08a
commit a2692dbba4
4 changed files with 38 additions and 19 deletions

View File

@ -196,7 +196,8 @@ typedef struct SReqResultInfo {
bool localResultFetched; bool localResultFetched;
bool completed; bool completed;
bool convertUcs4; bool convertUcs4;
char* decompressBuf; char* decompBuf;
int32_t decompBufSize;
int32_t precision; int32_t precision;
int32_t payloadLen; int32_t payloadLen;
char* convertJson; char* convertJson;

View File

@ -370,7 +370,7 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
taosMemoryFreeClear(pResInfo->fields); taosMemoryFreeClear(pResInfo->fields);
taosMemoryFreeClear(pResInfo->userFields); taosMemoryFreeClear(pResInfo->userFields);
taosMemoryFreeClear(pResInfo->convertJson); taosMemoryFreeClear(pResInfo->convertJson);
taosMemoryFreeClear(pResInfo->decompressBuf); taosMemoryFreeClear(pResInfo->decompBuf);
if (pResInfo->convertBuf != NULL) { if (pResInfo->convertBuf != NULL) {
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {

View File

@ -2202,24 +2202,28 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
if (pRsp->compressed) { if (pRsp->compressed) {
int32_t payloadLen = htonl(pRsp->payloadLen); int32_t payloadLen = htonl(pRsp->payloadLen);
if (pResultInfo->decompressBuf == NULL) { if (pResultInfo->decompBuf == NULL) {
pResultInfo->decompressBuf = taosMemoryMalloc(payloadLen); pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
pResultInfo->decompBufSize = payloadLen;
} else { } else {
char* p = taosMemoryRealloc(pResultInfo->decompressBuf, payloadLen); if (pResultInfo->decompBufSize < payloadLen) {
char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
if (p == NULL) { if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("failed to prepare the decompress buffer, size:%d", payloadLen); tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
return terrno; return terrno;
} }
pResultInfo->decompressBuf = p; pResultInfo->decompBuf = p;
pResultInfo->decompBufSize = payloadLen;
}
} }
int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen, int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompBuf, payloadLen,
ONE_STAGE_COMP, NULL, 0); ONE_STAGE_COMP, NULL, 0);
ASSERT(len == payloadLen); ASSERT(len == payloadLen);
pResultInfo->pData = pResultInfo->decompressBuf; pResultInfo->pData = pResultInfo->decompBuf;
pResultInfo->payloadLen = payloadLen; pResultInfo->payloadLen = payloadLen;
} else { } else {
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;

View File

@ -41,6 +41,8 @@ typedef struct SSourceDataInfo {
SArray* pSrcUidList; SArray* pSrcUidList;
int32_t srcOpType; int32_t srcOpType;
bool tableSeq; bool tableSeq;
char* decompBuf;
int32_t decompBufSize;
} SSourceDataInfo; } SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param); static void destroyExchangeOperatorInfo(void* param);
@ -370,7 +372,10 @@ void freeBlock(void* pParam) {
void freeSourceDataInfo(void* p) { void freeSourceDataInfo(void* p) {
SSourceDataInfo* pInfo = (SSourceDataInfo*)p; SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
taosMemoryFreeClear(pInfo->decompBuf);
taosMemoryFreeClear(pInfo->pRsp); taosMemoryFreeClear(pInfo->pRsp);
pInfo->decompBufSize = 0;
} }
void doDestroyExchangeOperatorInfo(void* param) { void doDestroyExchangeOperatorInfo(void* param) {
@ -675,14 +680,24 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
char* pStart = pRetrieveRsp->data; char* pStart = pRetrieveRsp->data;
int32_t index = 0; int32_t index = 0;
int32_t code = 0; int32_t code = 0;
char* p = NULL;
if (pRetrieveRsp->compressed) { // decompress the data if (pRetrieveRsp->compressed) { // decompress the data
p = taosMemoryMalloc(pRetrieveRsp->payloadLen); if (pDataInfo->decompBuf == NULL) {
int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen, pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
ONE_STAGE_COMP, NULL, 0); pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
} else {
if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
if (p != NULL) {
pDataInfo->decompBuf = p;
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
}
}
}
int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, pDataInfo->decompBuf,
pRetrieveRsp->payloadLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(t == pRetrieveRsp->payloadLen); ASSERT(t == pRetrieveRsp->payloadLen);
pStart = p; pStart = pDataInfo->decompBuf;
} }
while (index++ < pRetrieveRsp->numOfBlocks) { while (index++ < pRetrieveRsp->numOfBlocks) {
@ -703,7 +718,6 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
taosArrayPush(pExchangeInfo->pResultBlockList, &pb); taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
} }
taosMemoryFreeClear(p);
return code; return code;
} }