[td-1334] refactor codes.
This commit is contained in:
parent
379d574d7d
commit
bcee3a4ae9
|
@ -737,6 +737,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
|
||||||
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
|
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
|
||||||
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols);
|
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols);
|
||||||
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
|
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
|
||||||
|
static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
|
||||||
|
|
||||||
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
|
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
@ -746,10 +747,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
||||||
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
||||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||||
|
|
||||||
|
assert(cur->pos >= 0 && cur->pos <= binfo.rows);
|
||||||
|
|
||||||
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
|
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
|
||||||
tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo);
|
tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo);
|
||||||
|
|
||||||
|
|
||||||
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
||||||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
||||||
|
|
||||||
|
@ -797,45 +799,15 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
||||||
cur->win = binfo.window;
|
cur->win = binfo.window;
|
||||||
cur->mixBlock = false;
|
cur->mixBlock = false;
|
||||||
cur->lastKey = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.window.ekey + 1): (binfo.window.skey -1);
|
cur->lastKey = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.window.ekey + 1): (binfo.window.skey -1);
|
||||||
} else {
|
} else { // partially copy to dest buffer
|
||||||
|
int32_t endPos = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.rows - 1): 0;
|
||||||
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
copyRowsFromFileBlock(pQueryHandle, pCheckInfo, &binfo, endPos);
|
||||||
TSKEY* tsArray = pCols->cols[0].pData;
|
|
||||||
|
|
||||||
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
|
|
||||||
cur->pos >= 0 && cur->pos < pBlock->numOfRows &&
|
|
||||||
(tsArray[0] == binfo.window.skey && tsArray[binfo.rows - 1] == binfo.window.ekey));
|
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
|
||||||
cur->rows = binfo.rows - cur->pos;
|
|
||||||
|
|
||||||
cur->win.skey = tsArray[cur->pos];
|
|
||||||
cur->win.ekey = binfo.window.ekey;
|
|
||||||
|
|
||||||
cur->lastKey = binfo.window.ekey + 1;
|
|
||||||
int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, cur->pos, binfo.rows-1);
|
|
||||||
assert(numOfRows == cur->rows);
|
|
||||||
} else {
|
|
||||||
cur->rows = cur->pos + 1;
|
|
||||||
|
|
||||||
cur->win.skey = binfo.window.skey;
|
|
||||||
cur->win.ekey = tsArray[cur->pos];
|
|
||||||
|
|
||||||
cur->lastKey = binfo.window.skey - 1;
|
|
||||||
int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, 0, cur->pos);
|
|
||||||
assert(numOfRows == cur->rows);
|
|
||||||
|
|
||||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
|
||||||
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
|
||||||
doCheckGeneratedBlockRange(pQueryHandle);
|
|
||||||
}
|
|
||||||
|
|
||||||
pQueryHandle->realNumOfRows = cur->rows;
|
|
||||||
cur->mixBlock = true;
|
cur->mixBlock = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
cur->blockCompleted = true;
|
assert(cur->blockCompleted);
|
||||||
pCheckInfo->lastKey = cur->lastKey;
|
tsdbDebug("create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %p",
|
||||||
|
cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pQueryHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -957,7 +929,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
return midPos;
|
return midPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
||||||
char* pData = NULL;
|
char* pData = NULL;
|
||||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
||||||
|
|
||||||
|
@ -1182,6 +1154,49 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
|
||||||
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
|
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
||||||
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
||||||
|
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
||||||
|
|
||||||
|
int32_t pos = cur->pos;
|
||||||
|
|
||||||
|
int32_t start = cur->pos;
|
||||||
|
int32_t end = endPos;
|
||||||
|
|
||||||
|
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
assert(start >= end);
|
||||||
|
SWAP(start, end, int32_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, start, end);
|
||||||
|
|
||||||
|
// the time window should always be ascending order: skey <= ekey
|
||||||
|
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
|
||||||
|
cur->mixBlock = (start == 0 && end == pBlockInfo->rows - 1);
|
||||||
|
cur->lastKey = tsArray[endPos] + step;
|
||||||
|
|
||||||
|
pos += endPos + step;
|
||||||
|
|
||||||
|
cur->blockCompleted =
|
||||||
|
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
|
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
||||||
|
|
||||||
|
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||||
|
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||||
|
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
||||||
|
doCheckGeneratedBlockRange(pQueryHandle);
|
||||||
|
|
||||||
|
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p",
|
||||||
|
pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey,
|
||||||
|
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
||||||
// be included in the query time window will be discarded
|
// be included in the query time window will be discarded
|
||||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||||
|
@ -1224,37 +1239,13 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
|
|
||||||
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
int32_t pos = cur->pos;
|
int32_t pos = cur->pos;
|
||||||
cur->win = TSWINDOW_INITIALIZER;
|
cur->win = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
// no data in buffer, load data from file directly
|
// no data in buffer, load data from file directly
|
||||||
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
|
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
|
||||||
int32_t start = cur->pos;
|
copyRowsFromFileBlock(pQueryHandle, pCheckInfo, &blockInfo, endPos);
|
||||||
int32_t end = endPos;
|
|
||||||
|
|
||||||
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
|
||||||
SWAP(start, end, int32_t);
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
|
||||||
|
|
||||||
// the time window should always be right order: skey <= ekey
|
|
||||||
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
|
|
||||||
cur->lastKey = tsArray[endPos];
|
|
||||||
pos += (end - start + 1) * step;
|
|
||||||
|
|
||||||
cur->blockCompleted =
|
|
||||||
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
|
||||||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
|
||||||
|
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
|
||||||
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
|
||||||
doCheckGeneratedBlockRange(pQueryHandle);
|
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p",
|
|
||||||
pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey,
|
|
||||||
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
|
||||||
return;
|
return;
|
||||||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||||
SSkipListNode* node = NULL;
|
SSkipListNode* node = NULL;
|
||||||
|
@ -1703,7 +1694,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
||||||
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
|
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
|
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo);
|
||||||
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
|
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
|
||||||
*exists = pQueryHandle->realNumOfRows > 0;
|
*exists = pQueryHandle->realNumOfRows > 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue