[td-225] enable block ts check.
This commit is contained in:
parent
df80c010a9
commit
f72183fea5
|
@ -831,6 +831,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
|
||||||
TSKEY* tsArray = pCols->cols[0].pData;
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
|
||||||
int32_t num = end - start + 1;
|
int32_t num = end - start + 1;
|
||||||
|
assert(num >= 0);
|
||||||
|
|
||||||
|
if (num == 0) {
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
|
|
||||||
//data in buffer has greater timestamp, copy data in file block
|
//data in buffer has greater timestamp, copy data in file block
|
||||||
|
@ -973,7 +979,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
|
||||||
}
|
}
|
||||||
|
|
||||||
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
|
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (numOfRows == 0 || ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1022,6 +1028,26 @@ static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo
|
||||||
cur->pos = endPos;
|
cur->pos = endPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
|
||||||
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
|
if (cur->rows > 0) {
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey);
|
||||||
|
} else {
|
||||||
|
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0);
|
||||||
|
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
|
||||||
|
} else {
|
||||||
|
cur->win = pQueryHandle->window;
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
||||||
|
cur->lastKey = pQueryHandle->window.ekey + step;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
||||||
// be included in the query time window will be discarded
|
// be included in the query time window will be discarded
|
||||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||||
|
@ -1073,6 +1099,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
|
|
||||||
// the time window should always be right order: skey <= ekey
|
// the time window should always be right order: skey <= ekey
|
||||||
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
|
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
|
||||||
|
cur->lastKey = tsArray[endPos];
|
||||||
pos += (end - start + 1) * step;
|
pos += (end - start + 1) * step;
|
||||||
|
|
||||||
cur->blockCompleted =
|
cur->blockCompleted =
|
||||||
|
@ -1082,7 +1109,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||||
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||||
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
||||||
|
doCheckGeneratedBlockRange(pQueryHandle);
|
||||||
return;
|
return;
|
||||||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||||
SSkipListNode* node = NULL;
|
SSkipListNode* node = NULL;
|
||||||
|
@ -1175,15 +1202,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
|
|
||||||
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||||
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
||||||
|
doCheckGeneratedBlockRange(pQueryHandle);
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
|
||||||
assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey);
|
|
||||||
} else {
|
|
||||||
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0);
|
|
||||||
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
|
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
|
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
|
||||||
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
||||||
|
@ -2027,7 +2046,6 @@ typedef struct STableGroupSupporter {
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
SColIndex* pCols;
|
SColIndex* pCols;
|
||||||
STSchema* pTagSchema;
|
STSchema* pTagSchema;
|
||||||
// void* tsdbMeta;
|
|
||||||
} STableGroupSupporter;
|
} STableGroupSupporter;
|
||||||
|
|
||||||
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||||
|
|
Loading…
Reference in New Issue