From a72c0c2bd2c4a8eeb02598391f04e8eee25ff2bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 2 Sep 2024 16:49:47 +0800 Subject: [PATCH] fix(tsdb): init merge if get the initial pschema failed. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 101 ++++++++++-------------- 1 file changed, 40 insertions(+), 61 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b7bfd045d1..cecc52c51c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1734,14 +1734,35 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc return isCleanFileBlock; } +static int32_t initRowMergeIfNeeded(STsdbReader* pReader, int64_t uid) { + SRowMerger* pMerger = &pReader->status.merger; + int32_t code = 0; + + if (pMerger->pArray == NULL) { + STSchema* ps = getTableSchemaImpl(pReader, uid); + if (ps == NULL) { + return terrno; + } + + code = tsdbRowMergerInit(pMerger, ps); + } + + return code; +} + static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) { return TSDB_CODE_SUCCESS; } + int32_t code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; + } + int64_t st = taosGetTimestampUs(); SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); + code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); double el = (taosGetTimestampUs() - st) / 1000.0; updateComposedBlockInfo(pReader, el, pBlockScanInfo); @@ -1943,19 +1964,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema != NULL) { - tsdbError("tsdb failed at %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - int32_t code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int32_t code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } SRowKey minKey = k; @@ -1983,7 +1994,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* // file block ---> stt block -----> mem if (pkCompEx(&minKey, pfKey) == 0) { - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1996,7 +2007,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pkCompEx(&minKey, pSttKey) == 0) { TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); + code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2007,7 +2018,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (pkCompEx(&minKey, &k) == 0) { - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2018,7 +2029,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2039,19 +2050,9 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema) { - tsdbError("tsdb failed at %s %d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); @@ -2175,20 +2176,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema != NULL) { - tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - - code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } SRowKey minKey = k; @@ -2579,20 +2569,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema != NULL) { - tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - - code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); @@ -4770,13 +4749,13 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi if (pCond->suid != 0) { pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, -1, 1); if (pReader->info.pSchema == NULL) { - tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr); + tsdbWarn("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr); } } else if (numOfTables > 0) { STableKeyInfo* pKey = pTableList; pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); if (pReader->info.pSchema == NULL) { - tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); + tsdbWarn("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); } }