fix(tsdb): check the return value.

This commit is contained in:
Haojun Liao 2024-09-23 10:57:19 +08:00
parent 0335799e1c
commit b8f06a06db
2 changed files with 100 additions and 58 deletions

View File

@ -82,7 +82,7 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo); static bool hasDataInSttBlock(STableBlockScanInfo* pInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void resetTableListIndex(SReaderStatus* pStatus); static int32_t resetTableListIndex(SReaderStatus* pStatus, const char* id);
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey); static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey);
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader); static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader);
@ -257,9 +257,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
if (pIter->pSttBlockReader == NULL) { if (pIter->pSttBlockReader == NULL) {
pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader)); pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader));
if (pIter->pSttBlockReader == NULL) { if (pIter->pSttBlockReader == NULL) {
code = terrno; tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(terrno), pReader->idStr);
tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr); return terrno;
return code;
} }
} }
@ -381,14 +380,16 @@ _err:
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); } bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); }
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree, const char* id) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = 0; pIter->numOfBlocks = 0;
if (pIter->blockList == NULL) { if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
if (pIter->blockList == NULL) { if (pIter->blockList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; tsdbError("%s failed to reset block iter, func:%s at line:%d code:%s", id, __func__, __LINE__, tstrerror(terrno));
return terrno;
} }
} else { } else {
clearDataBlockIterator(pIter, needFree); clearDataBlockIterator(pIter, needFree);
@ -584,7 +585,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
code = tBlockDataCreate(&pReader->status.fileBlockData); code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _end; goto _end;
} }
@ -3068,12 +3068,18 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
} }
// pTableIter can be NULL, no need to handle the return value // pTableIter can be NULL, no need to handle the return value
static void resetTableListIndex(SReaderStatus* pStatus) { static int32_t resetTableListIndex(SReaderStatus* pStatus, const char* id) {
STableUidList* pList = &pStatus->uidList; STableUidList* pList = &pStatus->uidList;
pList->currentIndex = 0; pList->currentIndex = 0;
uint64_t uid = pList->tableUidList[0]; uint64_t uid = pList->tableUidList[0];
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
if (pStatus->pTableIter == NULL) {
tsdbError("%s failed to load tableBlockScanInfo for uid:%"PRId64", code: internal error", id, uid);
return TSDB_CODE_INTERNAL_ERROR;
}
return 0;
} }
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) { static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) {
@ -3205,8 +3211,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
} }
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = NULL;
if (pScanInfo == NULL) { if (pStatus->pTableIter == NULL) {
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
@ -3214,6 +3220,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
} }
continue; continue;
} else {
pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter;
} }
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
@ -3558,14 +3566,21 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists } else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo), pReader->idStr);
resetTableListIndex(&pReader->status); if (code) {
return code;
}
code = resetTableListIndex(&pReader->status, pReader->idStr);
if (code) {
return code;
}
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) { // set the correct start position according to the query time window
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} }
taosArrayDestroy(pTableList); taosArrayDestroy(pTableList);
return code; return code;
} }
@ -3580,38 +3595,40 @@ typedef enum {
TSDB_READ_CONTINUE = 0x2, TSDB_READ_CONTINUE = 0x2,
} ERetrieveType; } ERetrieveType;
static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) { static int32_t doReadDataFromSttFiles(STsdbReader* pReader, ERetrieveType* pReturnType) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
*pReturnType = TSDB_READ_RETURN;
tsdbDebug("seq load data blocks from stt files %s", pReader->idStr); tsdbDebug("seq load data blocks from stt files %s", pReader->idStr);
while (1) { while (1) {
terrno = 0;
code = doLoadSttBlockSequentially(pReader); code = doLoadSttBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; *pReturnType = TSDB_READ_RETURN;
return TSDB_READ_RETURN; return code;
} }
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
return TSDB_READ_RETURN; *pReturnType = TSDB_READ_RETURN;
return code;
} }
// all data blocks are checked in this stt file, now let's try the next file set // all data blocks are checked in this stt file, now let's try the next file set
if (pReader->status.pTableIter != NULL) { if (pReader->status.pTableIter != NULL) {
terrno = TSDB_CODE_INTERNAL_ERROR; code = TSDB_CODE_INTERNAL_ERROR;
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__); tsdbError("tsdb reader failed at: %s:%d, code:%s", __func__, __LINE__, tstrerror(code));
return TSDB_READ_RETURN; return code;
} }
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked // error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
terrno = code; *pReturnType = TSDB_READ_RETURN;
return TSDB_READ_RETURN; return code;
} }
if (pReader->status.bProcMemPreFileset) { if (pReader->status.bProcMemPreFileset) {
@ -3621,15 +3638,18 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
} }
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
pReader->status.processingMemPreFileSet = true; pReader->status.processingMemPreFileSet = true;
return TSDB_READ_RETURN; *pReturnType = TSDB_READ_RETURN;
return code;
} }
} }
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
return TSDB_READ_CONTINUE; *pReturnType = TSDB_READ_CONTINUE;
} else { // all blocks in data file are checked, let's check the data in last files } else { // all blocks in data file are checked, let's check the data in stt-files
resetTableListIndex(&pReader->status); code = resetTableListIndex(&pReader->status, pReader->idStr);
} }
return code;
} }
} }
@ -3637,15 +3657,18 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
const char* id = pReader->idStr;
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
// let's try to extract data from stt files. // let's try to extract data from stt files.
terrno = 0; ERetrieveType type = 0;
ERetrieveType type = doReadDataFromSttFiles(pReader); code = doReadDataFromSttFiles(pReader, &type);
if (type == TSDB_READ_RETURN) { if (code != 0 || type == TSDB_READ_RETURN) {
return terrno; return code;
} }
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
@ -3654,9 +3677,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
while (1) { while (1) {
if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded
code = buildComposedDataBlock(pReader); code = buildComposedDataBlock(pReader);
@ -3675,15 +3695,20 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo), id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
resetTableListIndex(&pReader->status);
ERetrieveType type = doReadDataFromSttFiles(pReader); code = resetTableListIndex(&pReader->status, id);
if (type == TSDB_READ_RETURN) { if (code != TSDB_CODE_SUCCESS) {
return terrno; return code;
}
ERetrieveType type = 0;
code = doReadDataFromSttFiles(pReader, &type);
if (code != 0 || type == TSDB_READ_RETURN) {
return code;
} }
} }
} }
@ -4635,7 +4660,7 @@ uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.v
static int32_t doOpenReaderImpl(STsdbReader* pReader) { static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pReader->bFilesetDelimited) { if (pReader->bFilesetDelimited) {
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
@ -4647,7 +4672,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
return code; return code;
} }
code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo),
pReader->idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -4659,7 +4685,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
if (!pStatus->loadFromFile) { if (!pStatus->loadFromFile) {
resetTableListIndex(pStatus); code = resetTableListIndex(pStatus, pReader->idStr);
} }
return code; return code;
@ -5132,7 +5158,11 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
} }
if (pBlock->info.rows <= 0) { if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status); code = resetTableListIndex(&pReader->status, pReader->idStr);
if (code) {
return code;
}
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
code = buildBlockFromBufferSequentially(pReader, endKey); code = buildBlockFromBufferSequentially(pReader, endKey);
} }
@ -5155,7 +5185,11 @@ static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) {
} }
if (pBlock->info.rows <= 0) { if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status); code = resetTableListIndex(&pReader->status, pReader->idStr);
if (code) {
return code;
}
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
code = buildBlockFromBufferSequentially(pReader, endKey); code = buildBlockFromBufferSequentially(pReader, endKey);
} }
@ -5573,13 +5607,17 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return code; return code;
} }
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo), pReader->idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
(void) tsdbReleaseReader(pReader); (void) tsdbReleaseReader(pReader);
return code; return code;
} }
resetTableListIndex(&pReader->status); code = resetTableListIndex(&pReader->status, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
(void) tsdbReleaseReader(pReader);
return code;
}
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
@ -5594,7 +5632,11 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
if (pStatus->fileIter.numOfFiles == 0) { if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false; pStatus->loadFromFile = false;
resetTableListIndex(pStatus); code = resetTableListIndex(pStatus, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
(void) tsdbReleaseReader(pReader);
return code;
}
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -5831,9 +5873,9 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
metaReaderDoInit(&mr, pMeta, META_READER_LOCK); metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; code = TSDB_CODE_TDB_INVALID_TABLE_ID;
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return code;
} }
*suid = 0; *suid = 0;
@ -5844,15 +5886,15 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
*suid = mr.me.ctbEntry.suid; *suid = mr.me.ctbEntry.suid;
code = metaReaderGetTableEntryByUidCache(&mr, *suid); code = metaReaderGetTableEntryByUidCache(&mr, *suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; code = TSDB_CODE_TDB_INVALID_TABLE_ID;
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return code;
} }
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
} else { } else {
terrno = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return code;
} }
metaReaderClear(&mr); metaReaderClear(&mr);

View File

@ -357,7 +357,7 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
void clearRowKey(SRowKey* pKey); void clearRowKey(SRowKey* pKey);
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp); bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp);
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk, const char* id);
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);