diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 5dbd03354e..af48d865a0 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -468,26 +468,37 @@ typedef struct SInterpFuncPhysiNode { } SInterpFuncPhysiNode; typedef struct SSortMergeJoinPhysiNode { - SPhysiNode node; - EJoinType joinType; - SNode* pPrimKeyCond; - SNode* pColEqCond; - SNode* pFullOnCond; - SNodeList* pTargets; + SPhysiNode node; + EJoinType joinType; + EJoinSubType subType; + SNode* pWindowOffset; + SNode* pJLimit; + int32_t leftPrimSlotId; + int32_t rightPrimSlotId; + SNodeList* pEqLeft; + SNodeList* pEqRight; + SNode* pPrimKeyCond; //remove + SNode* pColEqCond; //remove + SNode* pColOnCond; + SNode* pFullOnCond; + SNodeList* pTargets; } SSortMergeJoinPhysiNode; typedef struct SHashJoinPhysiNode { - SPhysiNode node; - EJoinType joinType; - SNodeList* pOnLeft; - SNodeList* pOnRight; - SNode* pFilterConditions; - SNodeList* pTargets; - SQueryStat inputStat[2]; + SPhysiNode node; + EJoinType joinType; + EJoinSubType subType; + SNode* pWindowOffset; + SNode* pJLimit; + SNodeList* pOnLeft; + SNodeList* pOnRight; + SNode* pFilterConditions; + SNodeList* pTargets; + SQueryStat inputStat[2]; - SNode* pPrimKeyCond; - SNode* pColEqCond; - SNode* pTagEqCond; + SNode* pPrimKeyCond; + SNode* pColEqCond; + SNode* pTagEqCond; } SHashJoinPhysiNode; typedef struct SGroupCachePhysiNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 1956201422..efb118d4ed 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -23,6 +23,7 @@ extern "C" { #include "nodes.h" #include "tmsg.h" #include "tvariant.h" +#include "tsimplehash.h" #define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) #define TABLE_META_SIZE(pMeta) \ @@ -552,15 +553,15 @@ typedef struct SQuery { bool stableQuery; } SQuery; -void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext, bool ignoreFrom); +void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext); typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType; int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, SNodeList** pCols); -int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, - SNodeList** pCols, bool ignoreFrom); +int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, SSHashObj* pMultiTableAlias, ECollectColType type, + SNodeList** pCols); int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols); typedef bool (*FFuncClassifier)(int32_t funcId); diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 6822333d0a..0409f559c3 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -36,11 +36,6 @@ typedef struct SHJoinCtx { int32_t probeIdx; } SHJoinCtx; -typedef struct SRowLocation { - SSDataBlock* pDataBlock; - int32_t pos; -} SRowLocation; - typedef struct SHJoinColInfo { int32_t srcSlot; int32_t dstSlot; diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h new file mode 100755 index 0000000000..7c1eff9a49 --- /dev/null +++ b/source/libs/executor/inc/mergejoin.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_MERGEJOIN_H +#define TDENGINE_MERGEJOIN_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); + +typedef struct SMJoinColInfo { + int32_t srcSlot; + int32_t dstSlot; + bool keyCol; + bool vardata; + int32_t* offset; + int32_t bytes; + char* data; + char* bitMap; +} SMJoinColInfo; + + +typedef struct SMJoinTableInfo { + int32_t downStreamIdx; + SOperatorInfo* downStream; + int32_t blkId; + SQueryStat inputStat; + + SMJoinColInfo* primCol; + char* primData; + + int32_t keyNum; + SMJoinColInfo* keyCols; + char* keyBuf; + char* keyData; + + int32_t valNum; + SMJoinColInfo* valCols; + char* valData; + int32_t valBitMapSize; + int32_t valBufSize; + SArray* valVarCols; + bool valColExist; +} SMJoinTableInfo; + +typedef struct SMJoinOperatorInfo { + int32_t joinType; + int32_t inputTsOrder; + SMJoinTableInfo tbs[2]; + SMJoinTableInfo* pBuild; + SMJoinTableInfo* pProbe; + SSDataBlock* pRes; + int32_t pResColNum; + int8_t* pResColMap; + SArray* pRowBufs; + SFilterInfo* pPreFilter; + SFilterInfo* pFinFilter; + SSHashObj* pKeyHash; + bool keyHashBuilt; + joinImplFp joinFp; + SHJoinCtx ctx; + SHJoinExecInfo execInfo; +} SMJoinOperatorInfo; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_MERGEJOIN_H diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index f382283d27..f8aa9323b7 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -106,7 +106,7 @@ static bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys return false; } -static int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { +static int32_t initHJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { getHJoinValColNum(pList, pTable->blkId, &pTable->valNum); if (pTable->valNum == 0) { return TSDB_CODE_SUCCESS; @@ -173,7 +173,7 @@ static int32_t initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* if (code) { return code; } - code = initJoinValColsInfo(pTable, pJoinNode->pTargets); + code = initHJoinValColsInfo(pTable, pJoinNode->pTargets); if (code) { return code; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 5e74edc47f..83b9cc1b73 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -25,186 +25,9 @@ #include "thash.h" #include "tmsg.h" #include "ttypes.h" +#include "mergejoin.h" -typedef struct SMJoinRowCtx { - bool rowRemains; - int64_t ts; - SArray* leftRowLocations; - SArray* leftCreatedBlocks; - SArray* rightCreatedBlocks; - int32_t leftRowIdx; - int32_t rightRowIdx; - - bool rightUseBuildTable; - SArray* rightRowLocations; -} SMJoinRowCtx; - -typedef struct SMJoinOperatorInfo { - SSDataBlock* pRes; - int32_t joinType; - int32_t inputOrder; - bool downstreamInitDone[2]; - bool downstreamFetchDone[2]; - int16_t downstreamResBlkId[2]; - - SSDataBlock* pLeft; - int32_t leftPos; - SColumnInfo leftCol; - - SSDataBlock* pRight; - int32_t rightPos; - SColumnInfo rightCol; - SNode* pCondAfterMerge; - SNode* pColEqualOnConditions; - - SArray* leftEqOnCondCols; - char* leftEqOnCondKeyBuf; - int32_t leftEqOnCondKeyLen; - - SArray* rightEqOnCondCols; - char* rightEqOnCondKeyBuf; - int32_t rightEqOnCondKeyLen; - - SSHashObj* rightBuildTable; - SMJoinRowCtx rowCtx; - - int64_t resRows; -} SMJoinOperatorInfo; - -static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); -static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); -static void destroyMergeJoinOperator(void* param); -static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); - -static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { - SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond; - if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) { - qError("not support this in join operator, %s", idStr); - return; // do not handle this - } - - SOperatorNode* pNode = (SOperatorNode*)pPrimKeyCond; - SColumnNode* col1 = (SColumnNode*)pNode->pLeft; - SColumnNode* col2 = (SColumnNode*)pNode->pRight; - SColumnNode* leftTsCol = NULL; - SColumnNode* rightTsCol = NULL; - if (col1->dataBlockId == col2->dataBlockId) { - leftTsCol = col1; - rightTsCol = col2; - } else { - if (col1->dataBlockId == pInfo->downstreamResBlkId[0]) { - ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[1]); - leftTsCol = col1; - rightTsCol = col2; - } else { - ASSERT(col1->dataBlockId == pInfo->downstreamResBlkId[1]); - ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[0]); - leftTsCol = col2; - rightTsCol = col1; - } - } - setJoinColumnInfo(&pInfo->leftCol, leftTsCol); - setJoinColumnInfo(&pInfo->rightCol, rightTsCol); -} - -static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorNode* pOperNode, - SColumn* pLeft, SColumn* pRight) { - SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; - SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; - if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pInfo->downstreamResBlkId[0]) { - *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); - *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); - } else { - *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); - *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); - } -} - -static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SNode* pEqualOnCondNode, - SArray* leftTagEqCols, SArray* rightTagEqCols) { - SColumn left = {0}; - SColumn right = {0}; - if (nodeType(pEqualOnCondNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pEqualOnCondNode)->condType == LOGIC_COND_TYPE_AND) { - SNode* pNode = NULL; - FOREACH(pNode, ((SLogicConditionNode*)pEqualOnCondNode)->pParameterList) { - SOperatorNode* pOperNode = (SOperatorNode*)pNode; - extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); - taosArrayPush(leftTagEqCols, &left); - taosArrayPush(rightTagEqCols, &right); - } - return; - } - - if (nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) { - SOperatorNode* pOperNode = (SOperatorNode*)pEqualOnCondNode; - extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); - taosArrayPush(leftTagEqCols, &left); - taosArrayPush(rightTagEqCols, &right); - } -} - -static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { - int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); - (*keyLen) += pCol->bytes; // actual data + null_flag - } - - int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - (*keyLen) += nullFlagSize; - - if (*keyLen >= 0) { - - (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); - if ((*keyBuf) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) { - SColumnDataAgg* pColAgg = NULL; - size_t numOfGroupCols = taosArrayGetSize(pCols); - char* isNull = (char*)pKey; - char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; - - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); - - // valid range check. todo: return error code. - if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { - continue; - } - - if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? - } - - if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { - isNull[i] = 1; - } else { - isNull[i] = 0; - char* val = colDataGetData(pColInfoData, rowIndex); - if (pCol->type == TSDB_DATA_TYPE_JSON) { - int32_t dataLen = getJsonValueLen(val); - memcpy(pStart, val, dataLen); - pStart += dataLen; - } else if (IS_VAR_DATA_TYPE(pCol->type)) { - varDataCopy(pStart, val); - pStart += varDataTLen(val); - } else { - memcpy(pStart, val, pCol->bytes); - pStart += pCol->bytes; - } - } - } - return (int32_t)(pStart - (char*)pKey); -} - -SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { +SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); if (p) { p[0] = pDownstream[0]; @@ -216,520 +39,234 @@ SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorIn return p; } - -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, - SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { - SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - bool newDownstreams = false; - - int32_t code = TSDB_CODE_SUCCESS; - if (pOperator == NULL || pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - if (1 == numOfDownstream) { - newDownstreams = true; - pDownstream = buildMergeJoinDownstreams(pInfo, pDownstream); +int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t *numOfDownstream, bool *newDownstreams) { + if (1 == *numOfDownstream) { + *newDownstreams = true; + pDownstream = mJoinBuildDownstreams(pInfo, pDownstream); if (NULL == pDownstream) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + return TSDB_CODE_OUT_OF_MEMORY; } - numOfDownstream = 2; + *numOfDownstream = 2; } else { pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0); } - int32_t numOfCols = 0; - pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - - SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); - initResultSizeInfo(&pOperator->resultInfo, 4096); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - - setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - - extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo)); - - if (pJoinNode->pFullOnCond != NULL && pJoinNode->node.pConditions != NULL) { - pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (pInfo->pCondAfterMerge == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); - pLogicCond->pParameterList = nodesMakeList(); - if (pLogicCond->pParameterList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFullOnCond)); - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); - pLogicCond->condType = LOGIC_COND_TYPE_AND; - } else if (pJoinNode->pFullOnCond != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pFullOnCond); - } else if (pJoinNode->pColEqCond != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond); - } else if (pJoinNode->node.pConditions != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions); - } else { - pInfo->pCondAfterMerge = NULL; - } - - code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pInfo->inputOrder = TSDB_ORDER_ASC; - if (pJoinNode->node.inputTsOrder == ORDER_ASC) { - pInfo->inputOrder = TSDB_ORDER_ASC; - } else if (pJoinNode->node.inputTsOrder == ORDER_DESC) { - pInfo->inputOrder = TSDB_ORDER_DESC; - } - - pInfo->pColEqualOnConditions = pJoinNode->pColEqCond; - if (pInfo->pColEqualOnConditions != NULL) { - pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); - pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); - extractEqualOnCondCols(pInfo, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols); - initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols); - initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); - } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); - - code = appendDownstream(pOperator, pDownstream, numOfDownstream); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - if (newDownstreams) { - taosMemoryFree(pDownstream); - } - - pOperator->numOfRealDownstream = newDownstreams ? 1 : 2; - - return pOperator; - -_error: - if (pInfo != NULL) { - destroyMergeJoinOperator(pInfo); - } - if (newDownstreams) { - taosMemoryFree(pDownstream); - } - - taosMemoryFree(pOperator); - pTaskInfo->code = code; - return NULL; -} - -void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { - pColumn->slotId = pColumnNode->slotId; - pColumn->type = pColumnNode->node.resType.type; - pColumn->bytes = pColumnNode->node.resType.bytes; - pColumn->precision = pColumnNode->node.resType.precision; - pColumn->scale = pColumnNode->node.resType.scale; -} - -static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { - void* p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { - SArray* rows = (*(SArray**)p); - taosArrayDestroy(rows); - } - - tSimpleHashCleanup(pBuildTable); -} - -void destroyMergeJoinOperator(void* param) { - SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; - if (pJoinOperator->pColEqualOnConditions != NULL) { - mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); - taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->rightEqOnCondCols); - - taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->leftEqOnCondCols); - } - nodesDestroyNode(pJoinOperator->pCondAfterMerge); - - taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); - taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); - - pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(param); -} - -static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, - SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, - int32_t rightPos) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); - - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; - - int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; - int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; - int32_t rowIndex = -1; - - SColumnInfoData* pSrc = NULL; - if (pLeftBlock->info.id.blockId == blockId) { - pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); - rowIndex = leftPos; - } else { - pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId); - rowIndex = rightPos; - } - - if (colDataIsNull_s(pSrc, rowIndex)) { - colDataSetNULL(pDst, currRow); - } else { - char* p = colDataGetData(pSrc, rowIndex); - colDataSetVal(pDst, currRow, p, false); - } - } -} -typedef struct SRowLocation { - SSDataBlock* pDataBlock; - int32_t pos; -} SRowLocation; - -// pBlock[tsSlotId][startPos, endPos) == timestamp, -static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, - int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { - int32_t numRows = pBlock->info.rows; - ASSERT(startPos < numRows); - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); - - int32_t i = startPos; - for (; i < numRows; ++i) { - char* pNextVal = colDataGetData(pCol, i); - if (timestamp != *(int64_t*)pNextVal) { - break; - } - } - int32_t endPos = i; - *pEndPos = endPos; - - if (endPos - startPos == 0) { - return 0; - } - - SSDataBlock* block = pBlock; - bool createdNewBlock = false; - if (endPos == numRows) { - block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); - taosArrayPush(createdBlocks, &block); - createdNewBlock = true; - } - SRowLocation location = {0}; - for (int32_t j = startPos; j < endPos; ++j) { - location.pDataBlock = block; - location.pos = (createdNewBlock ? j - startPos : j); - taosArrayPush(rowLocations, &location); - } - return 0; -} - -// whichChild == 0, left child of join; whichChild ==1, right child of join -static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId, - SSDataBlock* startDataBlock, int32_t startPos, - int64_t timestamp, SArray* rowLocations, - SArray* createdBlocks) { - ASSERT(whichChild == 0 || whichChild == 1); - - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - int32_t endPos = -1; - SSDataBlock* dataBlock = startDataBlock; - mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); - while (endPos == dataBlock->info.rows) { - SOperatorInfo* ds = pOperator->pDownstream[whichChild]; - dataBlock = getNextBlockFromDownstreamRemain(pOperator, whichChild); - qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0); - if (whichChild == 0) { - pJoinInfo->leftPos = 0; - pJoinInfo->pLeft = dataBlock; - } else if (whichChild == 1) { - pJoinInfo->rightPos = 0; - pJoinInfo->pRight = dataBlock; - } - - if (dataBlock == NULL) { - pJoinInfo->downstreamFetchDone[whichChild] = true; - endPos = -1; - break; - } - - mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks); - } - if (endPos != -1) { - if (whichChild == 0) { - pJoinInfo->leftPos = endPos; - } else if (whichChild == 1) { - pJoinInfo->rightPos = endPos; - } - } - return 0; -} - -static int32_t mergeJoinFillBuildTable(SMJoinOperatorInfo* pInfo, SArray* rightRowLocations) { - for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { - SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); - int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf); - SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen); - if (!ppRows) { - SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); - taosArrayPush(rows, rightRow); - tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES); - } else { - taosArrayPush(*ppRows, rightRow); - } - } return TSDB_CODE_SUCCESS; } -static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, - const SArray* leftRowLocations, int32_t leftRowIdx, - int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { - *pReachThreshold = false; - uint32_t limitRowNum = pOperator->resultInfo.threshold; - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - - int32_t i,j; - - for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { - SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - SArray* pRightRows = NULL; - if (useBuildTableTSRange) { - int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf); - SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen); - if (!ppRightRows) { - continue; - } - pRightRows = *ppRightRows; - } else { - pRightRows = rightRowLocations; - } - size_t rightRowsSize = taosArrayGetSize(pRightRows); - for (j = rightRowIdx; j < rightRowsSize; ++j) { - if (*nRows >= limitRowNum) { - *pReachThreshold = true; - break; - } - - SRowLocation* rightRow = taosArrayGet(pRightRows, j); - mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, - rightRow->pos); - ++*nRows; - } - if (*pReachThreshold) { - break; - } +static int32_t mJoinInitPrimKeyInfo(SMJoinTableInfo* pTable, int32_t slotId) { + pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo)); + if (NULL == pTable->primCol) { + return TSDB_CODE_OUT_OF_MEMORY; } - if (*pReachThreshold) { - pJoinInfo->rowCtx.rowRemains = true; - pJoinInfo->rowCtx.leftRowIdx = i; - pJoinInfo->rowCtx.rightRowIdx = j; - } + pTable->primCol->srcSlot = slotId; + return TSDB_CODE_SUCCESS; } -static void mergeJoinDestroyTSRangeCtx(SMJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, - SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { - for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(rightCreatedBlocks); - for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); - blockDataDestroy(pBlock); - } - if (rightRowLocations != NULL) { - taosArrayDestroy(rightRowLocations); - } - if (rightUseBuildTable) { - void* p = NULL; - int32_t iter = 0; - while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) { - SArray* rows = (*(SArray**)p); - taosArrayDestroy(rows); +static void mJoinGetValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { + *colNum = 0; + + SNode* pNode = NULL; + FOREACH(pNode, pList) { + STargetNode* pTarget = (STargetNode*)pNode; + SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; + if (pCol->dataBlockId == blkId) { + (*colNum)++; } - tSimpleHashClear(pJoinInfo->rightBuildTable); } - - taosArrayDestroy(leftCreatedBlocks); - taosArrayDestroy(leftRowLocations); - - pJoinInfo->rowCtx.rowRemains = false; - pJoinInfo->rowCtx.leftRowLocations = NULL; - pJoinInfo->rowCtx.leftCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightUseBuildTable = false; - pJoinInfo->rowCtx.rightRowLocations = NULL; } -static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, - int32_t* nRows) { - int32_t code = TSDB_CODE_SUCCESS; - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - SArray* leftRowLocations = NULL; - SArray* rightRowLocations = NULL; - SArray* leftCreatedBlocks = NULL; - SArray* rightCreatedBlocks = NULL; - int32_t leftRowIdx = 0; - int32_t rightRowIdx = 0; - SSHashObj* rightTableHash = NULL; - bool rightUseBuildTable = false; - - if (pJoinInfo->rowCtx.rowRemains) { - leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; - leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; - rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable; - rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; - rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; - leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; - rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; - } else { - leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); - leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - - rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); - rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - - mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, - pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); - mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, - pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); - if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { - mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); - rightUseBuildTable = true; - taosArrayDestroy(rightRowLocations); - rightRowLocations = NULL; - } +static int32_t mJoinInitValColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) { + mJoinGetValColNum(pList, pTable->blkId, &pTable->valNum); + if (pTable->valNum == 0) { + return TSDB_CODE_SUCCESS; } - size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold); - if (code != TSDB_CODE_SUCCESS) { - qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo), - leftNumJoin); + pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SMJoinColInfo)); + if (NULL == pTable->valCols) { + return TSDB_CODE_OUT_OF_MEMORY; } - bool reachThreshold = false; - - if (code == TSDB_CODE_SUCCESS) { - mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold); - } - - if (!reachThreshold) { - mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - rightUseBuildTable, rightRowLocations); - - } else { - pJoinInfo->rowCtx.rowRemains = true; - pJoinInfo->rowCtx.ts = timestamp; - pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; - pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; - pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable; - pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; - } - return TSDB_CODE_SUCCESS; -} - -static void setMergeJoinDone(SOperatorInfo* pOperator) { - setOperatorCompleted(pOperator); - if (pOperator->pDownstreamGetParams) { - freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM); - freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM); - pOperator->pDownstreamGetParams[0] = NULL; - pOperator->pDownstreamGetParams[1] = NULL; - } -} - -static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - bool leftEmpty = false; - - if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - if (!pJoinInfo->downstreamFetchDone[0]) { - pJoinInfo->pLeft = getNextBlockFromDownstreamRemain(pOperator, 0); - pJoinInfo->downstreamInitDone[0] = true; - - pJoinInfo->leftPos = 0; - qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0); - } else { - pJoinInfo->pLeft = NULL; - } - - if (pJoinInfo->pLeft == NULL) { - if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstream && !pJoinInfo->downstreamInitDone[1]) { - leftEmpty = true; + int32_t i = 0; + int32_t colNum = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + STargetNode* pTarget = (STargetNode*)pNode; + SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr; + if (pColNode->dataBlockId == pTable->blkId) { + if (valColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) { + pTable->valCols[i].keyCol = true; } else { - setMergeJoinDone(pOperator); - return false; + pTable->valCols[i].keyCol = false; + pTable->valCols[i].srcSlot = pColNode->slotId; + pTable->valColExist = true; + colNum++; } - } - } - - if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - if (!pJoinInfo->downstreamFetchDone[1]) { - pJoinInfo->pRight = getNextBlockFromDownstreamRemain(pOperator, 1); - pJoinInfo->downstreamInitDone[1] = true; - - pJoinInfo->rightPos = 0; - qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); - } else { - pJoinInfo->pRight = NULL; - } - - if (pJoinInfo->pRight == NULL) { - setMergeJoinDone(pOperator); - return false; - } else { - if (leftEmpty) { - setMergeJoinDone(pOperator); - return false; + pTable->valCols[i].dstSlot = pTarget->slotId; + pTable->valCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + if (pTable->valCols[i].vardata) { + if (NULL == pTable->valVarCols) { + pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t)); + if (NULL == pTable->valVarCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + taosArrayPush(pTable->valVarCols, &i); } + pTable->valCols[i].bytes = pColNode->node.resType.bytes; + if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) { + pTable->valBufSize += pColNode->node.resType.bytes; + } + i++; } } - if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) { - setMergeJoinDone(pOperator); - return false; - } - - // only the timestamp match support for ordinary table - SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); - char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); - *pLeftTs = *(int64_t*)pLeftVal; + pTable->valBitMapSize = BitmapLen(colNum); + pTable->valBufSize += pTable->valBitMapSize; - SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); - char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); - *pRightTs = *(int64_t*)pRightVal; - - return true; + return TSDB_CODE_SUCCESS; } -static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { +static int32_t mJoinInitKeyColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) { + pTable->keyNum = LIST_LENGTH(pList); + + pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SMJoinColInfo)); + if (NULL == pTable->keyCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int64_t bufSize = 0; + int32_t i = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + SColumnNode* pColNode = (SColumnNode*)pNode; + pTable->keyCols[i].srcSlot = pColNode->slotId; + pTable->keyCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + pTable->keyCols[i].bytes = pColNode->node.resType.bytes; + bufSize += pColNode->node.resType.bytes; + ++i; + } + + if (pTable->keyNum > 1) { + pTable->keyBuf = taosMemoryMalloc(bufSize); + if (NULL == pTable->keyBuf) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { + SMJoinTableInfo* pTable = &pJoin->tbs[idx]; + pTable->downStream = pDownstream[idx]; + pTable->blkId = pDownstream[idx]->resultDataBlockId; + int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->LeftPrimSlotId : pJoinNode->rightPrimSlotId); + if (code) { + return code; + } + code = mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight); + if (code) { + return code; + } + code = mJoinInitValColsInfo(pTable, pJoinNode->pTargets); + if (code) { + return code; + } + + memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); + + return TSDB_CODE_SUCCESS; +} + +static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) { + int32_t buildIdx = 0; + int32_t probeIdx = 1; + + pInfo->joinType = pJoinNode->joinType; + pInfo->subType = pJoinNode->subType; + + switch (pInfo->joinType) { + case JOIN_TYPE_INNER: + case JOIN_TYPE_FULL: + if (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) { + buildIdx = 0; + probeIdx = 1; + } else { + buildIdx = 1; + probeIdx = 0; + } + break; + case JOIN_TYPE_LEFT: + buildIdx = 1; + probeIdx = 0; + break; + case JOIN_TYPE_RIGHT: + buildIdx = 0; + probeIdx = 1; + break; + default: + break; + } + + pInfo->joinFp = (pInfo->subType == JOIN_STYPE_ASOF || pInfo->subType == JOIN_STYPE_WIN) ? mJoinProcessWinJoin: mJoinProcessMergeJoin; + + pInfo->pBuild = &pInfo->tbs[buildIdx]; + pInfo->pProbe = &pInfo->tbs[probeIdx]; + + pInfo->pBuild->downStreamIdx = buildIdx; + pInfo->pProbe->downStreamIdx = probeIdx; +} + +static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) { + pInfo->pResColNum = pJoinNode->pTargets->length; + pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); + if (NULL == pInfo->pResColMap) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNode* pNode = NULL; + int32_t i = 0; + FOREACH(pNode, pJoinNode->pTargets) { + STargetNode* pTarget = (STargetNode*)pNode; + SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; + if (pCol->dataBlockId == pInfo->pBuild->blkId) { + pInfo->pResColMap[i] = 1; + } + + i++; + } + + return TSDB_CODE_SUCCESS; +} + + +static FORCE_INLINE int32_t mJoinAddPageToBufList(SArray* pRowBufs) { + SBufPageInfo page; + page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; + page.offset = 0; + page.data = taosMemoryMalloc(page.pageSize); + if (NULL == page.data) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pRowBufs, &page); + return TSDB_CODE_SUCCESS; +} + +static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) { + pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); + if (NULL == pInfo->pRowBufs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return mJoinAddPageToBufList(pInfo->pRowBufs); +} + +static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; int32_t nrows = pRes->info.rows; @@ -774,33 +311,16 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) } } -void resetMergeJoinOperator(struct SOperatorInfo* pOperator) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - if (pJoinInfo->rowCtx.rowRemains) { - mergeJoinDestroyTSRangeCtx(pJoinInfo, pJoinInfo->rowCtx.leftRowLocations, pJoinInfo->rowCtx.leftCreatedBlocks, pJoinInfo->rowCtx.rightCreatedBlocks, - pJoinInfo->rowCtx.rightUseBuildTable, pJoinInfo->rowCtx.rightRowLocations); - } - pJoinInfo->pLeft = NULL; - pJoinInfo->leftPos = 0; - pJoinInfo->pRight = NULL; - pJoinInfo->rightPos = 0; - pJoinInfo->downstreamFetchDone[0] = false; - pJoinInfo->downstreamFetchDone[1] = false; - pJoinInfo->downstreamInitDone[0] = false; - pJoinInfo->downstreamInitDone[1] = false; - pJoinInfo->resRows = 0; - pOperator->status = OP_OPENED; -} -SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { +SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { - qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); + qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows); return NULL; } else { resetMergeJoinOperator(pOperator); - qError("start new merge join"); + qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo)); } } @@ -814,18 +334,15 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { while (true) { int32_t numOfRowsBefore = pRes->info.rows; - doMergeJoinImpl(pOperator, pRes); + mJoinImpl(pOperator, pRes); int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; if (numOfNewRows == 0) { break; } - if (pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + if (pJoinInfo->pFinFilter != NULL) { + doFilter(pRes, pJoinInfo->pFinFilter, NULL); } - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - break; - } - if (pOperator->status == OP_EXEC_DONE) { + if (pRes->info.rows > 0 || pOperator->status == OP_EXEC_DONE) { break; } } @@ -836,13 +353,122 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (pRes->info.rows > 0) { pJoinInfo->resRows += pRes->info.rows; - qError("merge join returns res rows:%" PRId64, pRes->info.rows); + qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pRes->info.rows); return pRes; } else { - qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); + qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows); return NULL; } } +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { + SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + bool newDownstreams = false; + + int32_t code = TSDB_CODE_SUCCESS; + if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + code = mJoinInitDownstreamInfo(pInfo, pDownstream, numOfDownstream, newDownstreams); + if (TSDB_CODE_SUCCESS != code) { + goto _error; + } + + int32_t numOfCols = 0; + pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + + setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + + mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); + mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); + + mJoinSetBuildAndProbeTable(pInfo, pJoinNode); + + code = mJoinBuildResColMap(pInfo, pJoinNode); + if (code) { + goto _error; + } + + code = initHJoinBufPages(pInfo); + if (code) { + goto _error; + } + + if (pJoinNode->pColOnCond != NULL) { + code = filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + if (pJoinNode->node.pConditions != NULL) { + code = filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + if (pJoinNode->node.inputTsOrder == ORDER_ASC) { + pInfo->inputTsOrder = TSDB_ORDER_ASC; + } else if (pJoinNode->node.inputTsOrder == ORDER_DESC) { + pInfo->inputTsOrder = TSDB_ORDER_DESC; + } else { + pInfo->inputTsOrder = TSDB_ORDER_ASC; + } + + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + + code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + if (newDownstreams) { + taosMemoryFree(pDownstream); + pOperator->numOfRealDownstream = 1; + } else { + pOperator->numOfRealDownstream = 2; + } + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyMergeJoinOperator(pInfo); + } + if (newDownstreams) { + taosMemoryFree(pDownstream); + } + + taosMemoryFree(pOperator); + pTaskInfo->code = code; + return NULL; +} + +void destroyMergeJoinOperator(void* param) { + SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; + if (pJoinOperator->pColEqualOnConditions != NULL) { + mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); + taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->rightEqOnCondCols); + + taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->leftEqOnCondCols); + } + nodesDestroyNode(pJoinOperator->pCondAfterMerge); + + taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); + taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); + + pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); + taosMemoryFreeClear(param); +} diff --git a/source/libs/executor/src/mergejoinoperator_old.c b/source/libs/executor/src/mergejoinoperator_old.c new file mode 100755 index 0000000000..5e74edc47f --- /dev/null +++ b/source/libs/executor/src/mergejoinoperator_old.c @@ -0,0 +1,848 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "operator.h" +#include "os.h" +#include "querynodes.h" +#include "querytask.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "thash.h" +#include "tmsg.h" +#include "ttypes.h" + +typedef struct SMJoinRowCtx { + bool rowRemains; + int64_t ts; + SArray* leftRowLocations; + SArray* leftCreatedBlocks; + SArray* rightCreatedBlocks; + int32_t leftRowIdx; + int32_t rightRowIdx; + + bool rightUseBuildTable; + SArray* rightRowLocations; +} SMJoinRowCtx; + +typedef struct SMJoinOperatorInfo { + SSDataBlock* pRes; + int32_t joinType; + int32_t inputOrder; + bool downstreamInitDone[2]; + bool downstreamFetchDone[2]; + int16_t downstreamResBlkId[2]; + + SSDataBlock* pLeft; + int32_t leftPos; + SColumnInfo leftCol; + + SSDataBlock* pRight; + int32_t rightPos; + SColumnInfo rightCol; + SNode* pCondAfterMerge; + SNode* pColEqualOnConditions; + + SArray* leftEqOnCondCols; + char* leftEqOnCondKeyBuf; + int32_t leftEqOnCondKeyLen; + + SArray* rightEqOnCondCols; + char* rightEqOnCondKeyBuf; + int32_t rightEqOnCondKeyLen; + + SSHashObj* rightBuildTable; + SMJoinRowCtx rowCtx; + + int64_t resRows; +} SMJoinOperatorInfo; + +static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); +static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); +static void destroyMergeJoinOperator(void* param); +static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); + +static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { + SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond; + if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) { + qError("not support this in join operator, %s", idStr); + return; // do not handle this + } + + SOperatorNode* pNode = (SOperatorNode*)pPrimKeyCond; + SColumnNode* col1 = (SColumnNode*)pNode->pLeft; + SColumnNode* col2 = (SColumnNode*)pNode->pRight; + SColumnNode* leftTsCol = NULL; + SColumnNode* rightTsCol = NULL; + if (col1->dataBlockId == col2->dataBlockId) { + leftTsCol = col1; + rightTsCol = col2; + } else { + if (col1->dataBlockId == pInfo->downstreamResBlkId[0]) { + ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[1]); + leftTsCol = col1; + rightTsCol = col2; + } else { + ASSERT(col1->dataBlockId == pInfo->downstreamResBlkId[1]); + ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[0]); + leftTsCol = col2; + rightTsCol = col1; + } + } + setJoinColumnInfo(&pInfo->leftCol, leftTsCol); + setJoinColumnInfo(&pInfo->rightCol, rightTsCol); +} + +static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorNode* pOperNode, + SColumn* pLeft, SColumn* pRight) { + SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; + SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; + if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pInfo->downstreamResBlkId[0]) { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + } else { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + } +} + +static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SNode* pEqualOnCondNode, + SArray* leftTagEqCols, SArray* rightTagEqCols) { + SColumn left = {0}; + SColumn right = {0}; + if (nodeType(pEqualOnCondNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pEqualOnCondNode)->condType == LOGIC_COND_TYPE_AND) { + SNode* pNode = NULL; + FOREACH(pNode, ((SLogicConditionNode*)pEqualOnCondNode)->pParameterList) { + SOperatorNode* pOperNode = (SOperatorNode*)pNode; + extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } + return; + } + + if (nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) { + SOperatorNode* pOperNode = (SOperatorNode*)pEqualOnCondNode; + extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } +} + +static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); + (*keyLen) += pCol->bytes; // actual data + null_flag + } + + int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + (*keyLen) += nullFlagSize; + + if (*keyLen >= 0) { + + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); + if ((*keyBuf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) { + SColumnDataAgg* pColAgg = NULL; + size_t numOfGroupCols = taosArrayGetSize(pCols); + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; + + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + // valid range check. todo: return error code. + if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { + continue; + } + + if (pBlock->pBlockAgg != NULL) { + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + isNull[i] = 1; + } else { + isNull[i] = 0; + char* val = colDataGetData(pColInfoData, rowIndex); + if (pCol->type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(val); + memcpy(pStart, val, dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + varDataCopy(pStart, val); + pStart += varDataTLen(val); + } else { + memcpy(pStart, val, pCol->bytes); + pStart += pCol->bytes; + } + } + } + return (int32_t)(pStart - (char*)pKey); +} + +SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { + SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); + if (p) { + p[0] = pDownstream[0]; + p[1] = pDownstream[0]; + pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0); + pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1); + } + + return p; +} + + +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { + SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + bool newDownstreams = false; + + int32_t code = TSDB_CODE_SUCCESS; + if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + if (1 == numOfDownstream) { + newDownstreams = true; + pDownstream = buildMergeJoinDownstreams(pInfo, pDownstream); + if (NULL == pDownstream) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + numOfDownstream = 2; + } else { + pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); + pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0); + } + + int32_t numOfCols = 0; + pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); + initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + + setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + + extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo)); + + if (pJoinNode->pFullOnCond != NULL && pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (pInfo->pCondAfterMerge == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); + pLogicCond->pParameterList = nodesMakeList(); + if (pLogicCond->pParameterList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFullOnCond)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); + pLogicCond->condType = LOGIC_COND_TYPE_AND; + } else if (pJoinNode->pFullOnCond != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pFullOnCond); + } else if (pJoinNode->pColEqCond != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond); + } else if (pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions); + } else { + pInfo->pCondAfterMerge = NULL; + } + + code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->inputOrder = TSDB_ORDER_ASC; + if (pJoinNode->node.inputTsOrder == ORDER_ASC) { + pInfo->inputOrder = TSDB_ORDER_ASC; + } else if (pJoinNode->node.inputTsOrder == ORDER_DESC) { + pInfo->inputOrder = TSDB_ORDER_DESC; + } + + pInfo->pColEqualOnConditions = pJoinNode->pColEqCond; + if (pInfo->pColEqualOnConditions != NULL) { + pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); + extractEqualOnCondCols(pInfo, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols); + initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols); + initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); + } + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + + code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + if (newDownstreams) { + taosMemoryFree(pDownstream); + } + + pOperator->numOfRealDownstream = newDownstreams ? 1 : 2; + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyMergeJoinOperator(pInfo); + } + if (newDownstreams) { + taosMemoryFree(pDownstream); + } + + taosMemoryFree(pOperator); + pTaskInfo->code = code; + return NULL; +} + +void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { + pColumn->slotId = pColumnNode->slotId; + pColumn->type = pColumnNode->node.resType.type; + pColumn->bytes = pColumnNode->node.resType.bytes; + pColumn->precision = pColumnNode->node.resType.precision; + pColumn->scale = pColumnNode->node.resType.scale; +} + +static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { + void* p = NULL; + int32_t iter = 0; + + while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + + tSimpleHashCleanup(pBuildTable); +} + +void destroyMergeJoinOperator(void* param) { + SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; + if (pJoinOperator->pColEqualOnConditions != NULL) { + mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); + taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->rightEqOnCondCols); + + taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->leftEqOnCondCols); + } + nodesDestroyNode(pJoinOperator->pCondAfterMerge); + + taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); + taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); + + pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); + taosMemoryFreeClear(param); +} + +static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, + SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, + int32_t rightPos) { + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); + + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; + + int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; + int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + int32_t rowIndex = -1; + + SColumnInfoData* pSrc = NULL; + if (pLeftBlock->info.id.blockId == blockId) { + pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); + rowIndex = leftPos; + } else { + pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId); + rowIndex = rightPos; + } + + if (colDataIsNull_s(pSrc, rowIndex)) { + colDataSetNULL(pDst, currRow); + } else { + char* p = colDataGetData(pSrc, rowIndex); + colDataSetVal(pDst, currRow, p, false); + } + } +} +typedef struct SRowLocation { + SSDataBlock* pDataBlock; + int32_t pos; +} SRowLocation; + +// pBlock[tsSlotId][startPos, endPos) == timestamp, +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, + int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { + int32_t numRows = pBlock->info.rows; + ASSERT(startPos < numRows); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); + + int32_t i = startPos; + for (; i < numRows; ++i) { + char* pNextVal = colDataGetData(pCol, i); + if (timestamp != *(int64_t*)pNextVal) { + break; + } + } + int32_t endPos = i; + *pEndPos = endPos; + + if (endPos - startPos == 0) { + return 0; + } + + SSDataBlock* block = pBlock; + bool createdNewBlock = false; + if (endPos == numRows) { + block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); + taosArrayPush(createdBlocks, &block); + createdNewBlock = true; + } + SRowLocation location = {0}; + for (int32_t j = startPos; j < endPos; ++j) { + location.pDataBlock = block; + location.pos = (createdNewBlock ? j - startPos : j); + taosArrayPush(rowLocations, &location); + } + return 0; +} + +// whichChild == 0, left child of join; whichChild ==1, right child of join +static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId, + SSDataBlock* startDataBlock, int32_t startPos, + int64_t timestamp, SArray* rowLocations, + SArray* createdBlocks) { + ASSERT(whichChild == 0 || whichChild == 1); + + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + int32_t endPos = -1; + SSDataBlock* dataBlock = startDataBlock; + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); + while (endPos == dataBlock->info.rows) { + SOperatorInfo* ds = pOperator->pDownstream[whichChild]; + dataBlock = getNextBlockFromDownstreamRemain(pOperator, whichChild); + qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0); + if (whichChild == 0) { + pJoinInfo->leftPos = 0; + pJoinInfo->pLeft = dataBlock; + } else if (whichChild == 1) { + pJoinInfo->rightPos = 0; + pJoinInfo->pRight = dataBlock; + } + + if (dataBlock == NULL) { + pJoinInfo->downstreamFetchDone[whichChild] = true; + endPos = -1; + break; + } + + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks); + } + if (endPos != -1) { + if (whichChild == 0) { + pJoinInfo->leftPos = endPos; + } else if (whichChild == 1) { + pJoinInfo->rightPos = endPos; + } + } + return 0; +} + +static int32_t mergeJoinFillBuildTable(SMJoinOperatorInfo* pInfo, SArray* rightRowLocations) { + for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { + SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); + int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf); + SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen); + if (!ppRows) { + SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); + taosArrayPush(rows, rightRow); + tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES); + } else { + taosArrayPush(*ppRows, rightRow); + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, + const SArray* leftRowLocations, int32_t leftRowIdx, + int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { + *pReachThreshold = false; + uint32_t limitRowNum = pOperator->resultInfo.threshold; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + + int32_t i,j; + + for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + SArray* pRightRows = NULL; + if (useBuildTableTSRange) { + int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf); + SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen); + if (!ppRightRows) { + continue; + } + pRightRows = *ppRightRows; + } else { + pRightRows = rightRowLocations; + } + size_t rightRowsSize = taosArrayGetSize(pRightRows); + for (j = rightRowIdx; j < rightRowsSize; ++j) { + if (*nRows >= limitRowNum) { + *pReachThreshold = true; + break; + } + + SRowLocation* rightRow = taosArrayGet(pRightRows, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + if (*pReachThreshold) { + break; + } + } + + if (*pReachThreshold) { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.leftRowIdx = i; + pJoinInfo->rowCtx.rightRowIdx = j; + } + return TSDB_CODE_SUCCESS; +} + +static void mergeJoinDestroyTSRangeCtx(SMJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, + SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + if (rightRowLocations != NULL) { + taosArrayDestroy(rightRowLocations); + } + if (rightUseBuildTable) { + void* p = NULL; + int32_t iter = 0; + while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + tSimpleHashClear(pJoinInfo->rightBuildTable); + } + + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + + pJoinInfo->rowCtx.rowRemains = false; + pJoinInfo->rowCtx.leftRowLocations = NULL; + pJoinInfo->rowCtx.leftCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightUseBuildTable = false; + pJoinInfo->rowCtx.rightRowLocations = NULL; +} + +static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, + int32_t* nRows) { + int32_t code = TSDB_CODE_SUCCESS; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + SArray* leftRowLocations = NULL; + SArray* rightRowLocations = NULL; + SArray* leftCreatedBlocks = NULL; + SArray* rightCreatedBlocks = NULL; + int32_t leftRowIdx = 0; + int32_t rightRowIdx = 0; + SSHashObj* rightTableHash = NULL; + bool rightUseBuildTable = false; + + if (pJoinInfo->rowCtx.rowRemains) { + leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; + leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; + rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable; + rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; + rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; + leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; + rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; + } else { + leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, + pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, + pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { + mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); + rightUseBuildTable = true; + taosArrayDestroy(rightRowLocations); + rightRowLocations = NULL; + } + } + + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold); + if (code != TSDB_CODE_SUCCESS) { + qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo), + leftNumJoin); + } + + bool reachThreshold = false; + + if (code == TSDB_CODE_SUCCESS) { + mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, + rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold); + } + + if (!reachThreshold) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, + rightUseBuildTable, rightRowLocations); + + } else { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.ts = timestamp; + pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; + pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; + pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; + pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable; + pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; + } + return TSDB_CODE_SUCCESS; +} + +static void setMergeJoinDone(SOperatorInfo* pOperator) { + setOperatorCompleted(pOperator); + if (pOperator->pDownstreamGetParams) { + freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM); + freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM); + pOperator->pDownstreamGetParams[0] = NULL; + pOperator->pDownstreamGetParams[1] = NULL; + } +} + +static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + bool leftEmpty = false; + + if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + if (!pJoinInfo->downstreamFetchDone[0]) { + pJoinInfo->pLeft = getNextBlockFromDownstreamRemain(pOperator, 0); + pJoinInfo->downstreamInitDone[0] = true; + + pJoinInfo->leftPos = 0; + qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0); + } else { + pJoinInfo->pLeft = NULL; + } + + if (pJoinInfo->pLeft == NULL) { + if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstream && !pJoinInfo->downstreamInitDone[1]) { + leftEmpty = true; + } else { + setMergeJoinDone(pOperator); + return false; + } + } + } + + if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + if (!pJoinInfo->downstreamFetchDone[1]) { + pJoinInfo->pRight = getNextBlockFromDownstreamRemain(pOperator, 1); + pJoinInfo->downstreamInitDone[1] = true; + + pJoinInfo->rightPos = 0; + qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); + } else { + pJoinInfo->pRight = NULL; + } + + if (pJoinInfo->pRight == NULL) { + setMergeJoinDone(pOperator); + return false; + } else { + if (leftEmpty) { + setMergeJoinDone(pOperator); + return false; + } + } + } + + if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) { + setMergeJoinDone(pOperator); + return false; + } + + // only the timestamp match support for ordinary table + SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); + char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); + *pLeftTs = *(int64_t*)pLeftVal; + + SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); + char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); + *pRightTs = *(int64_t*)pRightVal; + + return true; +} + +static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + + int32_t nrows = pRes->info.rows; + + bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false; + + while (1) { + int64_t leftTs = 0; + int64_t rightTs = 0; + if (pJoinInfo->rowCtx.rowRemains) { + leftTs = pJoinInfo->rowCtx.ts; + rightTs = pJoinInfo->rowCtx.ts; + } else { + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + if (!hasNextTs) { + break; + } + } + + if (leftTs == rightTs) { + mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); + } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { + pJoinInfo->leftPos += 1; + + if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { + pJoinInfo->rightPos += 1; + if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + } + + // the pDataBlock are always the same one, no need to call this again + pRes->info.rows = nrows; + pRes->info.dataLoad = 1; + pRes->info.scanFlag = MAIN_SCAN; + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } +} + +void resetMergeJoinOperator(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + if (pJoinInfo->rowCtx.rowRemains) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, pJoinInfo->rowCtx.leftRowLocations, pJoinInfo->rowCtx.leftCreatedBlocks, pJoinInfo->rowCtx.rightCreatedBlocks, + pJoinInfo->rowCtx.rightUseBuildTable, pJoinInfo->rowCtx.rightRowLocations); + } + pJoinInfo->pLeft = NULL; + pJoinInfo->leftPos = 0; + pJoinInfo->pRight = NULL; + pJoinInfo->rightPos = 0; + pJoinInfo->downstreamFetchDone[0] = false; + pJoinInfo->downstreamFetchDone[1] = false; + pJoinInfo->downstreamInitDone[0] = false; + pJoinInfo->downstreamInitDone[1] = false; + pJoinInfo->resRows = 0; + pOperator->status = OP_OPENED; +} + +SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoinInfo = pOperator->info; + if (pOperator->status == OP_EXEC_DONE) { + if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { + qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); + return NULL; + } else { + resetMergeJoinOperator(pOperator); + qError("start new merge join"); + } + } + + int64_t st = 0; + if (pOperator->cost.openCost == 0) { + st = taosGetTimestampUs(); + } + + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + + while (true) { + int32_t numOfRowsBefore = pRes->info.rows; + doMergeJoinImpl(pOperator, pRes); + int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; + if (numOfNewRows == 0) { + break; + } + if (pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + } + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + if (pOperator->status == OP_EXEC_DONE) { + break; + } + } + + if (pOperator->cost.openCost == 0) { + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + } + + if (pRes->info.rows > 0) { + pJoinInfo->resRows += pRes->info.rows; + qError("merge join returns res rows:%" PRId64, pRes->info.rows); + return pRes; + } else { + qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); + return NULL; + } +} + + + diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 1df71cce2c..7cae5ec97a 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -412,16 +412,14 @@ void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* (void)rewriteExprs(pList, TRAVERSAL_POSTORDER, rewriter, pContext); } -void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext, bool ignoreFrom) { +void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) { if (NULL == pSelect) { return; } switch (clause) { case SQL_CLAUSE_FROM: - if (!ignoreFrom) { - nodesWalkExpr(pSelect->pFromTable, walker, pContext); - } + nodesWalkExpr(pSelect->pFromTable, walker, pContext); nodesWalkExpr(pSelect->pWhere, walker, pContext); case SQL_CLAUSE_WHERE: nodesWalkExprs(pSelect->pPartitionByList, walker, pContext); @@ -451,7 +449,7 @@ void nodesWalkSelectStmtImpl(SSelectStmt* pSelect, ESqlClause clause, FNodeWalke } void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) { - nodesWalkSelectStmtImpl(pSelect, clause, walker, pContext, false); + nodesWalkSelectStmtImpl(pSelect, clause, walker, pContext); } void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index e940fbb627..7c5ea785ef 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2000,6 +2000,7 @@ bool nodesIsBitwiseOp(const SOperatorNode* pOp) { typedef struct SCollectColumnsCxt { int32_t errCode; const char* pTableAlias; + SSHashObj* pMultiTableAlias; ECollectColType collectType; SNodeList* pCols; SHashObj* pColHash; @@ -2041,6 +2042,19 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } +static EDealRes collectColumnsExt(SNode* pNode, void* pContext) { + SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (isCollectType(pCxt->collectType, pCol->colType) && 0 != strcmp(pCol->colName, "*") && + (NULL == pCxt->pMultiTableAlias || NULL != (pCxt->pTableAlias = tSimpleHashGet(pCxt->pMultiTableAlias, pCol->tableAlias, strlen(pCol->tableAlias))))) { + return doCollect(pCxt, pCol, pNode); + } + } + return DEAL_RES_CONTINUE; +} + + int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, SNodeList** pCols) { if (NULL == pSelect || NULL == pCols) { @@ -2072,15 +2086,16 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* return TSDB_CODE_SUCCESS; } -int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, - SNodeList** pCols, bool ignoreFrom) { +int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, SSHashObj* pMultiTableAlias, ECollectColType type, + SNodeList** pCols) { if (NULL == pSelect || NULL == pCols) { return TSDB_CODE_FAILED; } SCollectColumnsCxt cxt = { .errCode = TSDB_CODE_SUCCESS, - .pTableAlias = pTableAlias, + .pTableAlias = NULL, + .pMultiTableAlias = pMultiTableAlias, .collectType = type, .pCols = (NULL == *pCols ? nodesMakeList() : *pCols), .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)}; @@ -2088,7 +2103,7 @@ int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, const ch return TSDB_CODE_OUT_OF_MEMORY; } *pCols = NULL; - nodesWalkSelectStmtImpl(pSelect, clause, collectColumns, &cxt, ignoreFrom); + nodesWalkSelectStmtImpl(pSelect, clause, collectColumnsExt, &cxt); taosHashCleanup(cxt.pColHash); if (TSDB_CODE_SUCCESS != cxt.errCode) { nodesDestroyList(cxt.pCols); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index bbd2d51273..87cc42c06f 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -503,19 +503,18 @@ static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe return createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode); } +int32_t collectJoinResColumns(SSelectStmt* pSelect, SJoinLogicNode* pJoin, SNodeList** pCols) { + SSHashObj* pTables = NULL; + collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pTables); + collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 1), &pTables); + int32_t code = nodesCollectColumnsExt(pSelect, SQL_CLAUSE_WHERE, pTables, COLLECT_COL_TYPE_ALL, pCols); -int32_t collectJoinResColumns(SSelectStmt* pSelect, SJoinTableNode* pJoinTable, SNodeList** pCols) { - int32_t code = TSDB_CODE_SUCCESS; - - if (TSDB_CODE_SUCCESS == code) { - code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((STableNode*)pJoinTable->pRight)->tableAlias, COLLECT_COL_TYPE_ALL, pCols); - } + tSimpleHashCleanup(pTables); return code; } - static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable, SLogicNode** pLogicNode) { int32_t code = TSDB_CODE_SUCCESS; @@ -619,7 +618,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set the output if (TSDB_CODE_SUCCESS == code) { SNodeList* pColList = NULL; - code = collectJoinResColumns(pSelect, pJoinTable, &pColList); + code = collectJoinResColumns(pSelect, pJoin, &pColList); if (TSDB_CODE_SUCCESS == code && NULL != pColList) { code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index aa563034bc..971a7f7d14 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -768,6 +768,9 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi } pJoin->joinType = pJoinLogicNode->joinType; + pJoin->subType = pJoinLogicNode->subType; + pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset); + pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; SDataBlockDescNode* pLeftDesc = NULL; @@ -795,9 +798,33 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) { code = mergeEqCond(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond); } + //TODO set from input blocks for group algo +/* if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); } +*/ + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, + pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); + } + + + if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) { + code = mergeEqCond(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); + } + //TODO set from input blocks for group algo + /* + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColOnCond, &pJoin->pColOnCond); + } + */ + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, + pJoinLogicNode->pColOnCond, &pJoin->pColOnCond); + } + + if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } @@ -954,6 +981,9 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil int32_t code = TSDB_CODE_SUCCESS; pJoin->joinType = pJoinLogicNode->joinType; + pJoin->subType = pJoinLogicNode->subType; + pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset); + pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond);