fix(query): handle the multi-group limit/offset in table group merge scan/ multiway-merge executor.
This commit is contained in:
parent
e3248d0053
commit
c9a1b3ba01
|
@ -705,7 +705,8 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
|
||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
||||
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
|
||||
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
|
||||
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
|
||||
|
||||
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
|
||||
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
|
||||
|
|
|
@ -738,9 +738,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
|||
}
|
||||
|
||||
// reset the value for a new group data
|
||||
pLimitInfo->numOfOutputRows = 0;
|
||||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||
|
||||
resetLimitInfoForNextGroup(pLimitInfo);
|
||||
// existing rows that belongs to previous group.
|
||||
if (pBlock->info.rows > 0) {
|
||||
return PROJECT_RETRIEVE_DONE;
|
||||
|
@ -766,7 +764,12 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
|||
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
|
||||
blockDataKeepFirstNRows(pBlock, keepRows);
|
||||
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
setOperatorCompleted(pOperator);
|
||||
} else {
|
||||
// current group limitation is reached, and future blocks of this group need to be discarded.
|
||||
if (pBlock->info.rows == 0) {
|
||||
return PROJECT_RETRIEVE_CONTINUE;
|
||||
}
|
||||
}
|
||||
|
||||
return PROJECT_RETRIEVE_DONE;
|
||||
|
|
|
@ -1759,6 +1759,11 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
|
|||
pLimitInfo->remainGroupOffset = slimit.offset;
|
||||
}
|
||||
|
||||
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
|
||||
pLimitInfo->numOfOutputRows = 0;
|
||||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||
}
|
||||
|
||||
uint64_t tableListGetSize(const STableListInfo* pTableList) {
|
||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
||||
return taosArrayGetSize(pTableList->pTableList);
|
||||
|
|
|
@ -175,8 +175,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
|
|||
|
||||
// reset the value for a new group data
|
||||
// existing rows that belongs to previous group.
|
||||
pLimitInfo->numOfOutputRows = 0;
|
||||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||
resetLimitInfoForNextGroup(pLimitInfo);
|
||||
}
|
||||
|
||||
return PROJECT_RETRIEVE_DONE;
|
||||
|
@ -200,10 +199,18 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
|
|||
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
|
||||
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
|
||||
blockDataKeepFirstNRows(pBlock, keepRows);
|
||||
|
||||
// TODO: optimize it later when partition by + limit
|
||||
// all retrieved requirement has been fulfilled, let's finish this
|
||||
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
|
||||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
} else {
|
||||
// Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
|
||||
// from next group. So let's continue this retrieve process
|
||||
if (keepRows == 0) {
|
||||
return PROJECT_RETRIEVE_CONTINUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,7 +364,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
}
|
||||
|
||||
// printDataBlock1(p, "project");
|
||||
return (p->info.rows > 0) ? p : NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -257,7 +257,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
|
|||
}
|
||||
|
||||
// todo handle the slimit info
|
||||
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
|
||||
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
|
||||
SLimit* pLimit = &pLimitInfo->limit;
|
||||
const char* id = GET_TASKID(pTaskInfo);
|
||||
|
||||
|
@ -266,6 +266,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
|||
pLimitInfo->remainOffset -= pBlock->info.rows;
|
||||
blockDataEmpty(pBlock);
|
||||
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
|
||||
return false;
|
||||
} else {
|
||||
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
|
||||
pLimitInfo->remainOffset = 0;
|
||||
|
@ -274,13 +275,14 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
|||
|
||||
if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
|
||||
// limit the output rows
|
||||
int32_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
|
||||
int32_t keep = pBlock->info.rows - overflowRows;
|
||||
int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
|
||||
|
||||
blockDataKeepFirstNRows(pBlock, keep);
|
||||
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
|
||||
|
@ -391,7 +393,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
}
|
||||
}
|
||||
|
||||
applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
|
||||
bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
|
||||
if (limitReached) { // set operator flag is done
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
pCost->totalRows += pBlock->info.rows;
|
||||
pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;
|
||||
|
@ -768,8 +773,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// reset value for the next group data output
|
||||
pOperator->status = OP_OPENED;
|
||||
pInfo->base.limitInfo.numOfOutputRows = 0;
|
||||
pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;
|
||||
resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
|
||||
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
|
@ -2685,9 +2689,12 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pInfo->queryConds);
|
||||
pInfo->queryConds = NULL;
|
||||
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// all data produced by this function only belongs to one group
|
||||
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
|
||||
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
|
||||
SOperatorInfo* pOperator) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
|
@ -2707,10 +2714,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
|||
}
|
||||
}
|
||||
|
||||
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
|
||||
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
|
||||
pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
|
||||
|
||||
qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
||||
pInfo->limitInfo.numOfOutputRows);
|
||||
|
||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||
}
|
||||
|
||||
|
@ -2749,11 +2758,13 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
|||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
return pBlock;
|
||||
} else {
|
||||
// Data of this group are all dumped, let's try the next group
|
||||
stopGroupTableMergeScan(pOperator);
|
||||
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
|
|
|
@ -680,11 +680,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
break;
|
||||
}
|
||||
|
||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
|
||||
if (limitReached) {
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
|
||||
if (p->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -698,7 +700,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
||||
}
|
||||
|
||||
pInfo->limitInfo.numOfOutputRows += p->info.rows;
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.id.groupId = pInfo->groupId;
|
||||
pDataBlock->info.dataLoad = 1;
|
||||
|
|
Loading…
Reference in New Issue