fix: merge join ignore param issue

This commit is contained in:
dapan1121 2023-07-19 14:42:18 +08:00
parent 38be8f2a8e
commit 8b62c75c26
3 changed files with 18 additions and 6 deletions

View File

@ -147,6 +147,7 @@ typedef struct SLimitInfo {
} SLimitInfo; } SLimitInfo;
typedef struct SSortMergeJoinOperatorParam { typedef struct SSortMergeJoinOperatorParam {
bool initParam;
} SSortMergeJoinOperatorParam; } SSortMergeJoinOperatorParam;
typedef struct SExchangeOperatorBasicParam { typedef struct SExchangeOperatorBasicParam {

View File

@ -36,7 +36,7 @@ void freeVgTableList(void* ptr) {
static void destroyDynQueryCtrlOperator(void* param) { static void destroyDynQueryCtrlOperator(void* param) {
SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64,
pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows);
if (pDyn->stbJoin.ctx.prev.leftVg) { if (pDyn->stbJoin.ctx.prev.leftVg) {
@ -158,7 +158,7 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR
} }
static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) { if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -178,6 +178,8 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
if (NULL == pJoin) { if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pJoin->initParam = initParam;
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
(*ppRes)->value = pJoin; (*ppRes)->value = pJoin;
@ -214,7 +216,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); code = buildMergeJoinOperatorParam(ppParam, false, pGcParam0, pGcParam1);
} }
return code; return code;
} }
@ -254,7 +256,7 @@ static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInf
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1);
} }
return code; return code;
} }

View File

@ -650,7 +650,8 @@ static void setMergeJoinDone(SOperatorInfo* pOperator) {
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info; SMJoinOperatorInfo* pJoinInfo = pOperator->info;
bool leftEmpty = false;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0);
@ -658,7 +659,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
if (pJoinInfo->pLeft == NULL) { if (pJoinInfo->pLeft == NULL) {
qError("merge join left got empty block"); qError("merge join left got empty block");
setMergeJoinDone(pOperator); setMergeJoinDone(pOperator);
return false; if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) {
leftEmpty = true;
} else {
return false;
}
} else { } else {
qError("merge join left got block"); qError("merge join left got block");
} }
@ -674,8 +679,12 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
return false; return false;
} else { } else {
qError("merge join right got block"); qError("merge join right got block");
if (leftEmpty) {
return false;
}
} }
} }
// only the timestamp match support for ordinary table // only the timestamp match support for ordinary table
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);