From f72183fea5d047adfc3e9e95bc667f79fc8b065a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jul 2020 14:35:49 +0800 Subject: [PATCH] [td-225] enable block ts check. --- src/tsdb/src/tsdbRead.c | 42 +++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1d15860912..7538c6f7e1 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -831,6 +831,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap TSKEY* tsArray = pCols->cols[0].pData; int32_t num = end - start + 1; + assert(num >= 0); + + if (num == 0) { + return numOfRows; + } + int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); //data in buffer has greater timestamp, copy data in file block @@ -973,7 +979,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, } static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) { - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + if (numOfRows == 0 || ASCENDING_TRAVERSE(pQueryHandle->order)) { return; } @@ -1022,6 +1028,26 @@ static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo cur->pos = endPos; } +static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) { + SQueryFilePos* cur = &pQueryHandle->cur; + + if (cur->rows > 0) { + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey); + } else { + assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey); + } + + SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0); + assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]); + } else { + cur->win = pQueryHandle->window; + + int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + cur->lastKey = pQueryHandle->window.ekey + step; + } +} + // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { @@ -1073,6 +1099,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // the time window should always be right order: skey <= ekey cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]}; + cur->lastKey = tsArray[endPos]; pos += (end - start + 1) * step; cur->blockCompleted = @@ -1082,7 +1109,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // if the buffer is not full in case of descending order query, move the data in the front of the buffer moveDataToFront(pQueryHandle, numOfRows, numOfCols); updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); - + doCheckGeneratedBlockRange(pQueryHandle); return; } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; @@ -1175,15 +1202,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* moveDataToFront(pQueryHandle, numOfRows, numOfCols); updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey); - } else { - assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey); - } - - SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0); - assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]); + doCheckGeneratedBlockRange(pQueryHandle); tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey, cur->win.ekey, cur->rows, pQueryHandle->qinfo); @@ -2027,7 +2046,6 @@ typedef struct STableGroupSupporter { int32_t numOfCols; SColIndex* pCols; STSchema* pTagSchema; -// void* tsdbMeta; } STableGroupSupporter; int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {