fix: the block return from buildBlockFromFiles should be of new fileset

This commit is contained in:
slzhou 2023-12-14 16:57:04 +08:00
parent 26af397431
commit 4dc39e74c5
2 changed files with 114 additions and 15 deletions

View File

@ -76,6 +76,8 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void resetProcMemTableListIndex(SReaderStatus* pStatus);
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
pSupInfo->smaValid = true;
@ -2565,7 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
}
if (pReader->status.bProcMemPreFileset) {
resetTableListIndex(&pReader->status);
uInfo("has mem preset");
pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList;
resetProcMemTableListIndex(&pReader->status);
}
if (!pReader->status.bProcMemPreFileset) {
@ -2573,6 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
uInfo("new duration notification. %d", fid);
}
}
@ -2643,6 +2648,14 @@ static void resetTableListIndex(SReaderStatus* pStatus) {
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
static void resetProcMemTableListIndex(SReaderStatus* pStatus) {
STableUidList* pList = &pStatus->procMemUidList;
pList->currentIndex = 0;
uint64_t uid = pList->tableUidList[0];
pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
@ -2655,6 +2668,18 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
return (pStatus->pTableIter != NULL);
}
static bool moveToNextTablePreFileSet(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
pStatus->pProcMemTableIter = NULL;
return false;
}
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
return (pStatus->pProcMemTableIter != NULL);
}
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
@ -2889,6 +2914,48 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
}
static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, int64_t endKey) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->procMemUidList;
tsdbDebug("seq load data blocks from cache, %s", pReader->idStr);
while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
return pReader->code;
}
STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter;
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
initMemDataIterator(*pBlockScanInfo, pReader);
initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost);
int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
// current table is exhausted, let's try next table
bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
}
}
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList;
@ -4244,6 +4311,37 @@ _err:
return code;
}
static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t fid = pReader->status.pCurrentFileset->fid;
STimeWindow win = {0};
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
code = buildBlockFromBufferPreFilesetSequentially(pReader, endKey);
uInfo("zsl mem block rows: %ld", pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
} else {
pStatus->bProcMemPreFileset = false;
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
uInfo("new duration notification. mem pre fileset : %d", fid);
}
if (pStatus->pNextFileBlock && pStatus->pNextFileBlock->info.rows > 0) {
copyDataBlock(pBlock, pStatus->pNextFileBlock);
blockDataDestroy(pStatus->pNextFileBlock);
}
}
return code;
}
static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
@ -4251,22 +4349,9 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
if (pStatus->loadFromFile) {
if (pStatus->bProcMemPreFileset) {
int32_t fid = pReader->status.pCurrentFileset->fid;
STimeWindow win = {0};
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
code = buildBlockFromBufferSequentially(pReader, endKey);
code = buildMemoryBlockPreFileset(pReader);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
} else {
pStatus->bProcMemPreFileset = false;
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
}
resetTableListIndex(pStatus);
}
}
@ -4274,6 +4359,17 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) {
pStatus->pNextFileBlock = createOneDataBlock(pBlock, true);
blockDataCleanup(pBlock);
code = buildMemoryBlockPreFileset(pReader);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
}
}
uInfo("zsl file block rows %ld", pBlock->info.rows);
if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status);

View File

@ -216,6 +216,9 @@ typedef struct SReaderStatus {
int64_t prevFilesetStartKey;
int64_t prevFilesetEndKey;
bool bProcMemFirstFileset;
STableUidList procMemUidList;
STableBlockScanInfo** pProcMemTableIter;
SSDataBlock* pNextFileBlock;
} SReaderStatus;
struct STsdbReader {