diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 7c1eff9a49..cee789e426 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -21,6 +21,18 @@ extern "C" { typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); +typedef enum EJoinTableType { + E_JOIN_TB_BUILD = 1, + E_JOIN_TB_PROBE +} EJoinTableType; + +typedef enum EJoinPhase { + E_JOIN_PHASE_RETRIEVE, + E_JOIN_PHASE_SPLIT, + E_JOIN_PHASE_OUTPUT, + E_JOIN_PHASE_ +} EJoinPhase; + typedef struct SMJoinColInfo { int32_t srcSlot; int32_t dstSlot; @@ -33,6 +45,18 @@ typedef struct SMJoinColInfo { } SMJoinColInfo; +typedef struct SMJoinBlkInfo { + bool cloned; + SSDataBlock* pBlk; + void* pNext; +} SMJoinBlkInfo; + +typedef struct SMJoinRowInfo { + int64_t blkId; + int32_t rowIdx; + int64_t rowGIdx; +} SMJoinRowInfo; + typedef struct SMJoinTableInfo { int32_t downStreamIdx; SOperatorInfo* downStream; @@ -56,8 +80,109 @@ typedef struct SMJoinTableInfo { bool valColExist; } SMJoinTableInfo; +typedef struct SMJoinTsJoinCtx { + SMJoinTableCtx* pProbeCtx; + SMJoinTableCtx* pBuildCtx; + int64_t probeRowNum; + int64_t buildRowNum; + int64_t* probeTs; + int64_t* buildTs; + int64_t* probeEndTs; + int64_t* buildEndTs; + bool inSameTsGrp; + bool inDiffTsGrp; + SGrpPairCtx* pLastGrpPairCtx; + SGrpPairCtx currGrpPairCtx; +} SMJoinTsJoinCtx; + +typedef struct SBuildGrpCtx { + bool multiBlkGrp; + bool hashJoin; + SSHashObj* pGrpHash; + int32_t grpRowReadIdx; + int32_t grpRowGReadIdx; + int32_t grpRowBeginIdx; + int32_t grpRowNum; +} SBuildGrpCtx; + +typedef struct SProbeGrpCtx { + int32_t grpRowReadIdx; + int32_t grpRowBeginIdx; + int32_t grpRowNum; +} SProbeGrpCtx; + +typedef struct SGrpPairCtx { + bool sameTsGrp; + bool finishGrp; + SBuildGrpCtx buildGrp; + SProbeGrpCtx probeGrp; +} SGrpPairCtx; + +typedef struct SMJoinOutputCtx { + int32_t grpReadIdx; + int32_t grpWriteIdx; + SArray* pGrpList; +} SMJoinOutputCtx; + +typedef struct SMJoinTableCtx { + EJoinTableType type; + SMJoinTableInfo* pTbInfo; + bool dsInitDone; + bool dsFetchDone; + int64_t blkCurTs; + int32_t blkRowIdx; + int64_t blkIdx; + int64_t blkNum; + SMJoinBlkInfo* pCurrBlk; + SMJoinBlkInfo* pHeadBlk; + SMJoinBlkInfo* pTailBlk; +} SMJoinTableCtx; + +typedef struct SMJoinMergeCtx { + bool hashJoin; + EJoinPhase joinPhase; + int64_t grpCurTs; + SMJoinOutputCtx outputCtx; + SMJoinTsJoinCtx tsJoinCtx; + SMJoinTableCtx buildTbCtx; + SMJoinTableCtx probeTbCtx; +} SMJoinMergeCtx; + +typedef struct SMJoinWinCtx { + +} SMJoinWinCtx; + + +typedef struct SMJoinFlowFlags { + bool mergeJoin; + bool windowJoin; + bool preFilter; + bool retrieveAfterBuildDone; +} SMJoinFlowFlags; + +typedef struct SMJoinCtx { + SMJoinFlowFlags* pFlags; + union { + SMJoinMergeCtx mergeCtx; + SMJoinWinCtx winCtx; + }; + +} SMJoinCtx; + +typedef struct SMJoinExecInfo { + int64_t buildBlkNum; + int64_t buildBlkRows; + int64_t probeBlkNum; + int64_t probeBlkRows; + int64_t resRows; + int64_t expectRows; +} SMJoinExecInfo; + + typedef struct SMJoinOperatorInfo { + SOperatorInfo* pOperator; int32_t joinType; + int32_t subType; int32_t inputTsOrder; SMJoinTableInfo tbs[2]; SMJoinTableInfo* pBuild; @@ -65,16 +190,50 @@ typedef struct SMJoinOperatorInfo { SSDataBlock* pRes; int32_t pResColNum; int8_t* pResColMap; - SArray* pRowBufs; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; - SSHashObj* pKeyHash; - bool keyHashBuilt; - joinImplFp joinFp; - SHJoinCtx ctx; - SHJoinExecInfo execInfo; + SMJoinFuncs joinFps; + SMJoinCtx ctx; + SMJoinExecInfo execInfo; } SMJoinOperatorInfo; +#define MJOIN_DOWNSTREAM_NEED_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream) + +#define FIN_SAME_TS_GRP() do { \ + if (inSameTsGrp) { \ + grpPairCtx.sameTsGrp = true; \ + grpPairCtx.finishGrp = true; \ + grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \ + grpPairCtx.probeGrp.grpRowNum = 1; \ + inSameTsGrp = false; \ + pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \ + } \ + } while (0) + +#define PAUSE_SAME_TS_GRP() do { \ + if (inSameTsGrp) { \ + grpPairCtx.sameTsGrp = true; \ + grpPairCtx.finishGrp = false; \ + grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \ + grpPairCtx.probeGrp.grpRowNum = 1; \ + inSameTsGrp = false; \ + pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \ + } \ + } while (0) + + +#define FIN_DIFF_TS_GRP() do { \ + if (inDiffTsGrp) { \ + grpPairCtx.sameTsGrp = false; \ + grpPairCtx.finishGrp = true; \ + grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \ + grpPairCtx.probeGrp.grpRowNum = 1; \ + inDiffTsGrp = false; \ + pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \ + } \ + } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 83b9cc1b73..ffaac1e323 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -266,57 +266,280 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) { return mJoinAddPageToBufList(pInfo->pRowBufs); } -static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - - int32_t nrows = pRes->info.rows; - - bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false; - - while (1) { - int64_t leftTs = 0; - int64_t rightTs = 0; - if (pJoinInfo->rowCtx.rowRemains) { - leftTs = pJoinInfo->rowCtx.ts; - rightTs = pJoinInfo->rowCtx.ts; +static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx, SMJoinTableInfo* pTbInfo) { + bool retrieveCont = false; + int32_t code = TSDB_CODE_SUCCESS; + + do { + SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbInfo->downStreamIdx); + pTbCtx->dsInitDone = true; + + if (NULL == pBlock) { + retrieveCont = false; + code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx, pTbInfo); } else { - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); - if (!hasNextTs) { + code = (*pJoin->joinFps.handleBlkFetchedFp)(pJoin, pTbCtx, pTbInfo, pBlock, &retrieveCont); + } + } while (retrieveCont || TSDB_CODE_SUCCESS != code); + + return code; +} + +static FORCE_INLINE bool mJoinBuildTbNeedRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) { + if ((!pBuildCtx->grpRetrieved) && (!pBuildCtx->dsFetchDone)) { + return true; + } + return false; +} + +static FORCE_INLINE bool mJoinProbeTbNeedRetrieve(SOperatorInfo* pOperator, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { + SMJoinOperatorInfo* pJoin = pOperator->info; + if (MJOIN_DOWNSTREAM_NEED_INIT(pOperator) && !pBuildCtx->dsInitDone) { + return true; + } + if (((!pProbeCtx->grpRetrieved) && (!pProbeCtx->dsFetchDone)) && (pBuildCtx->grpRetrieved || pJoin->ctx.flags.retrieveAfterBuildDone))) { + return true; + } + return false; +} + +static bool mJoinOpenMergeJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; + SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; + int32_t code = mJoinDoRetrieve(pOperator, pBuildCtx, pJoin->pBuild); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + if (pBuildCtx->dsFetchDone && !pJoin->ctx.flags.retrieveAfterBuildDone) { + return false; + } + + code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + return true; +} + +static void mJoinInitJoinCtx(SMJoinOperatorInfo* pJoin) { + +} + +static void mJoinLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; + SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; + + int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + if (NULL == pProbeCtx->pHeadBlk) { + return; + } + + code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_TS_JOIN; +} + +static FORCE_INLINE void mJoinLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { + pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; + pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; + SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot); + SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pHeadBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot); + pCtx->probeTs = (int64_t*)probeCol->pData; + pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1; + pCtx->buildTs = (int64_t*)buildCol->pData; + pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1; +} + +static bool mJoinMoveToNextProbeTable(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) { + if (NULL == pProbeCtx->pCurrBlk->pNext) { + pProbeCtx->blkIdx++; + return false; + } + + pProbeCtx->pCurrBlk = pProbeCtx->pCurrBlk->pNext; + pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; + SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot); + pCtx->probeTs = (int64_t*)probeCol->pData; + pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1; + pProbeCtx->blkRowIdx = 0; + + return true; +} + +static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) { + bool contLoop = false; + bool res = false; + + do { + if (pBuildCtx->pCurrBlk->pNext) { + pBuildCtx->blkIdx++; + return false; + } + + pBuildCtx->pCurrBlk = pBuildCtx->pCurrBlk->pNext; + pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; + SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pCurrBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot); + pCtx->buildTs = (int64_t*)buildCol->pData; + pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1; + pBuildCtx->blkRowIdx = 0; + + do { + if (*pCtx->buildTs > pCtx->probeTs[pProbeCtx->blkRowIdx]) { + mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx); + contLoop = mJoinMoveToNextProbeTable(pCtx, pProbeCtx); + } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) { break; + } else { + contLoop = false; + res = true; + } + } while (contLoop); + } while (contLoop); + + return res; +} + + + +static void mJoinLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; + SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx; + SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx; + + mJoinLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx); + + bool nextRow = false; + for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinMoveToNextProbeTable(pCtx, pProbeCtx)) { + if (*pCtx->buildTs > *pCtx->probeEndTs) { + mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx); + continue; + } else if (*pCtx->probeTs > *pCtx->buildEndTs) { + if (!mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) { + break; + //retrieve build } } - if (leftTs == rightTs) { - mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); - } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { - pJoinInfo->leftPos += 1; + for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) { + for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) { + for (; pBuildCtx->blkRowIdx < pCtx->buildRowNum; ++pBuildCtx->blkRowIdx) { + if (pCtx->probeTs[pProbeCtx->blkRowIdx] > pCtx->buildTs[pBuildCtx->blkRowIdx]) { + FIN_SAME_TS_GRP(); + FIN_DIFF_TS_GRP(); + continue; + } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) { + FIN_DIFF_TS_GRP(); + if (pCtx->inSameTsGrp) { + pCtx->currGrpPairCtx.buildGrp.grpRowNum++; + } else { + pCtx->inSameTsGrp = true; + pCtx->currGrpPairCtx.buildGrp.grpRowBeginIdx = pBuildCtx->blkRowIdx; + } + } else { + FIN_SAME_TS_GRP(); + if (pCtx->inDiffTsGrp) { + pCtx->currGrpPairCtx.probeGrp.grpRowNum++; + } else { + pCtx->inDiffTsGrp = true; + pCtx->currGrpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; + } + nextRow = true; + break; + } + } - if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { + // end of single build table + if (nextRow) { + break; + } + } + + // end of all build tables + if (nextRow) { continue; } - } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { - pJoinInfo->rightPos += 1; - if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { - continue; + + if (pCtx->inSameTsGrp) { + PAUSE_SAME_TS_GRP(); } - } - - // the pDataBlock are always the same one, no need to call this again - pRes->info.rows = nrows; - pRes->info.dataLoad = 1; - pRes->info.scanFlag = MAIN_SCAN; - if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; } + + // end of single probe table + if (nextRow) { + continue; + } + + break; + } + + // end of all probe tables + FIN_DIFF_TS_GRP(); + + pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_OUTPUT; + + return true; +} + +static bool mJoinLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + +} + +static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + bool contLoop = false; + + do { + switch (pJoin->ctx.mergeCtx.joinPhase) { + case E_JOIN_PHASE_RETRIEVE: + contLoop = mJoinLeftJoinRetrieve(pOperator, pJoin); + break; + case E_JOIN_PHASE_SPLIT: + contLoop = mJoinLeftJoinSplitGrp(pOperator, pJoin); + break; + case E_JOIN_PHASE_OUTPUT: + contLoop = mJoinLeftJoinOutput(pOperator, pJoin); + break; + } + } while (contLoop); + +} + +static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; + SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = NULL; + + while (true) { + pBlock = (*pJoin->joinFp)(pOperator, pJoin, pCtx, pBuildCtx, pProbeCtx); + if (pBlock && pBlock->info.rows > 0) { + return pBlock; + } } } SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoin = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { - qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows); + qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows); return NULL; } else { resetMergeJoinOperator(pOperator); @@ -329,20 +552,18 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { st = taosGetTimestampUs(); } - SSDataBlock* pRes = pJoinInfo->pRes; - blockDataCleanup(pRes); + SSDataBlock* pBlock = NULL; + //blockDataCleanup(pJoin->pRes); while (true) { - int32_t numOfRowsBefore = pRes->info.rows; - mJoinImpl(pOperator, pRes); - int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; - if (numOfNewRows == 0) { + pBlock = (*pJoin->joinFp)(pOperator); + if (NULL == pBlock) { break; } - if (pJoinInfo->pFinFilter != NULL) { - doFilter(pRes, pJoinInfo->pFinFilter, NULL); + if (pJoin->pFinFilter != NULL) { + doFilter(pBlock, pJoin->pFinFilter, NULL); } - if (pRes->info.rows > 0 || pOperator->status == OP_EXEC_DONE) { + if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) { break; } } @@ -351,12 +572,12 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - if (pRes->info.rows > 0) { - pJoinInfo->resRows += pRes->info.rows; - qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pRes->info.rows); - return pRes; + if (pBlock->info.rows > 0) { + pJoin->resRows += pBlock->info.rows; + qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.rows); + return pBlock; } else { - qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows); + qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows); return NULL; } } @@ -374,6 +595,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } + pInfo->pOperator = pOperator; code = mJoinInitDownstreamInfo(pInfo, pDownstream, numOfDownstream, newDownstreams); if (TSDB_CODE_SUCCESS != code) { goto _error; @@ -391,6 +613,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t mJoinSetBuildAndProbeTable(pInfo, pJoinNode); + mJoinInitJoinCtx(pInfo); + code = mJoinBuildResColMap(pInfo, pJoinNode); if (code) { goto _error;