enh(query): add check for limit reach status.
This commit is contained in:
parent
89b1843382
commit
24cdf0e758
|
@ -190,6 +190,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
|
||||||
return PROJECT_RETRIEVE_DONE;
|
return PROJECT_RETRIEVE_DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo refactor
|
||||||
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
|
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
|
||||||
SOperatorInfo* pOperator) {
|
SOperatorInfo* pOperator) {
|
||||||
// set current group id
|
// set current group id
|
||||||
|
|
|
@ -617,6 +617,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// process this data block based on the probabilities
|
// process this data block based on the probabilities
|
||||||
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
||||||
if (!processThisBlock) {
|
if (!processThisBlock) {
|
||||||
|
@ -628,9 +632,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
|
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
|
||||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// current block is filter out according to filter condition, continue load the next block
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
|
@ -2540,7 +2543,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -2714,10 +2717,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
||||||
qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
||||||
pInfo->limitInfo.numOfOutputRows);
|
pInfo->limitInfo.numOfOutputRows);
|
||||||
|
|
||||||
|
if (limitReached) {
|
||||||
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
|
}
|
||||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue