[td-225] fix bugs in descending traverse over in-men data
This commit is contained in:
parent
3cf5b7697b
commit
731707608c
|
@ -21,7 +21,7 @@
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "exception.h"
|
#include "exception.h"
|
||||||
|
|
||||||
#include "../../../query/inc/qast.h" // todo move to common module
|
#include "../../query/inc/qAst.h" // todo move to common module
|
||||||
#include "tlosertree.h"
|
#include "tlosertree.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbMain.h"
|
#include "tsdbMain.h"
|
||||||
|
@ -331,7 +331,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
|
tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
|
||||||
|
pHandle->qinfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!imemEmpty) {
|
if (!imemEmpty) {
|
||||||
|
@ -343,7 +344,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
|
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
|
||||||
|
pHandle->qinfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -354,7 +356,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
|
||||||
tSkipListDestroyIter(pCheckInfo->iiter);
|
tSkipListDestroyIter(pCheckInfo->iiter);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
|
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) {
|
||||||
SDataRow rmem = NULL, rimem = NULL;
|
SDataRow rmem = NULL, rimem = NULL;
|
||||||
if (pCheckInfo->iter) {
|
if (pCheckInfo->iter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
|
@ -371,20 +373,35 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rmem != NULL && rimem != NULL) {
|
if (rmem != NULL && rimem != NULL) {
|
||||||
if (dataRowKey(rmem) < dataRowKey(rimem)) {
|
TSKEY r1 = dataRowKey(rmem);
|
||||||
pCheckInfo->chosen = 0;
|
TSKEY r2 = dataRowKey(rimem);
|
||||||
return rmem;
|
|
||||||
} else if (dataRowKey(rmem) == dataRowKey(rimem)) {
|
if (r1 == r2) { // data ts are duplicated, ignore the data in mem
|
||||||
// data ts are duplicated, ignore the data in mem
|
|
||||||
tSkipListIterNext(pCheckInfo->iter);
|
tSkipListIterNext(pCheckInfo->iter);
|
||||||
pCheckInfo->chosen = 1;
|
pCheckInfo->chosen = 1;
|
||||||
return rimem;
|
return rimem;
|
||||||
} else {
|
} else {
|
||||||
pCheckInfo->chosen = 1;
|
if (ASCENDING_TRAVERSE(order)) {
|
||||||
return rimem;
|
if (r1 < r2) {
|
||||||
|
pCheckInfo->chosen = 0;
|
||||||
|
return rmem;
|
||||||
|
} else {
|
||||||
|
pCheckInfo->chosen = 1;
|
||||||
|
return rimem;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (r1 < r2) {
|
||||||
|
pCheckInfo->chosen = 1;
|
||||||
|
return rimem;
|
||||||
|
} else {
|
||||||
|
pCheckInfo->chosen = 0;
|
||||||
|
return rmem;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// at least one (rmem or rimem) is absent here
|
||||||
if (rmem != NULL) {
|
if (rmem != NULL) {
|
||||||
pCheckInfo->chosen = 0;
|
pCheckInfo->chosen = 0;
|
||||||
return rmem;
|
return rmem;
|
||||||
|
@ -398,7 +415,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool moveToNextRow(STableCheckInfo* pCheckInfo) {
|
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
if (pCheckInfo->chosen == 0) {
|
if (pCheckInfo->chosen == 0) {
|
||||||
if (pCheckInfo->iter != NULL) {
|
if (pCheckInfo->iter != NULL) {
|
||||||
|
@ -412,19 +429,17 @@ static bool moveToNextRow(STableCheckInfo* pCheckInfo) {
|
||||||
if (pCheckInfo->iiter != NULL) {
|
if (pCheckInfo->iiter != NULL) {
|
||||||
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
|
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
|
||||||
}
|
}
|
||||||
} else {
|
} else { //pCheckInfo->chosen == 1
|
||||||
if (pCheckInfo->chosen == 1) {
|
if (pCheckInfo->iiter != NULL) {
|
||||||
if (pCheckInfo->iiter != NULL) {
|
hasNext = tSkipListIterNext(pCheckInfo->iiter);
|
||||||
hasNext = tSkipListIterNext(pCheckInfo->iiter);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (hasNext) {
|
if (hasNext) {
|
||||||
return hasNext;
|
return hasNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->iter != NULL) {
|
if (pCheckInfo->iter != NULL) {
|
||||||
return tSkipListIterGet(pCheckInfo->iter) != NULL;
|
return tSkipListIterGet(pCheckInfo->iter) != NULL;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,7 +460,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||||
initTableMemIterator(pHandle, pCheckInfo);
|
initTableMemIterator(pHandle, pCheckInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataRow row = getSDataRowInTableMem(pCheckInfo);
|
SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order);
|
||||||
if (row == NULL) {
|
if (row == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -650,7 +665,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
||||||
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||||
|
|
||||||
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
||||||
SDataRow row = getSDataRowInTableMem(pCheckInfo);
|
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||||
|
|
||||||
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
|
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
|
||||||
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
|
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
|
||||||
|
@ -1033,7 +1048,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||||
SSkipListNode* node = NULL;
|
SSkipListNode* node = NULL;
|
||||||
do {
|
do {
|
||||||
SDataRow row = getSDataRowInTableMem(pCheckInfo);
|
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||||
if (row == NULL) {
|
if (row == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1061,9 +1076,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
cur->lastKey = key + step;
|
cur->lastKey = key + step;
|
||||||
cur->mixBlock = true;
|
cur->mixBlock = true;
|
||||||
|
|
||||||
moveToNextRow(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
|
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
|
||||||
moveToNextRow(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
|
@ -1072,7 +1087,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
|
|
||||||
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
|
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
|
||||||
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
|
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
|
||||||
moveToNextRow(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t start = -1;
|
int32_t start = -1;
|
||||||
|
@ -1754,7 +1769,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
||||||
STable* pTable = pCheckInfo->pTableObj;
|
STable* pTable = pCheckInfo->pTableObj;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
SDataRow row = getSDataRowInTableMem(pCheckInfo);
|
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||||
if (row == NULL) {
|
if (row == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1775,11 +1790,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
||||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
|
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
|
||||||
|
|
||||||
if (++numOfRows >= maxRowsToRead) {
|
if (++numOfRows >= maxRowsToRead) {
|
||||||
moveToNextRow(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
} while(moveToNextRow(pCheckInfo));
|
} while(moveToNextRowInMem(pCheckInfo));
|
||||||
|
|
||||||
assert(numOfRows <= maxRowsToRead);
|
assert(numOfRows <= maxRowsToRead);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue