diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 4c70697bb8..7f66b4469a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -147,6 +147,7 @@ typedef struct SLimitInfo { } SLimitInfo; typedef struct SSortMergeJoinOperatorParam { + bool initParam; } SSortMergeJoinOperatorParam; typedef struct SExchangeOperatorBasicParam { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 86627b6578..b323ad7874 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -36,7 +36,7 @@ void freeVgTableList(void* ptr) { static void destroyDynQueryCtrlOperator(void* param) { SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; - qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, + qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); if (pDyn->stbJoin.ctx.prev.leftVg) { @@ -158,7 +158,7 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR } -static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { +static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -178,6 +178,8 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } + + pJoin->initParam = initParam; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; (*ppRes)->value = pJoin; @@ -214,7 +216,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); } if (TSDB_CODE_SUCCESS == code) { - code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); + code = buildMergeJoinOperatorParam(ppParam, false, pGcParam0, pGcParam1); } return code; } @@ -254,7 +256,7 @@ static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInf code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); } if (TSDB_CODE_SUCCESS == code) { - code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); + code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1); } return code; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 9493ae9315..8a28a29ab9 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -650,7 +650,8 @@ static void setMergeJoinDone(SOperatorInfo* pOperator) { static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; - + bool leftEmpty = false; + if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); @@ -658,7 +659,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs if (pJoinInfo->pLeft == NULL) { qError("merge join left got empty block"); setMergeJoinDone(pOperator); - return false; + if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) { + leftEmpty = true; + } else { + return false; + } } else { qError("merge join left got block"); } @@ -674,8 +679,12 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs return false; } else { qError("merge join right got block"); + if (leftEmpty) { + return false; + } } } + // only the timestamp match support for ordinary table SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);