From e8f7146a2ce1c42467e686242a2113e0fb08c55f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Sep 2023 22:19:10 +0800 Subject: [PATCH 1/3] fix(tsdb): check the schema before merge rows in buffer, and do some refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 281 ++++++++++++------------ 1 file changed, 136 insertions(+), 145 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index fa3e00ce0f..beaeefc048 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1504,6 +1504,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (ps == NULL) { return terrno; } + + int32_t code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to init row merger, code:%s", tstrerror(code)); + return code; + } } int64_t minKey = 0; @@ -1535,15 +1541,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - // todo remove init - bool init = false; - // ASC: file block ---> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block if (pReader->info.order == TSDB_ORDER_ASC) { if (minKey == key) { - init = true; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1552,47 +1554,44 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - if (init) { - tsdbRowMergerAdd(pMerger, fRow1, NULL); - } else { - init = true; - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == k.ts) { - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - if (pSchema == NULL) { - return terrno; - } - if (init) { - tsdbRowMergerAdd(pMerger, pRow, pSchema); - } else { - init = true; - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; + STSchema* pTSchema = NULL; + if (pRow->type == TSDBROW_ROW_FMT) { + pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pTSchema == NULL) { + return terrno; } } - int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } } else { if (minKey == k.ts) { - init = true; - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - if (pSchema == NULL) { - return terrno; + STSchema* pTSchema = NULL; + if (pRow->type == TSDBROW_ROW_FMT) { + pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pTSchema == NULL) { + return terrno; + } } - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1605,28 +1604,18 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - if (init) { - tsdbRowMergerAdd(pMerger, fRow1, NULL); - } else { - init = true; - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == key) { - if (init) { - tsdbRowMergerAdd(pMerger, &fRow, NULL); - } else { - init = true; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } @@ -1674,7 +1663,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1699,7 +1688,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } } } else { // not merge block data - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1742,6 +1731,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader if (ps == NULL) { return terrno; } + + int32_t code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to init row merger, code:%s", tstrerror(code)); + return code; + } } if (hasDataInFileBlock(pBlockData, pDumpInfo)) { @@ -1768,7 +1763,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader } // the following for key == tsLast SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1776,7 +1771,10 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); @@ -1816,14 +1814,21 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - if (pSchema == NULL) { - return code; + + STSchema* pSchema = NULL; + if (pRow->type == TSDBROW_ROW_FMT) { + pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } } - STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - if (piSchema == NULL) { - return code; + STSchema* piSchema = NULL; + if (piRow->type == TSDBROW_ROW_FMT) { + piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + if (piSchema == NULL) { + return code; + } } // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized @@ -1833,6 +1838,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ps == NULL) { return terrno; } + + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to init row merger, code:%s", tstrerror(code)); + return code; + } } int64_t minKey = 0; @@ -1872,15 +1883,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - bool init = false; - // ASC: file block -----> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block if (ASCENDING_TRAVERSE(pReader->info.order)) { if (minKey == key) { - init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1890,14 +1898,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - if (init) { - tsdbRowMergerAdd(pMerger, pRow1, NULL); - } else { - init = true; - code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, @@ -1905,14 +1908,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == ik.ts) { - if (init) { - tsdbRowMergerAdd(pMerger, piRow, piSchema); - } else { - init = true; - code = tsdbRowMergerAdd(pMerger, piRow, piSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; } code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); @@ -1922,15 +1920,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == k.ts) { - if (init) { - tsdbRowMergerAdd(pMerger, pRow, pSchema); - } else { - // STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - code = tsdbRowMergerAdd(pMerger, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; } + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1938,7 +1932,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } else { if (minKey == k.ts) { - init = true; code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1951,15 +1944,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == ik.ts) { - if (init) { - tsdbRowMergerAdd(pMerger, piRow, piSchema); - } else { - init = true; - code = tsdbRowMergerAdd(pMerger, piRow, piSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; } + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1968,29 +1957,22 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - if (init) { - tsdbRowMergerAdd(pMerger, pRow1, NULL); - } else { - init = true; - code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - if (!init) { - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - tsdbRowMergerAdd(pMerger, &fRow, NULL); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } } @@ -2184,6 +2166,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc if (ps == NULL) { return terrno; } + + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to init row merger, code:%s", tstrerror(code)); + return code; + } } if (copied) { @@ -2193,7 +2181,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SRow* pTSRow = NULL; - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3328,16 +3316,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe break; } + STSchema* pTSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { - STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid); + pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid); if (pTSchema == NULL) { return terrno; } - - tsdbRowMergerAdd(pMerger, pRow, pTSchema); - } else { // column format - tsdbRowMergerAdd(pMerger, pRow, NULL); } + + tsdbRowMergerAdd(pMerger, pRow, pTSchema); } return TSDB_CODE_SUCCESS; @@ -3473,31 +3460,30 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, int32_t code = 0; // start to merge duplicated rows - if (current.type == TSDBROW_ROW_FMT) { - // get the correct schema for data in memory - STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid); + STSchema* pTSchema = NULL; + if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory + pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid); if (pTSchema == NULL) { return terrno; } + } - code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid); + STSchema* pTSchema1 = NULL; + if (pNextRow->type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory + pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid); if (pTSchema1 == NULL) { return terrno; } + } - tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1); - } else { // let's merge rows in file block - code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - tsdbRowMergerAdd(&pReader->status.merger, pNextRow, NULL); + code = tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1); + if (code != TSDB_CODE_SUCCESS) { + return code; } code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader); @@ -3523,14 +3509,21 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - if (pSchema == NULL) { - return terrno; + + STSchema* pSchema = NULL; + if (pRow->type == TSDBROW_ROW_FMT) { + pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } } - STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - if (piSchema == NULL) { - return terrno; + STSchema* piSchema = NULL; + if (piRow->type == TSDBROW_ROW_FMT) { + piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + if (piSchema == NULL) { + return terrno; + } } if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem @@ -4898,10 +4891,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pMem = pTsdb->mem; pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { - taosThreadRwlockUnlock(&pTsdb->rwLock); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + pSnap->pNode->pQHandle = pReader; pSnap->pNode->reseek = reseek; @@ -4912,10 +4905,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pIMem = pTsdb->imem; pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); if (pSnap->pINode == NULL) { - taosThreadRwlockUnlock(&pTsdb->rwLock); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + pSnap->pINode->pQHandle = pReader; pSnap->pINode->reseek = reseek; @@ -4924,18 +4917,16 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs // fs code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; + if (code == TSDB_CODE_SUCCESS) { + tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), pReader->idStr); } - // unlock +_exit: taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("vgId:%d take read snapshot failed, %s code:%s", TD_VID(pTsdb->pVnode), pReader->idStr, tstrerror(code)); -_exit: - if (code) { *ppSnap = NULL; if (pSnap) { if (pSnap->pNode) taosMemoryFree(pSnap->pNode); From a5b93aaf97e831f810216468cfc8b4d466e9b2df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Sep 2023 23:55:34 +0800 Subject: [PATCH 2/3] fix(tsdb): fix invalid read. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index beaeefc048..e4aba7011c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4918,14 +4918,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs // fs code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray); if (code == TSDB_CODE_SUCCESS) { - tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), pReader->idStr); + tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); } _exit: taosThreadRwlockUnlock(&pTsdb->rwLock); if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d take read snapshot failed, %s code:%s", TD_VID(pTsdb->pVnode), pReader->idStr, tstrerror(code)); + tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code)); *ppSnap = NULL; if (pSnap) { From f602aa965f74a1548e2297445baaf9efe829f473 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Sep 2023 09:16:18 +0800 Subject: [PATCH 3/3] fix(stream): fix coverity scan issues. --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- source/dnode/vnode/src/tq/tqUtil.c | 2 ++ source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 7 ++++++- source/libs/parser/src/parTranslater.c | 2 -- source/libs/stream/src/stream.c | 1 - source/libs/stream/src/streamCheckpoint.c | 8 ++++++-- source/libs/stream/src/streamMeta.c | 4 ++++ source/libs/stream/src/streamQueue.c | 12 ++++++++++-- 8 files changed, 30 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 66acbcc05b..049b4e737a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1922,6 +1922,7 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) { SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); if (pCommitRaw == NULL) { mError("failed to encode stream since %s", terrstr()); + mndTransDrop(pTrans); return -1; } @@ -1988,6 +1989,7 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pBuf); taosWUnLockLatch(&pStream->lock); + mndTransDrop(pTrans); return -1; } } @@ -1998,7 +2000,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr int32_t code = mndPersistTransLog(pStream, pTrans); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); return -1; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f1b154a4ac..52862ea67b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -437,6 +437,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream taosArrayDestroy(pRes->uidList); *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); if (*pRefBlock == NULL) { + blockDataCleanup(pDelBlock); + taosMemoryFree(pDelBlock); return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 1f3c8b54ec..74eb1c7302 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -273,7 +273,12 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex); tBrinBlockClear(&pIter->block); - tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); + int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code)); + return NULL; + } + pIter->recordIndex = -1; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8008f4397e..cae8c5d5e8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7007,7 +7007,6 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col); if (code) { - nodesDestroyNode((SNode*)col); nodesDestroyList(pParamterList); return code; } @@ -7025,7 +7024,6 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta } code = nodesListStrictAppend(pProjectionList, pFunc); if (code) { - nodesDestroyNode(pFunc); nodesDestroyList(pProjectionList); return code; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1f93498557..32b36f8848 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -83,7 +83,6 @@ static void streamSchedByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); pTrigger->pBlock->info.type = STREAM_GET_ALL; if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { - taosFreeQitem(pTrigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); return; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 361602fac9..cc93d25fd5 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -125,7 +125,6 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint taosMemoryFree(pBlock); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) { - taosFreeQitem(pChkpoint); return TSDB_CODE_OUT_OF_MEMORY; } @@ -271,7 +270,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { keys[0] = pId->streamId; keys[1] = pId->taskId; - SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** ppTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if (ppTask == NULL) { + continue; + } + + SStreamTask* p = *ppTask; if (p->info.fillHistory == 1) { continue; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5bc21286d7..a11f4a8b26 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -418,6 +418,10 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { int64_t keys[2] = {pId->streamId, pId->taskId}; SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if (p == NULL) { + continue; + } + if ((*p)->info.fillHistory == 0) { num += 1; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 62873f37bc..bd64e0779a 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -312,12 +312,20 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { - taosWriteQitem(pQueue, pItem); + int32_t code = taosWriteQitem(pQueue, pItem); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pItem); + return code; + } qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. - taosWriteQitem(pQueue, pItem); + int32_t code = taosWriteQitem(pQueue, pItem); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pItem); + return code; + } qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else { ASSERT(0);