enh: add merge join reset
This commit is contained in:
parent
b2cea6ab84
commit
6960872515
|
@ -733,10 +733,26 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void resetMergeJoinOperator(struct SOperatorInfo* pOperator) {
|
||||||
|
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
if (pJoinInfo->rowCtx.rowRemains) {
|
||||||
|
mergeJoinDestroyTSRangeCtx(pJoinInfo, pJoinInfo->rowCtx.leftRowLocations, pJoinInfo->rowCtx.leftCreatedBlocks, pJoinInfo->rowCtx.rightCreatedBlocks,
|
||||||
|
pJoinInfo->rowCtx.rightUseBuildTable, pJoinInfo->rowCtx.rightRowLocations);
|
||||||
|
}
|
||||||
|
pJoinInfo->pLeft = NULL;
|
||||||
|
pJoinInfo->leftPos = 0;
|
||||||
|
pJoinInfo->pRight = NULL;
|
||||||
|
pJoinInfo->rightPos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
||||||
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_EXEC_DONE && (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1])) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
if (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1]) {
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
resetMergeJoinOperator(pOperator);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pRes = pJoinInfo->pRes;
|
SSDataBlock* pRes = pJoinInfo->pRes;
|
||||||
|
|
Loading…
Reference in New Issue