diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index f08f04b86f..9493ae9315 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -733,10 +733,26 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) } } +void resetMergeJoinOperator(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + if (pJoinInfo->rowCtx.rowRemains) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, pJoinInfo->rowCtx.leftRowLocations, pJoinInfo->rowCtx.leftCreatedBlocks, pJoinInfo->rowCtx.rightCreatedBlocks, + pJoinInfo->rowCtx.rightUseBuildTable, pJoinInfo->rowCtx.rightRowLocations); + } + pJoinInfo->pLeft = NULL; + pJoinInfo->leftPos = 0; + pJoinInfo->pRight = NULL; + pJoinInfo->rightPos = 0; +} + SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; - if (pOperator->status == OP_EXEC_DONE && (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1])) { - return NULL; + if (pOperator->status == OP_EXEC_DONE) { + if (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1]) { + return NULL; + } else { + resetMergeJoinOperator(pOperator); + } } SSDataBlock* pRes = pJoinInfo->pRes;