[td-225] add the stop check during scan files
This commit is contained in:
parent
5a448cd9a3
commit
9213011d18
|
@ -87,7 +87,6 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
|
|||
* @param id
|
||||
* @return
|
||||
*/
|
||||
//#define getResBufPage(buf, id) ((tFilePage*)((buf)->pBuf + (buf)->pageSize * (id)))
|
||||
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
|
||||
if (id < pResultBuf->inMemPages) {
|
||||
return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
|
||||
|
|
|
@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
|
|||
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
||||
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
|
||||
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
||||
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||
|
||||
// ------------------ tsdbRWHelper.c
|
||||
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
|
||||
|
|
|
@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo);
|
|||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
|
||||
|
@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
|
||||
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
|
||||
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
||||
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
||||
}
|
||||
|
|
|
@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle {
|
|||
|
||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
||||
SArray* sa);
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
|
||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||
STsdbQueryHandle* pQueryHandle);
|
||||
|
@ -695,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
|||
}
|
||||
|
||||
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||
} else {
|
||||
/*
|
||||
* no data in cache, only load data from file
|
||||
|
@ -711,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
|||
cur->mixBlock = false;
|
||||
cur->blockCompleted = true;
|
||||
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
|
||||
pCheckInfo->lastKey = cur->lastKey;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -734,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
cur->pos = 0;
|
||||
}
|
||||
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||
} else { // the whole block is loaded in to buffer
|
||||
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||
}
|
||||
|
@ -751,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
cur->pos = pBlock->numOfRows - 1;
|
||||
}
|
||||
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
|
||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||
} else {
|
||||
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||
}
|
||||
|
@ -907,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
|
|||
|
||||
pQueryHandle->cur.win.ekey = tsArray[end];
|
||||
pQueryHandle->cur.lastKey = tsArray[end] + step;
|
||||
|
||||
|
||||
return numOfRows + num;
|
||||
}
|
||||
|
||||
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
|
||||
STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
|
||||
int32_t numOfCols, STable* pTable) {
|
||||
char* pData = NULL;
|
||||
|
||||
// the schema version info is embeded in SDataRow
|
||||
|
@ -973,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
|
|||
|
||||
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
||||
// be included in the query time window will be discarded
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
||||
SArray* sa) {
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||
|
||||
|
@ -987,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||
|
||||
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
|
||||
int32_t endPos = cur->pos;
|
||||
|
@ -1066,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
|
||||
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
|
@ -1406,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
|
|||
|
||||
int32_t numOfBlocks = 0;
|
||||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
|
||||
|
||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||
|
||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
||||
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
|
||||
|
||||
// current file are not overlapped with query time window, ignore remain files
|
||||
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
|
||||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo)
|
||||
pQueryHandle->pFileGroup = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
@ -1765,7 +1776,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
win->skey = TSKEY_INITIAL_VAL;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
|
||||
do {
|
||||
|
@ -1787,7 +1797,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
}
|
||||
|
||||
win->ekey = key;
|
||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
|
||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
|
||||
|
||||
if (++numOfRows >= maxRowsToRead) {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
|
|
Loading…
Reference in New Issue