Merge pull request #16342 from taosdata/feature/3_liaohj
refactor(query): do some internal refactor.
This commit is contained in:
commit
36eb46fa4d
|
@ -178,7 +178,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
|
||||||
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||||
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
|
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
|
||||||
|
|
||||||
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
||||||
STsdbReader* pReader, bool* freeTSRow);
|
STsdbReader* pReader, bool* freeTSRow);
|
||||||
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
||||||
STSRow** pTSRow);
|
STSRow** pTSRow);
|
||||||
|
@ -1510,6 +1510,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
||||||
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
|
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
@ -1536,7 +1537,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
freeTSRow = true;
|
freeTSRow = true;
|
||||||
}
|
}
|
||||||
} else if (k.ts < key) { // k.ts < key
|
} else if (k.ts < key) { // k.ts < key
|
||||||
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
|
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||||
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
|
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
@ -1549,7 +1550,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
} else { // descending order scan
|
} else { // descending order scan
|
||||||
if (key < k.ts) {
|
if (key < k.ts) {
|
||||||
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
|
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||||
} else if (k.ts < key) {
|
} else if (k.ts < key) {
|
||||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1583,6 +1584,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
STSRow* pTSRow = NULL;
|
STSRow* pTSRow = NULL;
|
||||||
|
@ -1734,6 +1737,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
STSRow* pTSRow = NULL;
|
STSRow* pTSRow = NULL;
|
||||||
|
@ -1779,7 +1783,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// [3] ik.ts < key <= k.ts
|
// [3] ik.ts < key <= k.ts
|
||||||
// [4] ik.ts < k.ts <= key
|
// [4] ik.ts < k.ts <= key
|
||||||
if (ik.ts < k.ts) {
|
if (ik.ts < k.ts) {
|
||||||
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||||
if (freeTSRow) {
|
if (freeTSRow) {
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
|
@ -1790,7 +1794,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// [5] k.ts < key <= ik.ts
|
// [5] k.ts < key <= ik.ts
|
||||||
// [6] k.ts < ik.ts <= key
|
// [6] k.ts < ik.ts <= key
|
||||||
if (k.ts < ik.ts) {
|
if (k.ts < ik.ts) {
|
||||||
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
|
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||||
if (freeTSRow) {
|
if (freeTSRow) {
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
|
@ -1836,7 +1840,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// [3] ik.ts > k.ts >= Key
|
// [3] ik.ts > k.ts >= Key
|
||||||
// [4] ik.ts > key >= k.ts
|
// [4] ik.ts > key >= k.ts
|
||||||
if (ik.ts > key) {
|
if (ik.ts > key) {
|
||||||
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||||
if (freeTSRow) {
|
if (freeTSRow) {
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
|
@ -1859,7 +1863,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
//[7] key = ik.ts > k.ts
|
//[7] key = ik.ts > k.ts
|
||||||
if (key == ik.ts) {
|
if (key == ik.ts) {
|
||||||
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||||
|
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
tRowMerge(&merge, &fRow);
|
tRowMerge(&merge, &fRow);
|
||||||
|
@ -1876,6 +1880,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
|
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
|
||||||
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
@ -3115,7 +3120,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
||||||
STsdbReader* pReader, bool* freeTSRow) {
|
STsdbReader* pReader, bool* freeTSRow) {
|
||||||
TSDBROW* pNextRow = NULL;
|
TSDBROW* pNextRow = NULL;
|
||||||
TSDBROW current = *pRow;
|
TSDBROW current = *pRow;
|
||||||
|
@ -3197,6 +3202,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
|
||||||
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||||
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
||||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||||
|
uint64_t uid = pBlockScanInfo->uid;
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
|
@ -3219,9 +3225,9 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
|
||||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||||
|
|
||||||
if (ik.ts < k.ts) { // ik.ts < k.ts
|
if (ik.ts < k.ts) { // ik.ts < k.ts
|
||||||
doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
|
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
|
||||||
} else if (k.ts < ik.ts) {
|
} else if (k.ts < ik.ts) {
|
||||||
doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
|
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
|
||||||
} else { // ik.ts == k.ts
|
} else { // ik.ts == k.ts
|
||||||
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
|
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
|
||||||
*freeTSRow = true;
|
*freeTSRow = true;
|
||||||
|
@ -3231,12 +3237,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
|
if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
|
||||||
doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
|
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
|
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
|
||||||
doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
|
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3492,6 +3492,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
||||||
qError("Init stream agg supporter failed since %s", terrstr(terrno));
|
qError("Init stream agg supporter failed since %s", terrstr(terrno));
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
|
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("Create agg result buf failed since %s", tstrerror(code));
|
qError("Create agg result buf failed since %s", tstrerror(code));
|
||||||
|
|
|
@ -50,9 +50,11 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,12 +69,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
|
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
|
||||||
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
|
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
|
||||||
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
|
|
||||||
|
|
||||||
// todo remove it soon
|
|
||||||
|
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
pInfo->mergeDataBlocks = false;
|
pInfo->mergeDataBlocks = false;
|
||||||
|
} else {
|
||||||
|
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
|
@ -83,9 +84,13 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
||||||
numOfRows = TWOMB / pResBlock->info.rowSize;
|
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||||
}
|
}
|
||||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
|
||||||
|
|
||||||
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||||
|
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
|
||||||
|
|
||||||
|
@ -99,7 +104,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||||
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -107,7 +112,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
destroyProjectOperatorInfo(pInfo, numOfCols);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,7 +182,8 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
|
||||||
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
|
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
|
||||||
blockDataKeepFirstNRows(pBlock, keepRows);
|
blockDataKeepFirstNRows(pBlock, keepRows);
|
||||||
//TODO: optimize it later when partition by + limit
|
//TODO: optimize it later when partition by + limit
|
||||||
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
|
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
|
||||||
|
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue