fix(stream): recover the overwritten code
This commit is contained in:
parent
65d90a66a7
commit
86c4d34339
|
@ -3328,6 +3328,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
if (pLimitInfo->remainGroupOffset > 0) {
|
||||
if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group
|
||||
pLimitInfo->currentGroupId = pBlock->info.groupId;
|
||||
ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM);
|
||||
continue;
|
||||
} else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
|
||||
// now it is the data from a new group
|
||||
|
@ -3336,6 +3337,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
|
||||
// ignore data block in current group
|
||||
if (pLimitInfo->remainGroupOffset > 0) {
|
||||
ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -3380,10 +3382,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
if (pLimitInfo->remainOffset >= pInfo->pRes->info.rows) {
|
||||
pLimitInfo->remainOffset -= pInfo->pRes->info.rows;
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM);
|
||||
continue;
|
||||
} else if (pLimitInfo->remainOffset < pInfo->pRes->info.rows && pLimitInfo->remainOffset > 0) {
|
||||
blockDataTrimFirstNRows(pInfo->pRes, pLimitInfo->remainOffset);
|
||||
pLimitInfo->remainOffset = 0;
|
||||
ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM);
|
||||
}
|
||||
|
||||
// check for the limitation in each group
|
||||
|
@ -3391,6 +3395,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
pLimitInfo->numOfOutputRows + pInfo->pRes->info.rows >= pLimitInfo->limit.limit) {
|
||||
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
|
||||
blockDataKeepFirstNRows(pInfo->pRes, keepRows);
|
||||
ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM);
|
||||
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -3412,7 +3417,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
// continue merge data, ignore the group id
|
||||
blockDataMerge(pFinalRes, pInfo->pRes);
|
||||
|
||||
if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold) {
|
||||
if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold &&
|
||||
pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue