commit
bc492a3f99
|
@ -1347,6 +1347,11 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
if ((pQuery->limit.limit >= 0) && (pQuery->limit.limit + pQuery->limit.offset) <= numOfRes) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
|
||||
if (((pTableQInfo->lastKey > pTableQInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
((pTableQInfo->lastKey < pTableQInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -172,6 +172,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
if (pQueryHandle == NULL) {
|
||||
goto out_of_memory;
|
||||
}
|
||||
|
||||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
|
@ -183,6 +184,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
pQueryHandle->qinfo = qinfo;
|
||||
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
||||
pQueryHandle->allocSize = 0;
|
||||
pQueryHandle->locateStart = false;
|
||||
|
||||
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
|
||||
goto out_of_memory;
|
||||
|
@ -193,6 +195,12 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
||||
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pCond->order)) {
|
||||
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
|
||||
} else {
|
||||
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
|
||||
}
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
int32_t numOfCols = pCond->numOfCols;
|
||||
|
||||
|
@ -244,6 +252,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||
|
||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
|
||||
info.tableId.tid, info.lastKey, qinfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1072,6 +1082,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
|
||||
TSKEY* tsArray = pCols->cols[0].pData;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||
TSKEY s = tsArray[cur->pos];
|
||||
assert(s >= pQueryHandle->window.skey && s <= pQueryHandle->window.ekey);
|
||||
} else {
|
||||
TSKEY s = tsArray[cur->pos];
|
||||
assert(s <= pQueryHandle->window.skey && s >= pQueryHandle->window.ekey);
|
||||
}
|
||||
|
||||
// for search the endPos, so the order needs to reverse
|
||||
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||
|
||||
|
@ -1551,7 +1569,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
|||
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
|
||||
|
||||
// current block is done, try next
|
||||
if (!cur->mixBlock || cur->blockCompleted) {
|
||||
if ((!cur->mixBlock) || cur->blockCompleted) {
|
||||
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||
// all data blocks in current file has been checked already, try next file if exists
|
||||
|
@ -1570,6 +1588,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
|
||||
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
|
||||
*exists = pQueryHandle->realNumOfRows > 0;
|
||||
|
||||
|
|
Loading…
Reference in New Issue