fix:memory leak

This commit is contained in:
wangmm0220 2023-07-03 18:43:12 +08:00
parent 136bc379d1
commit dbcd33a402
1 changed files with 22 additions and 18 deletions

View File

@ -215,16 +215,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
goto loop_table;
}
}
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
@ -237,7 +233,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
goto loop_table;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
@ -246,7 +242,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
goto loop_table;
}
taosArrayPush(pRsp->createTableLen, &len);
@ -256,7 +252,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
continue;
goto loop_table;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
@ -268,6 +264,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
continue;
loop_table:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pTqReader;
@ -277,16 +279,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
goto loop_db;
}
}
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
@ -299,7 +297,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
goto loop_db;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
@ -308,7 +306,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
goto loop_db;
}
taosArrayPush(pRsp->createTableLen, &len);
@ -318,7 +316,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
continue;
goto loop_db;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
@ -330,6 +328,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
continue;
loop_db:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
}
}
taosArrayDestroy(pBlocks);