enhance: tsdb output duration order

This commit is contained in:
shenglian zhou 2023-11-28 14:39:08 +08:00
parent cdb7eb462d
commit 89cd2b6f17
2 changed files with 78 additions and 21 deletions

View File

@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void resetTableListIndex(SReaderStatus* pStatus);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -241,11 +242,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey,
pReader->info.window.ekey, pReader->idStr); pReader->info.window.ekey, pReader->idStr);
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.fileSetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam);
}
*hasNext = true; *hasNext = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -436,6 +433,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
goto _end; goto _end;
} }
pReader->bDurationOrder = true;
tsdbInitReaderLock(pReader); tsdbInitReaderLock(pReader);
*ppReader = pReader; *ppReader = pReader;
@ -2550,6 +2549,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
} }
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
pReader->status.bProcMemPreFileset = true;
resetTableListIndex(&pReader->status);
break; break;
} }
} }
@ -2931,7 +2932,7 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) {
return code; return code;
} }
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
@ -2948,13 +2949,12 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pBlockScanInfo = pStatus->pTableIter; continue;
} }
initMemDataIterator(*pBlockScanInfo, pReader); initMemDataIterator(*pBlockScanInfo, pReader);
initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -4350,6 +4350,70 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
if (pStatus->loadFromFile) {
if (pStatus->bProcMemPreFileset) {
int32_t fid = pReader->status.pCurrentFileset->fid;
STimeWindow win = {0};
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
code = buildBlockFromBufferSequentially(pReader, win.skey);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
return code;
} else {
pStatus->bProcMemPreFileset = false;
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.fileSetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam);
}
}
}
code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
code = buildBlockFromBufferSequentially(pReader, endKey);
}
} else { // no data in files, let's try the buffer
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
code = buildBlockFromBufferSequentially(pReader, endKey);
}
return code;
}
static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
if (pStatus->loadFromFile) {
code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
code = buildBlockFromBufferSequentially(pReader, endKey);
}
} else { // no data in files, let's try the buffer
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
code = buildBlockFromBufferSequentially(pReader, endKey);
}
return code;
}
static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -4367,19 +4431,10 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
return tsdbReadRowsCountOnly(pReader); return tsdbReadRowsCountOnly(pReader);
} }
if (!pReader->bDurationOrder) {
if (pStatus->loadFromFile) { code = doTsdbNextDataBlockFilesFirst(pReader);
code = buildBlockFromFiles(pReader); } else {
if (code != TSDB_CODE_SUCCESS) { code = doTsdbNextDataBlockDurationOrder(pReader);
return code;
}
if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status);
code = buildBlockFromBufferSequentially(pReader);
}
} else { // no data in files, let's try the buffer
code = buildBlockFromBufferSequentially(pReader);
} }
*hasNext = pBlock->info.rows > 0; *hasNext = pBlock->info.rows > 0;

View File

@ -206,6 +206,7 @@ typedef struct SReaderStatus {
SArray* pLDataIterArray; SArray* pLDataIterArray;
SRowMerger merger; SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
bool bProcMemPreFileset;
} SReaderStatus; } SReaderStatus;
struct STsdbReader { struct STsdbReader {
@ -228,6 +229,7 @@ struct STsdbReader {
SBlockInfoBuf blockInfoBuf; SBlockInfoBuf blockInfoBuf;
EContentData step; EContentData step;
STsdbReader* innerReader[2]; STsdbReader* innerReader[2];
bool bDurationOrder; // duration by duration output
TsdReaderNotifyCbFn notifyFn; TsdReaderNotifyCbFn notifyFn;
void* notifyParam; void* notifyParam;
}; };