fix(query): check memory malloc failure.

This commit is contained in:
Haojun Liao 2022-09-01 18:46:50 +08:00
parent 4af80fa0aa
commit fed9efa840
1 changed files with 53 additions and 26 deletions

View File

@ -184,9 +184,9 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow); STsdbReader* pReader, bool* freeTSRow);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow); STSRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader);
@ -1395,7 +1395,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
tRowMergerGetRow(&merge, &pTSRow); int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
@ -1422,7 +1426,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
tRowMergerGetRow(&merge, &pTSRow); int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
@ -1456,12 +1464,16 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow); int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return code;
} else { } else {
ASSERT(0); ASSERT(0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1618,12 +1630,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
tRowMergerGetRow(&merge, &pTSRow); int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return code;
} }
#if 0 #if 0
@ -1907,7 +1923,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
@ -3026,7 +3046,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow) { STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL; TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow; TSDBROW current = *pRow;
@ -3037,19 +3057,19 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr
if (!pIter->hasVal) { if (!pIter->hasVal) {
*pTSRow = current.pTSRow; *pTSRow = current.pTSRow;
*freeTSRow = false; *freeTSRow = false;
return; return TSDB_CODE_SUCCESS;
} else { // has next point in mem/imem } else { // has next point in mem/imem
pNextRow = getValidMemRow(pIter, pDelList, pReader); pNextRow = getValidMemRow(pIter, pDelList, pReader);
if (pNextRow == NULL) { if (pNextRow == NULL) {
*pTSRow = current.pTSRow; *pTSRow = current.pTSRow;
*freeTSRow = false; *freeTSRow = false;
return; return TSDB_CODE_SUCCESS;
} }
if (current.pTSRow->ts != pNextRow->pTSRow->ts) { if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
*pTSRow = current.pTSRow; *pTSRow = current.pTSRow;
*freeTSRow = false; *freeTSRow = false;
return; return TSDB_CODE_SUCCESS;
} }
} }
} }
@ -3069,13 +3089,17 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr
tRowMergerAdd(&merge, pNextRow, pTSchema1); tRowMergerAdd(&merge, pNextRow, pTSchema1);
doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow); int32_t code = tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge); if (code != TSDB_CODE_SUCCESS) {
return code;
}
tRowMergerClear(&merge);
*freeTSRow = true; *freeTSRow = true;
return TSDB_CODE_SUCCESS;
} }
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow) { STSRow** pTSRow) {
SRowMerger merge = {0}; SRowMerger merge = {0};
@ -3100,7 +3124,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
} }
tRowMergerGetRow(&merge, pTSRow); int32_t code = tRowMergerGetRow(&merge, pTSRow);
return code;
} }
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey, int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
@ -3130,28 +3155,30 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
int32_t code = TSDB_CODE_SUCCESS;
if (ik.ts != k.ts) { if (ik.ts != k.ts) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
} }
} else { // ik.ts == k.ts } else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
*freeTSRow = true; *freeTSRow = true;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
return TSDB_CODE_SUCCESS; return code;
} }
if (pBlockScanInfo->iter.hasVal && pRow != NULL) { if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
return TSDB_CODE_SUCCESS;
} }
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;