refactor: remove the invalid return value.

This commit is contained in:
Haojun Liao 2024-07-16 15:49:08 +08:00
parent dd590f36e2
commit d2cb53ab3d
3 changed files with 145 additions and 76 deletions

View File

@ -679,7 +679,12 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
cleanupInfoForNextFileset(pReader->status.pTableMap);
initBrinRecordIter(&iter, pReader->pFileReader, pIndexList);
while (((pRecord = getNextBrinRecord(&iter)) != NULL)) {
while (1) {
int32_t code = getNextBrinRecord(&iter, &pRecord);
if (code != TSDB_CODE_SUCCESS) {
break;
}
if (pRecord->suid > pReader->info.suid) {
break;
}
@ -707,7 +712,11 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid);
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
STableBlockScanInfo* pScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// here we should find the first timestamp that is greater than the lastProcKey
// the window is an open interval NOW.
@ -1409,8 +1418,10 @@ static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockIn
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index,
int32_t step) {
int32_t code = TSDB_CODE_SUCCESS;
if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1;
code = -1;
return code;
}
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
@ -1421,8 +1432,12 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
for (int32_t i = index - 1; i >= pBlockIter->index; --i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableBlockScanInfo* pBlockScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i + 1;
@ -1432,8 +1447,12 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
for (int32_t i = index + 1; i <= pBlockIter->index; ++i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableBlockScanInfo* pBlockScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i - 1;
@ -1442,7 +1461,12 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
}
taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, fblock.uid, pReader->idStr);
STableBlockScanInfo* pBlockScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, fblock.uid, &pBlockScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, fblock.tbBlockIdx);
pTableDataBlockIdx->globalIndex = pBlockIter->index;
}
@ -2555,9 +2579,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
return code;
}
STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
STableBlockScanInfo* pBlockScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
@ -2896,6 +2920,7 @@ static int32_t buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScan
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, pReader->idStr);
return code;
}
static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo,
@ -3063,9 +3088,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return pReader->code;
}
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pScanInfo == NULL) {
return terrno;
code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
@ -4206,6 +4231,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
// TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t num) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t size = tSimpleHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL;
@ -4216,7 +4242,7 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
}
if (size < num) {
int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
if (code) {
return code;
}
@ -4237,8 +4263,16 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
for (int32_t i = 0; i < num; ++i) {
pUidList->tableUidList[i] = pList[i].uid;
STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
initTableBlockScanInfo(pInfo, pList[i].uid, pReader->status.pTableMap, pReader);
STableBlockScanInfo* pInfo = NULL;
code = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = initTableBlockScanInfo(pInfo, pList[i].uid, pReader->status.pTableMap, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TDB_CODE_SUCCESS;
@ -4412,11 +4446,10 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
}
STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
pReader->status.pTableMap =
createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables);
if (pReader->status.pTableMap == NULL) {
code = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables, &pReader->status.pTableMap);
if (code != TSDB_CODE_SUCCESS) {
*ppReader = NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -5067,40 +5100,40 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
return code;
}
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
static int32_t doRetrieveDataBlock(STsdbReader* pReader, SSDataBlock** pBlock) {
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = NULL;
*pBlock = NULL;
code = getCurrentBlockInfo(&pStatus->blockIter, &pBlockInfo);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
return code;
}
if (pReader->code != TSDB_CODE_SUCCESS) {
return NULL;
return pReader->code;
}
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
return NULL;
STableBlockScanInfo* pBlockScanInfo = NULL;
code = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataReset(&pStatus->fileBlockData);
terrno = code;
return NULL;
return code;
}
code = copyBlockDataToSDataBlock(pReader, &pBlockScanInfo->lastProcKey);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataReset(&pStatus->fileBlockData);
terrno = code;
return NULL;
}
return pReader->resBlockInfo.pResBlock;
*pBlock = pReader->resBlockInfo.pResBlock;
return code;
}
SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
@ -5121,7 +5154,8 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
return pTReader->resBlockInfo.pResBlock;
}
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
SSDataBlock* ret = NULL;
doRetrieveDataBlock(pTReader, &ret);
qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader);

View File

@ -112,22 +112,28 @@ void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
taosArrayDestroy(pBuf->pData);
}
void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pInfo) {
*pInfo = NULL;
int32_t bucketIndex = index / pBuf->numPerBucket;
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
if (p == NULL || *p == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
int32_t size = tSimpleHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return NULL;
if (pBucket == NULL) {
return TSDB_CODE_FAILED;
}
return *p;
*pInfo = (STableBlockScanInfo*)((*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo));
return TSDB_CODE_SUCCESS;
}
int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id) {
*pInfo = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
if (pInfo == NULL || *pInfo == NULL) {
int32_t size = tSimpleHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) {
@ -245,20 +251,27 @@ int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSH
initLastProcKey(pScanInfo, pReader);
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
int32_t code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pReader, pScanInfo->uid,
pScanInfo->lastProcKey.ts, pReader->idStr);
return TSDB_CODE_SUCCESS;
return code;
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables) {
int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj) {
int32_t code = 0;
*pHashObj = NULL;
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pTableMap == NULL) {
return NULL;
return terrno;
}
int64_t st = taosGetTimestampUs();
@ -267,7 +280,7 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (pUidList->tableUidList == NULL) {
tSimpleHashCleanup(pTableMap);
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pUidList->currentIndex = 0;
@ -275,8 +288,16 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
for (int32_t j = 0; j < numOfTables; ++j) {
pUidList->tableUidList[j] = idList[j].uid;
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader);
STableBlockScanInfo* pScanInfo = NULL;
code = getPosInBlockInfoBuf(pBuf, j, &pScanInfo);
if (code != TSDB_CODE_SUCCESS) {
break;
}
code = initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader);
if (code != TSDB_CODE_SUCCESS) {
break;
}
}
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
@ -286,7 +307,8 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
pTsdbReader->idStr);
return pTableMap;
*pHashObj = pTableMap;
return code;
}
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
@ -391,11 +413,13 @@ void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray
pIter->pBrinBlockList = pList;
}
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
*pRecord = NULL;
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= pIter->block.numOfRecords) {
pIter->blockIndex += 1;
if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
return NULL;
return TSDB_CODE_FAILED;
}
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
@ -404,7 +428,7 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
return NULL;
return TSDB_CODE_FAILED;
}
pIter->recordIndex = -1;
@ -412,7 +436,9 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
pIter->recordIndex += 1;
tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record);
return &pIter->record;
*pRecord = &pIter->record;
return TSDB_CODE_SUCCESS;
}
void clearBrinBlockIter(SBrinRecordIter* pIter) { tBrinBlockDestroy(&pIter->block); }
@ -670,7 +696,13 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
STombRecord record = {0};
uint64_t uid = pReader->status.uidList.tableUidList[*j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
STableBlockScanInfo* pScanInfo = NULL;
code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pScanInfo->pFileDelData == NULL) {
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
}
@ -702,7 +734,11 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
}
uid = pReader->status.uidList.tableUidList[*j];
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pScanInfo->pFileDelData == NULL) {
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
}

View File

@ -315,11 +315,10 @@ typedef struct SBrinRecordIter {
} SBrinRecordIter;
int32_t uidComparFunc(const void* p1, const void* p2);
int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id);
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
SSHashObj* createDataBlockScanInfo(STsdbReader* pReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables);
int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj);
int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap,
STsdbReader* pReader);
void clearBlockScanInfo(STableBlockScanInfo* p);
@ -328,11 +327,11 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t s
void cleanupInfoForNextFileset(SSHashObj* pTableMap);
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables);
void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf);
void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index);
int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pRes);
// brin records iterator
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList);
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter);
int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord);
void clearBrinBlockIter(SBrinRecordIter* pIter);
// initialize block iterator API