parent
e9c0e7d2f1
commit
379d574d7d
|
@ -734,6 +734,10 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
|
||||||
|
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols);
|
||||||
|
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
|
||||||
|
|
||||||
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
|
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||||
|
@ -745,7 +749,6 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
||||||
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
|
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
|
||||||
tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo);
|
tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo);
|
||||||
|
|
||||||
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
|
|
||||||
|
|
||||||
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
||||||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
||||||
|
@ -784,14 +787,54 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
||||||
*
|
*
|
||||||
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
|
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
|
||||||
*/
|
*/
|
||||||
assert(pQueryHandle->outputCapacity >= binfo.rows);
|
assert(pQueryHandle->outputCapacity >= binfo.rows);
|
||||||
pQueryHandle->realNumOfRows = binfo.rows;
|
|
||||||
|
if ((cur->pos == 0 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
|
(cur->pos == (binfo.rows - 1) && (!ASCENDING_TRAVERSE(pQueryHandle->order)))) {
|
||||||
|
pQueryHandle->realNumOfRows = binfo.rows;
|
||||||
|
|
||||||
|
cur->rows = binfo.rows;
|
||||||
|
cur->win = binfo.window;
|
||||||
|
cur->mixBlock = false;
|
||||||
|
cur->lastKey = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.window.ekey + 1): (binfo.window.skey -1);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
||||||
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
|
||||||
|
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
|
||||||
|
cur->pos >= 0 && cur->pos < pBlock->numOfRows &&
|
||||||
|
(tsArray[0] == binfo.window.skey && tsArray[binfo.rows - 1] == binfo.window.ekey));
|
||||||
|
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
cur->rows = binfo.rows - cur->pos;
|
||||||
|
|
||||||
|
cur->win.skey = tsArray[cur->pos];
|
||||||
|
cur->win.ekey = binfo.window.ekey;
|
||||||
|
|
||||||
|
cur->lastKey = binfo.window.ekey + 1;
|
||||||
|
int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, cur->pos, binfo.rows-1);
|
||||||
|
assert(numOfRows == cur->rows);
|
||||||
|
} else {
|
||||||
|
cur->rows = cur->pos + 1;
|
||||||
|
|
||||||
|
cur->win.skey = binfo.window.skey;
|
||||||
|
cur->win.ekey = tsArray[cur->pos];
|
||||||
|
|
||||||
|
cur->lastKey = binfo.window.skey - 1;
|
||||||
|
int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, 0, cur->pos);
|
||||||
|
assert(numOfRows == cur->rows);
|
||||||
|
|
||||||
|
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
||||||
|
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||||
|
doCheckGeneratedBlockRange(pQueryHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
pQueryHandle->realNumOfRows = cur->rows;
|
||||||
|
cur->mixBlock = true;
|
||||||
|
}
|
||||||
|
|
||||||
cur->rows = binfo.rows;
|
|
||||||
cur->win = binfo.window;
|
|
||||||
cur->mixBlock = false;
|
|
||||||
cur->blockCompleted = true;
|
cur->blockCompleted = true;
|
||||||
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
|
|
||||||
pCheckInfo->lastKey = cur->lastKey;
|
pCheckInfo->lastKey = cur->lastKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -823,6 +866,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
|
||||||
assert(pCheckInfo->lastKey <= pBlock->keyLast);
|
assert(pCheckInfo->lastKey <= pBlock->keyLast);
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else { // the whole block is loaded in to buffer
|
} else { // the whole block is loaded in to buffer
|
||||||
|
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1);
|
||||||
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||||
}
|
}
|
||||||
} else { //desc order, query ended in current block
|
} else { //desc order, query ended in current block
|
||||||
|
@ -842,6 +886,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
|
||||||
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
|
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else {
|
} else {
|
||||||
|
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1);
|
||||||
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue