Merge pull request #28024 from taosdata/fix/syntax
fix(tsdb): check the return value.
This commit is contained in:
commit
66cfee2c0e
|
@ -82,7 +82,7 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
|
|||
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo);
|
||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||
static void resetTableListIndex(SReaderStatus* pStatus);
|
||||
static int32_t resetTableListIndex(SReaderStatus* pStatus, const char* id);
|
||||
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey);
|
||||
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
|
||||
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader);
|
||||
|
@ -257,9 +257,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
|
|||
if (pIter->pSttBlockReader == NULL) {
|
||||
pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader));
|
||||
if (pIter->pSttBlockReader == NULL) {
|
||||
code = terrno;
|
||||
tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(terrno), pReader->idStr);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -381,13 +380,15 @@ _err:
|
|||
|
||||
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); }
|
||||
|
||||
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) {
|
||||
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree, const char* id) {
|
||||
pIter->order = order;
|
||||
pIter->index = -1;
|
||||
pIter->numOfBlocks = 0;
|
||||
|
||||
if (pIter->blockList == NULL) {
|
||||
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
|
||||
if (pIter->blockList == NULL) {
|
||||
tsdbError("%s failed to reset block iter, func:%s at line:%d code:%s", id, __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
|
@ -584,7 +585,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
|
||||
code = tBlockDataCreate(&pReader->status.fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
|
@ -3072,12 +3072,18 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
}
|
||||
|
||||
// pTableIter can be NULL, no need to handle the return value
|
||||
static void resetTableListIndex(SReaderStatus* pStatus) {
|
||||
static int32_t resetTableListIndex(SReaderStatus* pStatus, const char* id) {
|
||||
STableUidList* pList = &pStatus->uidList;
|
||||
|
||||
pList->currentIndex = 0;
|
||||
uint64_t uid = pList->tableUidList[0];
|
||||
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
tsdbError("%s failed to load tableBlockScanInfo for uid:%"PRId64", code: internal error", id, uid);
|
||||
return TSDB_CODE_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) {
|
||||
|
@ -3209,8 +3215,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
// load the last data block of current table
|
||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||
if (pScanInfo == NULL) {
|
||||
STableBlockScanInfo* pScanInfo = NULL;
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
|
||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||
if (!hasNexTable) {
|
||||
|
@ -3218,6 +3224,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
continue;
|
||||
} else {
|
||||
pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter;
|
||||
}
|
||||
|
||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
|
||||
|
@ -3562,14 +3570,21 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
|
|||
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
|
||||
} else { // no block data, only last block exists
|
||||
tBlockDataReset(&pReader->status.fileBlockData);
|
||||
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||
resetTableListIndex(&pReader->status);
|
||||
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo), pReader->idStr);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = resetTableListIndex(&pReader->status, pReader->idStr);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
// set the correct start position according to the query time window
|
||||
if (code == TSDB_CODE_SUCCESS) { // set the correct start position according to the query time window
|
||||
initBlockDumpInfo(pReader, pBlockIter);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTableList);
|
||||
return code;
|
||||
}
|
||||
|
@ -3584,38 +3599,40 @@ typedef enum {
|
|||
TSDB_READ_CONTINUE = 0x2,
|
||||
} ERetrieveType;
|
||||
|
||||
static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
|
||||
static int32_t doReadDataFromSttFiles(STsdbReader* pReader, ERetrieveType* pReturnType) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
|
||||
*pReturnType = TSDB_READ_RETURN;
|
||||
|
||||
tsdbDebug("seq load data blocks from stt files %s", pReader->idStr);
|
||||
|
||||
while (1) {
|
||||
terrno = 0;
|
||||
|
||||
code = doLoadSttBlockSequentially(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return TSDB_READ_RETURN;
|
||||
*pReturnType = TSDB_READ_RETURN;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pResBlock->info.rows > 0) {
|
||||
return TSDB_READ_RETURN;
|
||||
*pReturnType = TSDB_READ_RETURN;
|
||||
return code;
|
||||
}
|
||||
|
||||
// all data blocks are checked in this stt file, now let's try the next file set
|
||||
if (pReader->status.pTableIter != NULL) {
|
||||
terrno = TSDB_CODE_INTERNAL_ERROR;
|
||||
tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__);
|
||||
return TSDB_READ_RETURN;
|
||||
code = TSDB_CODE_INTERNAL_ERROR;
|
||||
tsdbError("tsdb reader failed at: %s:%d, code:%s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
||||
// error happens or all the data files are completely checked
|
||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||
terrno = code;
|
||||
return TSDB_READ_RETURN;
|
||||
*pReturnType = TSDB_READ_RETURN;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pReader->status.bProcMemPreFileset) {
|
||||
|
@ -3625,14 +3642,19 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
|
|||
}
|
||||
if (pResBlock->info.rows > 0) {
|
||||
pReader->status.processingMemPreFileSet = true;
|
||||
return TSDB_READ_RETURN;
|
||||
*pReturnType = TSDB_READ_RETURN;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
|
||||
return TSDB_READ_CONTINUE;
|
||||
} else { // all blocks in data file are checked, let's check the data in last files
|
||||
resetTableListIndex(&pReader->status);
|
||||
*pReturnType = TSDB_READ_CONTINUE;
|
||||
return code;
|
||||
} else { // all blocks in data file are checked, let's check the data in stt-files
|
||||
code = resetTableListIndex(&pReader->status, pReader->idStr);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3641,15 +3663,18 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
|
||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
const char* id = pReader->idStr;
|
||||
|
||||
if (pBlockIter->numOfBlocks == 0) {
|
||||
// let's try to extract data from stt files.
|
||||
terrno = 0;
|
||||
ERetrieveType type = doReadDataFromSttFiles(pReader);
|
||||
if (type == TSDB_READ_RETURN) {
|
||||
return terrno;
|
||||
ERetrieveType type = 0;
|
||||
code = doReadDataFromSttFiles(pReader, &type);
|
||||
if (code != 0 || type == TSDB_READ_RETURN) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doBuildDataBlock(pReader);
|
||||
|
@ -3658,9 +3683,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
|
||||
while (1) {
|
||||
if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded
|
||||
code = buildComposedDataBlock(pReader);
|
||||
|
@ -3679,15 +3701,20 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
tBlockDataReset(pBlockData);
|
||||
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo), id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
resetTableListIndex(&pReader->status);
|
||||
|
||||
ERetrieveType type = doReadDataFromSttFiles(pReader);
|
||||
if (type == TSDB_READ_RETURN) {
|
||||
return terrno;
|
||||
code = resetTableListIndex(&pReader->status, id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
ERetrieveType type = 0;
|
||||
code = doReadDataFromSttFiles(pReader, &type);
|
||||
if (code != 0 || type == TSDB_READ_RETURN) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4649,7 +4676,7 @@ uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.v
|
|||
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pReader->bFilesetDelimited) {
|
||||
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
|
||||
|
@ -4661,7 +4688,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||
code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo),
|
||||
pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -4673,7 +4701,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
if (!pStatus->loadFromFile) {
|
||||
resetTableListIndex(pStatus);
|
||||
code = resetTableListIndex(pStatus, pReader->idStr);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -5146,7 +5174,11 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
if (pBlock->info.rows <= 0) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
code = resetTableListIndex(&pReader->status, pReader->idStr);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
}
|
||||
|
@ -5169,7 +5201,11 @@ static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
if (pBlock->info.rows <= 0) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
code = resetTableListIndex(&pReader->status, pReader->idStr);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
}
|
||||
|
@ -5587,13 +5623,17 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
|
||||
code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo), pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
(void) tsdbReleaseReader(pReader);
|
||||
return code;
|
||||
}
|
||||
|
||||
resetTableListIndex(&pReader->status);
|
||||
code = resetTableListIndex(&pReader->status, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
(void) tsdbReleaseReader(pReader);
|
||||
return code;
|
||||
}
|
||||
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
@ -5608,7 +5648,11 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
// no data in files, let's try buffer in memory
|
||||
if (pStatus->fileIter.numOfFiles == 0) {
|
||||
pStatus->loadFromFile = false;
|
||||
resetTableListIndex(pStatus);
|
||||
code = resetTableListIndex(pStatus, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
(void) tsdbReleaseReader(pReader);
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5845,9 +5889,9 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
|
|||
metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
|
||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
code = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
*suid = 0;
|
||||
|
@ -5858,15 +5902,15 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
|
|||
*suid = mr.me.ctbEntry.suid;
|
||||
code = metaReaderGetTableEntryByUidCache(&mr, *suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
code = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
|
||||
} else {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
|
|
@ -357,7 +357,7 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
|
|||
void clearRowKey(SRowKey* pKey);
|
||||
|
||||
bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp);
|
||||
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
|
||||
int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk, const char* id);
|
||||
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
|
||||
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
|
||||
|
||||
|
|
Loading…
Reference in New Issue