fix(tsdb): init merge if get the initial pschema failed.

This commit is contained in:
Haojun Liao 2024-09-02 16:49:47 +08:00 committed by Jinqing Kuang
parent 93568271e1
commit a72c0c2bd2
1 changed files with 40 additions and 61 deletions

View File

@ -1734,14 +1734,35 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
return isCleanFileBlock; return isCleanFileBlock;
} }
static int32_t initRowMergeIfNeeded(STsdbReader* pReader, int64_t uid) {
SRowMerger* pMerger = &pReader->status.merger;
int32_t code = 0;
if (pMerger->pArray == NULL) {
STSchema* ps = getTableSchemaImpl(pReader, uid);
if (ps == NULL) {
return terrno;
}
code = tsdbRowMergerInit(pMerger, ps);
}
return code;
}
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) { if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid);
if (code != 0) {
return code;
}
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
updateComposedBlockInfo(pReader, el, pBlockScanInfo); updateComposedBlockInfo(pReader, el, pBlockScanInfo);
@ -1943,19 +1964,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) { int32_t code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid);
if (pReader->info.pSchema != NULL) { if (code != 0) {
tsdbError("tsdb failed at %s:%d", __func__, __LINE__); return code;
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
int32_t code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
SRowKey minKey = k; SRowKey minKey = k;
@ -1983,7 +1994,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
// file block ---> stt block -----> mem // file block ---> stt block -----> mem
if (pkCompEx(&minKey, pfKey) == 0) { if (pkCompEx(&minKey, pfKey) == 0) {
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -1996,7 +2007,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pkCompEx(&minKey, pSttKey) == 0) { if (pkCompEx(&minKey, pSttKey) == 0) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2007,7 +2018,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (pkCompEx(&minKey, &k) == 0) { if (pkCompEx(&minKey, &k) == 0) {
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2018,7 +2029,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2039,19 +2050,9 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) { code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid);
if (pReader->info.pSchema) { if (code != 0) {
tsdbError("tsdb failed at %s %d", __func__, __LINE__); return code;
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
@ -2175,20 +2176,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) { code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid);
if (pReader->info.pSchema != NULL) { if (code != 0) {
tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__); return code;
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
SRowKey minKey = k; SRowKey minKey = k;
@ -2579,20 +2569,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} }
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) { code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid);
if (pReader->info.pSchema != NULL) { if (code != 0) {
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__); return code;
return TSDB_CODE_INTERNAL_ERROR;
}
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey);
@ -4770,13 +4749,13 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
if (pCond->suid != 0) { if (pCond->suid != 0) {
pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, -1, 1); pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, -1, 1);
if (pReader->info.pSchema == NULL) { if (pReader->info.pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr); tsdbWarn("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr);
} }
} else if (numOfTables > 0) { } else if (numOfTables > 0) {
STableKeyInfo* pKey = pTableList; STableKeyInfo* pKey = pTableList;
pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
if (pReader->info.pSchema == NULL) { if (pReader->info.pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); tsdbWarn("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
} }
} }