diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ad4b59714c..0e235191b1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -112,6 +112,7 @@ typedef struct SJoinLogicNode { SNode* pOnConditions; bool isSingleTableJoin; EOrder inputTsOrder; + SNode* pTagEqualConditions; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -405,6 +406,7 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pOnConditions; SNodeList* pTargets; EOrder inputTsOrder; + SNode* pTagEqualCondtions; } SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 754b5f4737..9dee58367c 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -30,11 +30,13 @@ typedef struct SJoinRowCtx { bool rowRemains; int64_t ts; SArray* leftRowLocations; - SArray* rightRowLocations; SArray* leftCreatedBlocks; SArray* rightCreatedBlocks; int32_t leftRowIdx; int32_t rightRowIdx; + + bool rightUseBuildTable; + SArray* rightRowLocations; } SJoinRowCtx; typedef struct SJoinOperatorInfo { @@ -50,7 +52,17 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode* pCondAfterMerge; + SNode* pTagEqualConditions; + SArray* leftTagCols; + char* leftTagKeyBuf; + int32_t leftTagKeyLen; + + SArray* rightTagCols; + char* rightTagKeyBuf; + int32_t rightTagKeyLen; + + SSHashObj* rightBuildTable; SJoinRowCtx rowCtx; } SJoinOperatorInfo; @@ -92,6 +104,100 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown setJoinColumnInfo(&pInfo->rightCol, rightTsCol); } +static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, + SColumn* pLeft, SColumn* pRight) { + SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; + SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; + if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + } else { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + } +} + +static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pTagEqualNode, + SArray* leftTagEqCols, SArray* rightTagEqCols) { + SColumn left = {0}; + SColumn right = {0}; + if (nodeType(pTagEqualNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pTagEqualNode)->condType == LOGIC_COND_TYPE_AND) { + SNode* pNode = NULL; + FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) { + SOperatorNode* pOperNode = (SOperatorNode*)pNode; + extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } + return; + } + + if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) { + SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode; + extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } +} + +static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); + (*keyLen) += pCol->bytes; // actual data + null_flag + } + + int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + (*keyLen) += nullFlagSize; + + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); + if ((*keyBuf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) { + SColumnDataAgg* pColAgg = NULL; + size_t numOfGroupCols = taosArrayGetSize(pCols); + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; + + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + // valid range check. todo: return error code. + if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { + continue; + } + + if (pBlock->pBlockAgg != NULL) { + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + isNull[i] = 1; + } else { + isNull[i] = 0; + char* val = colDataGetData(pColInfoData, rowIndex); + if (pCol->type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(val); + memcpy(pStart, val, dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + varDataCopy(pStart, val); + pStart += varDataTLen(val); + } else { + memcpy(pStart, val, pCol->bytes); + pStart += pCol->bytes; + } + } + } + return (int32_t)(pStart - (char*)pKey); +} + SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); @@ -153,6 +259,16 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->inputOrder = TSDB_ORDER_DESC; } + pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; + if (pInfo->pTagEqualConditions != NULL) { + pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); + extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); + initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); + } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { @@ -179,8 +295,28 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } +static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { + void* p = NULL; + int32_t iter = 0; + + while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + + tSimpleHashCleanup(pBuildTable); +} + void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + if (pJoinOperator->pTagEqualConditions != NULL) { + mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); + taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); + taosArrayDestroy(pJoinOperator->rightTagCols); + + taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); + taosArrayDestroy(pJoinOperator->leftTagCols); + } nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); @@ -300,21 +436,122 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } +static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) { + for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { + SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); + int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); + SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen); + if (!ppRows) { + SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); + taosArrayPush(rows, rightRow); + tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); + } else { + taosArrayPush(*ppRows, rightRow); + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, + const SArray* leftRowLocations, int32_t leftRowIdx, + int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { + *pReachThreshold = false; + uint32_t limitRowNum = pOperator->resultInfo.threshold; + SJoinOperatorInfo* pJoinInfo = pOperator->info; + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + + int32_t i,j; + + for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + SArray* pRightRows = NULL; + if (useBuildTableTSRange) { + int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); + SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen); + if (!ppRightRows) { + continue; + } + pRightRows = *ppRightRows; + } else { + pRightRows = rightRowLocations; + } + size_t rightRowsSize = taosArrayGetSize(pRightRows); + for (j = rightRowIdx; j < rightRowsSize; ++j) { + if (*nRows >= limitRowNum) { + *pReachThreshold = true; + break; + } + + SRowLocation* rightRow = taosArrayGet(pRightRows, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + if (*pReachThreshold) { + break; + } + } + + if (*pReachThreshold) { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.leftRowIdx = i; + pJoinInfo->rowCtx.rightRowIdx = j; + } + return TSDB_CODE_SUCCESS; +} + +static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, + SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + if (rightRowLocations != NULL) { + taosArrayDestroy(rightRowLocations); + } + if (rightUseBuildTable) { + void* p = NULL; + int32_t iter = 0; + while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + tSimpleHashClear(pJoinInfo->rightBuildTable); + } + + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + + pJoinInfo->rowCtx.rowRemains = false; + pJoinInfo->rowCtx.leftRowLocations = NULL; + pJoinInfo->rowCtx.leftCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightUseBuildTable = false; + pJoinInfo->rowCtx.rightRowLocations = NULL; +} + static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, int32_t* nRows) { int32_t code = TSDB_CODE_SUCCESS; SJoinOperatorInfo* pJoinInfo = pOperator->info; SArray* leftRowLocations = NULL; - SArray* leftCreatedBlocks = NULL; SArray* rightRowLocations = NULL; + SArray* leftCreatedBlocks = NULL; SArray* rightCreatedBlocks = NULL; int32_t leftRowIdx = 0; int32_t rightRowIdx = 0; - int32_t i, j; - + SSHashObj* rightTableHash = NULL; + bool rightUseBuildTable = false; + if (pJoinInfo->rowCtx.rowRemains) { leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; + rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable; rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; @@ -330,78 +567,40 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { + mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); + rightUseBuildTable = true; + taosArrayDestroy(rightRowLocations); + rightRowLocations = NULL; + } } size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - size_t rightNumJoin = taosArrayGetSize(rightRowLocations); - uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx; - uint32_t limitRowNum = maxRowNum; - if (maxRowNum > pOperator->resultInfo.threshold) { - limitRowNum = pOperator->resultInfo.threshold; - if (!pJoinInfo->rowCtx.rowRemains) { + code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold); + if (code != TSDB_CODE_SUCCESS) { + qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo), + leftNumJoin); + } + + bool reachThreshold = false; + + if (code == TSDB_CODE_SUCCESS) { + mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, + rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold); + } + + if (!reachThreshold) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, + rightUseBuildTable, rightRowLocations); + + } else { pJoinInfo->rowCtx.rowRemains = true; pJoinInfo->rowCtx.ts = timestamp; pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; - pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - } - } - - code = blockDataEnsureCapacity(pRes, limitRowNum); - if (code != TSDB_CODE_SUCCESS) { - qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), - leftNumJoin, rightNumJoin); - } - - - if (code == TSDB_CODE_SUCCESS) { - bool done = false; - for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { - for (j = rightRowIdx; j < rightNumJoin; ++j) { - if (*nRows >= limitRowNum) { - done = true; - break; - } - - SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); - mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, - rightRow->pos); - ++*nRows; - } - if (done) { - break; - } - } - - if (maxRowNum > pOperator->resultInfo.threshold) { - pJoinInfo->rowCtx.leftRowIdx = i; - pJoinInfo->rowCtx.rightRowIdx = j; - } - } - - if (maxRowNum <= pOperator->resultInfo.threshold) { - for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(rightCreatedBlocks); - taosArrayDestroy(rightRowLocations); - for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(leftCreatedBlocks); - taosArrayDestroy(leftRowLocations); - - if (pJoinInfo->rowCtx.rowRemains) { - pJoinInfo->rowCtx.rowRemains = false; - pJoinInfo->rowCtx.leftRowLocations = NULL; - pJoinInfo->rowCtx.rightRowLocations = NULL; - pJoinInfo->rowCtx.leftCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - } + pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable; + pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index d9a4c5178f..91db527a02 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -401,6 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(joinType); CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); + CLONE_NODE_FIELD(pTagEqualConditions); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0aeb83ce08..b4e2d95e26 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1416,6 +1416,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; +static const char* jkJoinLogicPlanTagEqualConditions = "TagEqualConditions"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1430,7 +1431,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions); + } return code; } @@ -1447,7 +1450,9 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); } - + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions); + } return code; } @@ -1878,6 +1883,7 @@ static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; +static const char* jkJoinPhysiPlanTagEqualConditions = "TagEqualConditions"; static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -1898,7 +1904,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions); + } return code; } @@ -1921,7 +1929,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } - + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 6c6b6c0e81..45cebb4559 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2317,7 +2317,8 @@ enum { PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_TARGETS, - PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER + PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, + PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS }; static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2339,7 +2340,9 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); } - + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions); + } return code; } @@ -2368,6 +2371,9 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER: code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); break; + case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 99455a15b5..67747497db 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1072,6 +1072,7 @@ void nodesDestroyNode(SNode* pNode) { destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pOnConditions); + nodesDestroyNode(pLogicNode->pTagEqualConditions); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { @@ -1204,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyList(pPhyNode->pTargets); + nodesDestroyNode(pPhyNode->pTagEqualCondtions); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 66d85a9c89..5be67389c8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -740,6 +740,88 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin return code; } +static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return false; + } + SColumnNode* pCol = (SColumnNode*)pNode; + if (COLUMN_TYPE_TAG != pCol->colType) { + return false; + } + return pushDownCondOptBelongThisTable(pNode, pTableCols); +} + +static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { + if (QUERY_NODE_OPERATOR != nodeType(pCond)) { + return false; + } + SOperatorNode* pOper = (SOperatorNode*)pCond; + if (OP_TYPE_EQUAL != pOper->opType) { + return false; + } + if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) { + return false; + } + SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); + SColumnNode* pRight = (SColumnNode*)(pOper->pRight); + //TODO: add cast to operator and remove this restriction of optimization + if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) { + return false; + } + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { + return pushDownCondOptIsTag(pOper->pRight, pRightCols); + } else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) { + return pushDownCondOptIsTag(pOper->pRight, pLeftCols); + } + return false; +} + +static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pTagEqualConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) { + code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond)); + } + } + + SNode* pTempTagEqCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagEqCond, &pTagEqualConds); + } + + if (TSDB_CODE_SUCCESS == code) { + pJoin->pTagEqualConditions = pTempTagEqCond; + return TSDB_CODE_SUCCESS; + } else { + nodesDestroyList(pTagEqualConds); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pOnConditions) { + pJoin->pTagEqualConditions = NULL; + return TSDB_CODE_SUCCESS; + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { + return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin); + } + + if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) { + pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -774,6 +856,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 651be08aec..e07c6ebcfe 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -705,6 +705,9 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); } + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions); + } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } diff --git a/tests/script/tsim/parser/join_manyblocks.sim b/tests/script/tsim/parser/join_manyblocks.sim index a40a75f50c..7fd0df21b3 100644 --- a/tests/script/tsim/parser/join_manyblocks.sim +++ b/tests/script/tsim/parser/join_manyblocks.sim @@ -6,8 +6,8 @@ sql connect $dbPrefix = join_m_db $tbPrefix = join_tb $mtPrefix = join_mt -$tbNum = 3 -$rowNum = 2000 +$tbNum = 20 +$rowNum = 200 $totalNum = $tbNum * $rowNum print =============== join_manyBlocks.sim @@ -78,8 +78,8 @@ print ==============> td-3313 sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; print $row -if $row != 6000 then - print expect 6000, actual: $row +if $row != 4000 then + print expect 4000, actual: $row return -1 endi