TD-1439
This commit is contained in:
parent
760b308729
commit
aca8e3644b
|
@ -432,7 +432,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
|
|||
tSkipListDestroyIter(pCheckInfo->iiter);
|
||||
}
|
||||
|
||||
static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) {
|
||||
static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
|
||||
SDataRow rmem = NULL, rimem = NULL;
|
||||
if (pCheckInfo->iter) {
|
||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||
|
@ -466,9 +466,15 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order
|
|||
TSKEY r2 = dataRowKey(rimem);
|
||||
|
||||
if (r1 == r2) { // data ts are duplicated, ignore the data in mem
|
||||
if (!update) {
|
||||
tSkipListIterNext(pCheckInfo->iter);
|
||||
pCheckInfo->chosen = 1;
|
||||
return rimem;
|
||||
} else {
|
||||
tSkipListIterNext(pCheckInfo->iiter);
|
||||
pCheckInfo->chosen = 0;
|
||||
return rmem;
|
||||
}
|
||||
} else {
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
if (r1 < r2) {
|
||||
|
@ -522,6 +528,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
|
|||
}
|
||||
|
||||
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||
STsdbCfg *pCfg = &pHandle->pTsdb->config;
|
||||
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
|
||||
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
|
||||
pHandle->cur.fid = -1;
|
||||
|
@ -535,7 +542,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
|||
initTableMemIterator(pHandle, pCheckInfo);
|
||||
}
|
||||
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order);
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update);
|
||||
if (row == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
@ -741,11 +748,12 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
|
|||
|
||||
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
|
||||
|
||||
assert(cur->pos >= 0 && cur->pos <= binfo.rows);
|
||||
|
||||
|
@ -1208,6 +1216,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
|
|||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||
|
||||
initTableMemIterator(pQueryHandle, pCheckInfo);
|
||||
|
||||
|
@ -1256,7 +1265,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||
SSkipListNode* node = NULL;
|
||||
do {
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
|
||||
if (row == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -1286,7 +1295,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
|
||||
if (pCfg->update) {
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
}
|
||||
|
||||
cur->win.ekey = key;
|
||||
cur->lastKey = key + step;
|
||||
cur->mixBlock = true;
|
||||
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
pos += step;
|
||||
} else {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
}
|
||||
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
|
@ -1765,13 +1789,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
STsdbQueryHandle* pQueryHandle) {
|
||||
int numOfRows = 0;
|
||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryHandle->pColumns);
|
||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||
win->skey = TSKEY_INITIAL_VAL;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
|
||||
do {
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
|
||||
if (row == NULL) {
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue