enhance: refactor code

This commit is contained in:
slzhou 2023-12-15 08:23:25 +08:00
parent 4dc39e74c5
commit 5bf364cc0a
2 changed files with 29 additions and 28 deletions

View File

@ -76,7 +76,7 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void resetProcMemTableListIndex(SReaderStatus* pStatus);
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
@ -2567,9 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
}
if (pReader->status.bProcMemPreFileset) {
uInfo("has mem preset");
tsdbDebug("will start pre-fileset %d buffer processing. %s", fid, pReader->idStr);
pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList;
resetProcMemTableListIndex(&pReader->status);
resetPreFilesetMemTableListIndex(&pReader->status);
}
if (!pReader->status.bProcMemPreFileset) {
@ -2577,7 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
uInfo("new duration notification. %d", fid);
tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr);
}
}
@ -2648,7 +2648,7 @@ static void resetTableListIndex(SReaderStatus* pStatus) {
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
static void resetProcMemTableListIndex(SReaderStatus* pStatus) {
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) {
STableUidList* pList = &pStatus->procMemUidList;
pList->currentIndex = 0;
@ -2668,14 +2668,15 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
return (pStatus->pTableIter != NULL);
}
static bool moveToNextTablePreFileSet(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) {
STableUidList* pUidList = &pStatus->procMemUidList;
pUidList->currentIndex += 1;
if (pUidList->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
pStatus->pProcMemTableIter = NULL;
return false;
}
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
uint64_t uid = pUidList->tableUidList[pUidList->currentIndex];
pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
return (pStatus->pProcMemTableIter != NULL);
}
@ -2914,11 +2915,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
}
static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, int64_t endKey) {
static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->procMemUidList;
tsdbDebug("seq load data blocks from cache, %s", pReader->idStr);
tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr);
while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) {
@ -2929,7 +2929,7 @@ static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader,
STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter;
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus);
bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
@ -2949,7 +2949,7 @@ static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader,
}
// current table is exhausted, let's try next table
bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus);
bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
@ -4311,7 +4311,7 @@ _err:
return code;
}
static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) {
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
@ -4322,21 +4322,22 @@ static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) {
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
code = buildBlockFromBufferPreFilesetSequentially(pReader, endKey);
uInfo("zsl mem block rows: %ld", pBlock->info.rows);
code = buildBlockFromBufferSeqForPreFileset(pReader, endKey);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
} else {
tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr);
pStatus->bProcMemPreFileset = false;
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
uInfo("new duration notification. mem pre fileset : %d", fid);
tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr);
}
if (pStatus->pNextFileBlock && pStatus->pNextFileBlock->info.rows > 0) {
copyDataBlock(pBlock, pStatus->pNextFileBlock);
blockDataDestroy(pStatus->pNextFileBlock);
if (pStatus->pNextFilesetBlock && pStatus->pNextFilesetBlock->info.rows > 0) {
tsdbDebug("return the saved block from fileset %d files, %s", fid, pReader->idStr);
copyDataBlock(pBlock, pStatus->pNextFilesetBlock);
blockDataDestroy(pStatus->pNextFilesetBlock);
}
}
return code;
@ -4349,7 +4350,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
if (pStatus->loadFromFile) {
if (pStatus->bProcMemPreFileset) {
code = buildMemoryBlockPreFileset(pReader);
code = buildFromPreFilesetBuffer(pReader);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
}
@ -4359,18 +4360,16 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) {
pStatus->pNextFileBlock = createOneDataBlock(pBlock, true);
pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true);
blockDataCleanup(pBlock);
code = buildMemoryBlockPreFileset(pReader);
code = buildFromPreFilesetBuffer(pReader);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
}
}
uInfo("zsl file block rows %ld", pBlock->info.rows);
if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;

View File

@ -210,6 +210,8 @@ typedef struct SReaderStatus {
SArray* pLDataIterArray;
SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
// the following for preceeds fileset memory processing
// TODO: refactor into seperate struct
bool bProcMemPreFileset;
int64_t memTableMaxKey;
int64_t memTableMinKey;
@ -218,7 +220,7 @@ typedef struct SReaderStatus {
bool bProcMemFirstFileset;
STableUidList procMemUidList;
STableBlockScanInfo** pProcMemTableIter;
SSDataBlock* pNextFileBlock;
SSDataBlock* pNextFilesetBlock;
} SReaderStatus;
struct STsdbReader {