fix(query): check the init status for pReader->status.merger

This commit is contained in:
Haojun Liao 2023-04-27 10:15:52 +08:00
parent db1d75bb48
commit fe3f3f478a
1 changed files with 76 additions and 39 deletions

View File

@ -246,8 +246,6 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
static STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
@ -1354,17 +1352,41 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
ASSERT(pReader->pSchema == NULL);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema);
if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) {
terrno = code;
tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
return NULL;
}
code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
tsdbError("failed to init merger, code:%s, %s", tstrerror(code), pReader->idStr);
return NULL;
}
return pReader->pSchema;
}
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
uint64_t uid) {
int32_t code = 0;
STSchema* pSchema = pReader->pSchema;
int64_t st = taosGetTimestampUs();
tBlockDataReset(pBlockData);
STSchema* pSchema = getLatestTableSchema(pReader, uid);
if (pReader->pSchema == NULL) {
pSchema = getTableSchemaImpl(pReader, uid);
if (pSchema == NULL) {
tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
return code;
}
}
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
TABLEID tid = {.suid = pReader->suid, .uid = uid};
@ -1912,33 +1934,11 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return code;
}
STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
if (pReader->pSchema != NULL) {
return pReader->pSchema;
}
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema);
if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
}
return pReader->pSchema;
}
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
int32_t code = 0;
// always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) {
code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
if (code != 0) {
terrno = code;
STSchema* ps = getTableSchemaImpl(pReader, uid);
if (ps == NULL) {
return NULL;
}
}
@ -1953,7 +1953,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
}
STSchema* ptr = NULL;
code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
@ -1982,6 +1982,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
}
int64_t minKey = 0;
if (pReader->order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value
@ -2011,13 +2020,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
// todo remove init
bool init = false;
// ASC: file block ---> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true; // todo check if pReader->pSchema is null or not
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2203,6 +2213,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
}
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
// no last block available, only data block exists
@ -2220,8 +2240,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
SRow* pTSRow = NULL;
SRowMerger* pMerger = &pReader->status.merger;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2285,6 +2303,15 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
}
int64_t minKey = 0;
if (ASCENDING_TRAVERSE(pReader->order)) {
minKey = INT64_MAX; // let's find the minimum
@ -2596,6 +2623,7 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader) {
SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
bool copied = false;
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
@ -2603,6 +2631,15 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return code;
}
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
}
}
if (copied) {
pBlockScanInfo->lastKey = key;
return TSDB_CODE_SUCCESS;
@ -2610,13 +2647,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
code = tsdbRowMergerGetRow(&pReader->status.merger, &pTSRow);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -2624,7 +2661,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&pReader->status.merger);
tsdbRowMergerClear(pMerger);
return code;
}
}