From 629ab2b85fb59eeebc3e4634de1db0c9b0b91586 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 15 May 2023 16:10:11 +0800 Subject: [PATCH 01/13] fix: extract tag equal condition --- include/libs/nodes/plannodes.h | 2 + source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 18 ++++-- source/libs/nodes/src/nodesMsgFuncs.c | 10 ++- source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/planner/src/planOptimizer.c | 73 ++++++++++++++++++++++ source/libs/planner/src/planPhysiCreater.c | 3 + 7 files changed, 103 insertions(+), 6 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ad4b59714c..0e235191b1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -112,6 +112,7 @@ typedef struct SJoinLogicNode { SNode* pOnConditions; bool isSingleTableJoin; EOrder inputTsOrder; + SNode* pTagEqualConditions; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -405,6 +406,7 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pOnConditions; SNodeList* pTargets; EOrder inputTsOrder; + SNode* pTagEqualCondtions; } SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index d9a4c5178f..91db527a02 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -401,6 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(joinType); CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); + CLONE_NODE_FIELD(pTagEqualConditions); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0aeb83ce08..b4e2d95e26 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1416,6 +1416,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; +static const char* jkJoinLogicPlanTagEqualConditions = "TagEqualConditions"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1430,7 +1431,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions); + } return code; } @@ -1447,7 +1450,9 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); } - + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions); + } return code; } @@ -1878,6 +1883,7 @@ static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; +static const char* jkJoinPhysiPlanTagEqualConditions = "TagEqualConditions"; static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -1898,7 +1904,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions); + } return code; } @@ -1921,7 +1929,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } - + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 6c6b6c0e81..45cebb4559 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2317,7 +2317,8 @@ enum { PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_TARGETS, - PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER + PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, + PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS }; static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2339,7 +2340,9 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); } - + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions); + } return code; } @@ -2368,6 +2371,9 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER: code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); break; + case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f71eef7969..d97b17e30c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1072,6 +1072,7 @@ void nodesDestroyNode(SNode* pNode) { destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pOnConditions); + nodesDestroyNode(pLogicNode->pTagEqualConditions); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { @@ -1204,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyList(pPhyNode->pTargets); + nodesDestroyNode(pPhyNode->pTagEqualCondtions); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 66d85a9c89..f5ef02e207 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -740,6 +740,75 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin return code; } +static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return false; + } + SColumnNode* pCol = (SColumnNode*)pNode; + if (COLUMN_TYPE_TAG != pCol->colType) { + return false; + } + return pushDownCondOptBelongThisTable(pNode, pTableCols); +} + +static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { + if (QUERY_NODE_OPERATOR != nodeType(pCond)) { + return false; + } + SOperatorNode* pOper = (SOperatorNode*)pCond; + if (OP_TYPE_EQUAL != pOper->opType) { + return false; + } + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { + return pushDownCondOptIsTag(pOper->pRight, pRightCols); + } else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) { + return pushDownCondOptIsTag(pOper->pRight, pLeftCols); + } + return false; +} + +static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pTagEqualConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) { + code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond)); + } + } + + SNode* pTempTagEqCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagEqCond, &pTagEqualConds); + } + + if (TSDB_CODE_SUCCESS == code) { + pJoin->pTagEqualConditions = pTempTagEqCond; + return TSDB_CODE_SUCCESS; + } else { + nodesDestroyList(pTagEqualConds); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { + return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin); + } + + if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) { + pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -774,6 +843,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e2c2e4c655..9c6c227480 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -705,6 +705,9 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); } + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions); + } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } From c6555c3b3778808e910a2044714913cd62895d61 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 16 May 2023 16:25:55 +0800 Subject: [PATCH 02/13] fix: join operator finished --- source/libs/executor/src/joinoperator.c | 369 +++++++++++++++++++----- source/libs/planner/src/planOptimizer.c | 8 + 2 files changed, 307 insertions(+), 70 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 754b5f4737..a5591b22b3 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -30,11 +30,11 @@ typedef struct SJoinRowCtx { bool rowRemains; int64_t ts; SArray* leftRowLocations; - SArray* rightRowLocations; SArray* leftCreatedBlocks; SArray* rightCreatedBlocks; int32_t leftRowIdx; int32_t rightRowIdx; + SSHashObj* buildTableTSRange; } SJoinRowCtx; typedef struct SJoinOperatorInfo { @@ -50,6 +50,17 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode* pCondAfterMerge; + SNode* pTagEqualConditions; + + SArray* leftTagCols; + SArray* leftTagKeys; + char* leftTagKeyBuf; + int32_t leftTagKeyLen; + + SArray* rightTagCols; + SArray* rightTagKeys; + char* rightTagKeyBuf; + int32_t rightTagKeyLen; SJoinRowCtx rowCtx; } SJoinOperatorInfo; @@ -92,6 +103,147 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown setJoinColumnInfo(&pInfo->rightCol, rightTsCol); } +static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, + SColumn* pLeft, SColumn* pRight) { + SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; + SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; + if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + } else { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + } +} + +static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pTagEqualNode, + SArray* leftTagEqCols, SArray* rightTagEqCols) { + SColumn left = {0}; + SColumn right = {0}; + if (nodeType(pTagEqualNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pTagEqualNode)->condType == LOGIC_COND_TYPE_AND) { + SNode* pNode = NULL; + FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) { + SOperatorNode* pOperNode = (SOperatorNode*)pNode; + extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } + return; + } + + if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) { + SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode; + extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } +} + +static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if ((*pGroupColVals) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); + (*keyLen) += pCol->bytes; // actual data + null_flag + + SGroupKeys key = {0}; + key.bytes = pCol->bytes; + key.type = pCol->type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pCol->bytes); + if (key.pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush((*pGroupColVals), &key); + } + + int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + (*keyLen) += nullFlagSize; + + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); + if ((*keyBuf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDataBlock* pBlock, int32_t rowIndex) { + SColumnDataAgg* pColAgg = NULL; + + size_t numOfGroupCols = taosArrayGetSize(pCols); + + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + // valid range check. todo: return error code. + if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { + continue; + } + + if (pBlock->pBlockAgg != NULL) { + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + pkey->isNull = true; + } else { + pkey->isNull = false; + char* val = colDataGetData(pColInfoData, rowIndex); + if (pkey->type == TSDB_DATA_TYPE_JSON) { + if (tTagIsJson(val)) { + terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; + return; + } + int32_t dataLen = getJsonValueLen(val); + memcpy(pkey->pData, val, dataLen); + } else if (IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, varDataTLen(val)); + ASSERT(varDataTLen(val) <= pkey->bytes); + } else { + memcpy(pkey->pData, val, pkey->bytes); + } + } + } +} + +static int32_t combineGroupKeysIntoBuf(void* pKey, const SArray* pGroupKeys) { + size_t numOfGroupCols = taosArrayGetSize(pGroupKeys); + + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); + if (pkey->isNull) { + isNull[i] = 1; + continue; + } + + isNull[i] = 0; + if (pkey->type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(pkey->pData); + memcpy(pStart, (pkey->pData), dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pkey->type)) { + varDataCopy(pStart, pkey->pData); + pStart += varDataTLen(pkey->pData); + ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); + } else { + memcpy(pStart, pkey->pData, pkey->bytes); + pStart += pkey->bytes; + } + } + + return (int32_t)(pStart - (char*)pKey); +} + SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); @@ -153,6 +305,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->inputOrder = TSDB_ORDER_DESC; } + pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; + pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); + extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); + initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { @@ -179,8 +338,22 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } +static void freeGroupKeyData(void* param) { + SGroupKeys* pKey = (SGroupKeys*)param; + taosMemoryFree(pKey->pData); +} + void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + + taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->rightTagCols); + + taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->leftTagCols); + nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); @@ -300,22 +473,121 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } +static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) { + int32_t buildTableCap = MIN(taosArrayGetSize(rightRowLocations), 4096); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); + for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { + SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); + fillGroupKeyValsFromTagCols(pInfo->rightTagCols, pInfo->rightTagKeys, rightRow->pDataBlock, rightRow->pos); + int32_t keyLen = combineGroupKeysIntoBuf(pInfo->rightTagKeyBuf, pInfo->rightTagKeys); + SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen); + if (!ppRows) { + SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); + taosArrayPush(rows, rightRow); + tSimpleHashPut(buildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); + } else { + taosArrayPush(*ppRows, rightRow); + } + } + *ppHashObj = buildTable; + return TSDB_CODE_SUCCESS; +} + +static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, + const SArray* leftRowLocations, int32_t leftRowIdx, + int32_t rightRowIdx, SSHashObj* rightTableHash, bool* pReachThreshold) { + *pReachThreshold = false; + uint32_t limitRowNum = pOperator->resultInfo.threshold; + SJoinOperatorInfo* pJoinInfo = pOperator->info; + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + + int32_t i,j; + + for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); + int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); + SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); + if (!ppRightRows) { + continue; + } + SArray* pRightRows = *ppRightRows; + size_t rightRowsSize = taosArrayGetSize(pRightRows); + for (j = rightRowIdx; j < rightRowsSize; ++j) { + if (*nRows >= limitRowNum) { + *pReachThreshold = true; + break; + } + + SRowLocation* rightRow = taosArrayGet(pRightRows, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + if (*pReachThreshold) { + break; + } + } + + if (*pReachThreshold) { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.leftRowIdx = i; + pJoinInfo->rowCtx.rightRowIdx = j; + } + return TSDB_CODE_SUCCESS; +} + +static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { + void* p = NULL; + int32_t iter = 0; + + while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + + tSimpleHashCleanup(pBuildTable); +} + +static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, + SArray* rightCreatedBlocks, SSHashObj* rightTableHash) { + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + mergeJoinDestoryBuildTable(rightTableHash); + + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + + pJoinInfo->rowCtx.rowRemains = false; + pJoinInfo->rowCtx.leftRowLocations = NULL; + pJoinInfo->rowCtx.leftCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightCreatedBlocks = NULL; + pJoinInfo->rowCtx.buildTableTSRange = NULL; +} + static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, int32_t* nRows) { int32_t code = TSDB_CODE_SUCCESS; SJoinOperatorInfo* pJoinInfo = pOperator->info; SArray* leftRowLocations = NULL; SArray* leftCreatedBlocks = NULL; - SArray* rightRowLocations = NULL; SArray* rightCreatedBlocks = NULL; int32_t leftRowIdx = 0; int32_t rightRowIdx = 0; - int32_t i, j; - + SSHashObj* rightTableHash = NULL; + if (pJoinInfo->rowCtx.rowRemains) { leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; - rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; + rightTableHash = pJoinInfo->rowCtx.buildTableTSRange; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; @@ -323,85 +595,42 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); + taosArrayDestroy(rightRowLocations); } size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - size_t rightNumJoin = taosArrayGetSize(rightRowLocations); - uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx; - uint32_t limitRowNum = maxRowNum; - if (maxRowNum > pOperator->resultInfo.threshold) { - limitRowNum = pOperator->resultInfo.threshold; - if (!pJoinInfo->rowCtx.rowRemains) { + code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold); + if (code != TSDB_CODE_SUCCESS) { + qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo), + leftNumJoin); + } + + bool reachThreshold = false; + + if (code == TSDB_CODE_SUCCESS) { + mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, + rightRowIdx, rightTableHash, &reachThreshold); + } + + if (!reachThreshold) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, + rightTableHash); + + } else { pJoinInfo->rowCtx.rowRemains = true; pJoinInfo->rowCtx.ts = timestamp; pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; - pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - } - } - - code = blockDataEnsureCapacity(pRes, limitRowNum); - if (code != TSDB_CODE_SUCCESS) { - qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), - leftNumJoin, rightNumJoin); - } - - - if (code == TSDB_CODE_SUCCESS) { - bool done = false; - for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { - for (j = rightRowIdx; j < rightNumJoin; ++j) { - if (*nRows >= limitRowNum) { - done = true; - break; - } - - SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); - mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, - rightRow->pos); - ++*nRows; - } - if (done) { - break; - } - } - - if (maxRowNum > pOperator->resultInfo.threshold) { - pJoinInfo->rowCtx.leftRowIdx = i; - pJoinInfo->rowCtx.rightRowIdx = j; - } - } - - if (maxRowNum <= pOperator->resultInfo.threshold) { - for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(rightCreatedBlocks); - taosArrayDestroy(rightRowLocations); - for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(leftCreatedBlocks); - taosArrayDestroy(leftRowLocations); - - if (pJoinInfo->rowCtx.rowRemains) { - pJoinInfo->rowCtx.rowRemains = false; - pJoinInfo->rowCtx.leftRowLocations = NULL; - pJoinInfo->rowCtx.rightRowLocations = NULL; - pJoinInfo->rowCtx.leftCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - } + pJoinInfo->rowCtx.buildTableTSRange = rightTableHash; } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f5ef02e207..8c111133fc 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -759,6 +759,14 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { if (OP_TYPE_EQUAL != pOper->opType) { return false; } + if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) { + return false; + } + SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); + SColumnNode* pRight = (SColumnNode*)(pOper->pRight); + if (pLeft->node.resType.type != pRight->node.resType.type) { + return false; + } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { From a51de5cc9384b22d09ec27f9607f46361e548975 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 16 May 2023 16:31:58 +0800 Subject: [PATCH 03/13] fix: add restriction --- source/libs/planner/src/planOptimizer.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8c111133fc..d76c25fc55 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -764,7 +764,8 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { } SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); SColumnNode* pRight = (SColumnNode*)(pOper->pRight); - if (pLeft->node.resType.type != pRight->node.resType.type) { + //TODO: add cast to operator and remove this restriction of optimization + if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) { return false; } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; From 568725a245d8e1a2f80dc056c85955bdd8a382cd Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 17 May 2023 08:33:47 +0800 Subject: [PATCH 04/13] fix: using right rows to build hash table when the tag equal cond is not null and same ts row row number is greater than 16 --- source/libs/executor/src/joinoperator.c | 74 ++++++++++++++++--------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index a5591b22b3..54ddab9e1e 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -34,7 +34,9 @@ typedef struct SJoinRowCtx { SArray* rightCreatedBlocks; int32_t leftRowIdx; int32_t rightRowIdx; + SSHashObj* buildTableTSRange; + SArray* rightRowLocations; } SJoinRowCtx; typedef struct SJoinOperatorInfo { @@ -306,12 +308,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; - pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); - pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); - extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); - initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); - initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); - + if (pInfo->pTagEqualConditions != NULL) { + pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); + extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); + initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { @@ -345,15 +348,15 @@ static void freeGroupKeyData(void* param) { void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + if (pJoinOperator->pTagEqualConditions != NULL) { + taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->rightTagCols); - taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); - taosArrayDestroy(pJoinOperator->rightTagCols); - - taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); - taosArrayDestroy(pJoinOperator->leftTagCols); - + taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->leftTagCols); + } nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); @@ -496,7 +499,7 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, const SArray* leftRowLocations, int32_t leftRowIdx, - int32_t rightRowIdx, SSHashObj* rightTableHash, bool* pReachThreshold) { + int32_t rightRowIdx, SSHashObj* rightTableHash, SArray* rightRowLocations, bool* pReachThreshold) { *pReachThreshold = false; uint32_t limitRowNum = pOperator->resultInfo.threshold; SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -506,13 +509,18 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); - int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); - SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); - if (!ppRightRows) { - continue; + SArray* pRightRows = NULL; + if (rightTableHash != NULL) { + fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); + int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); + SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); + if (!ppRightRows) { + continue; + } + pRightRows = *ppRightRows; + } else { + pRightRows = rightRowLocations; } - SArray* pRightRows = *ppRightRows; size_t rightRowsSize = taosArrayGetSize(pRightRows); for (j = rightRowIdx; j < rightRowsSize; ++j) { if (*nRows >= limitRowNum) { @@ -551,7 +559,7 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { } static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, - SArray* rightCreatedBlocks, SSHashObj* rightTableHash) { + SArray* rightCreatedBlocks, SSHashObj* rightTableHash, SArray* rightRowLocations) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); blockDataDestroy(pBlock); @@ -561,7 +569,12 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); blockDataDestroy(pBlock); } - mergeJoinDestoryBuildTable(rightTableHash); + if (rightRowLocations != NULL) { + taosArrayDestroy(rightRowLocations); + } + if (rightTableHash != NULL) { + mergeJoinDestoryBuildTable(rightTableHash); + } taosArrayDestroy(leftCreatedBlocks); taosArrayDestroy(leftRowLocations); @@ -571,6 +584,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef pJoinInfo->rowCtx.leftCreatedBlocks = NULL; pJoinInfo->rowCtx.rightCreatedBlocks = NULL; pJoinInfo->rowCtx.buildTableTSRange = NULL; + pJoinInfo->rowCtx.rightRowLocations = NULL; } static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, @@ -578,6 +592,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t int32_t code = TSDB_CODE_SUCCESS; SJoinOperatorInfo* pJoinInfo = pOperator->info; SArray* leftRowLocations = NULL; + SArray* rightRowLocations = NULL; SArray* leftCreatedBlocks = NULL; SArray* rightCreatedBlocks = NULL; int32_t leftRowIdx = 0; @@ -588,6 +603,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; rightTableHash = pJoinInfo->rowCtx.buildTableTSRange; + rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; @@ -602,8 +618,11 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); - mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); - taosArrayDestroy(rightRowLocations); + if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { + mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); + taosArrayDestroy(rightRowLocations); + rightRowLocations = NULL; + } } size_t leftNumJoin = taosArrayGetSize(leftRowLocations); @@ -617,12 +636,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t if (code == TSDB_CODE_SUCCESS) { mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, rightTableHash, &reachThreshold); + rightRowIdx, rightTableHash, rightRowLocations, &reachThreshold); } if (!reachThreshold) { mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - rightTableHash); + rightTableHash, rightRowLocations); } else { pJoinInfo->rowCtx.rowRemains = true; @@ -631,6 +650,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; pJoinInfo->rowCtx.buildTableTSRange = rightTableHash; + pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; } return TSDB_CODE_SUCCESS; } From 03e77288e8ca2b3ea9a7385c21f181593fa899a6 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 17 May 2023 10:07:19 +0800 Subject: [PATCH 05/13] fix: add on conditions null check and rightRowLocations no longer hides the outer scope rightRowLocations --- source/libs/executor/src/joinoperator.c | 4 ++-- source/libs/planner/src/planOptimizer.c | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 54ddab9e1e..07b8a4db11 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -477,7 +477,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator } static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) { - int32_t buildTableCap = MIN(taosArrayGetSize(rightRowLocations), 4096); + int32_t buildTableCap = TMIN(taosArrayGetSize(rightRowLocations), 256); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { @@ -611,7 +611,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d76c25fc55..5be67389c8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -806,6 +806,10 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin } static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pOnConditions) { + pJoin->pTagEqualConditions = NULL; + return TSDB_CODE_SUCCESS; + } if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin); From ee16b46112e6a83eb1434b9ab90cc104c8e89bcb Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 18 May 2023 10:46:19 +0800 Subject: [PATCH 06/13] fix: remove group keys from join operator --- source/libs/executor/src/joinoperator.c | 94 +++++-------------------- 1 file changed, 18 insertions(+), 76 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 07b8a4db11..68f6951c26 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -55,12 +55,10 @@ typedef struct SJoinOperatorInfo { SNode* pTagEqualConditions; SArray* leftTagCols; - SArray* leftTagKeys; char* leftTagKeyBuf; int32_t leftTagKeyLen; SArray* rightTagCols; - SArray* rightTagKeys; char* rightTagKeyBuf; int32_t rightTagKeyLen; @@ -141,27 +139,11 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD } } -static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { - *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if ((*pGroupColVals) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - +static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); (*keyLen) += pCol->bytes; // actual data + null_flag - - SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); - if (key.pData == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush((*pGroupColVals), &key); } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; @@ -175,10 +157,11 @@ static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, cha return TSDB_CODE_SUCCESS; } -static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDataBlock* pBlock, int32_t rowIndex) { +static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) { SColumnDataAgg* pColAgg = NULL; - size_t numOfGroupCols = taosArrayGetSize(pCols); + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); @@ -193,56 +176,24 @@ static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDat pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } - SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { - pkey->isNull = true; + isNull[i] = 1; } else { - pkey->isNull = false; + isNull[i] = 0; char* val = colDataGetData(pColInfoData, rowIndex); - if (pkey->type == TSDB_DATA_TYPE_JSON) { - if (tTagIsJson(val)) { - terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; - return; - } + if (pCol->type == TSDB_DATA_TYPE_JSON) { int32_t dataLen = getJsonValueLen(val); - memcpy(pkey->pData, val, dataLen); - } else if (IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, varDataTLen(val)); - ASSERT(varDataTLen(val) <= pkey->bytes); + memcpy(pStart, val, dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + varDataCopy(pStart, val); + pStart += varDataTLen(val); } else { - memcpy(pkey->pData, val, pkey->bytes); + memcpy(pStart, val, pCol->bytes); + pStart += pCol->bytes; } } } -} - -static int32_t combineGroupKeysIntoBuf(void* pKey, const SArray* pGroupKeys) { - size_t numOfGroupCols = taosArrayGetSize(pGroupKeys); - - char* isNull = (char*)pKey; - char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); - if (pkey->isNull) { - isNull[i] = 1; - continue; - } - - isNull[i] = 0; - if (pkey->type == TSDB_DATA_TYPE_JSON) { - int32_t dataLen = getJsonValueLen(pkey->pData); - memcpy(pStart, (pkey->pData), dataLen); - pStart += dataLen; - } else if (IS_VAR_DATA_TYPE(pkey->type)) { - varDataCopy(pStart, pkey->pData); - pStart += varDataTLen(pkey->pData); - ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); - } else { - memcpy(pStart, pkey->pData, pkey->bytes); - pStart += pkey->bytes; - } - } - return (int32_t)(pStart - (char*)pKey); } @@ -312,8 +263,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); - initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); - initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -341,20 +292,13 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } -static void freeGroupKeyData(void* param) { - SGroupKeys* pKey = (SGroupKeys*)param; - taosMemoryFree(pKey->pData); -} - void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; if (pJoinOperator->pTagEqualConditions != NULL) { taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); taosArrayDestroy(pJoinOperator->rightTagCols); taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); taosArrayDestroy(pJoinOperator->leftTagCols); } nodesDestroyNode(pJoinOperator->pCondAfterMerge); @@ -482,8 +426,7 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); - fillGroupKeyValsFromTagCols(pInfo->rightTagCols, pInfo->rightTagKeys, rightRow->pDataBlock, rightRow->pos); - int32_t keyLen = combineGroupKeysIntoBuf(pInfo->rightTagKeyBuf, pInfo->rightTagKeys); + int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen); if (!ppRows) { SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); @@ -511,8 +454,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SArray* pRightRows = NULL; if (rightTableHash != NULL) { - fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); - int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); + int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); if (!ppRightRows) { continue; From 14d4a81e750303da755b121da8c2c36dfac93dff Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 18 May 2023 11:15:04 +0800 Subject: [PATCH 07/13] fix: build hash table is created during creating join operator --- source/libs/executor/src/joinoperator.c | 62 +++++++++++++------------ 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 68f6951c26..c90c18e6f5 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -35,7 +35,7 @@ typedef struct SJoinRowCtx { int32_t leftRowIdx; int32_t rightRowIdx; - SSHashObj* buildTableTSRange; + bool rightUseBuildTable; SArray* rightRowLocations; } SJoinRowCtx; @@ -62,6 +62,7 @@ typedef struct SJoinOperatorInfo { char* rightTagKeyBuf; int32_t rightTagKeyLen; + SSHashObj* rightBuildTable; SJoinRowCtx rowCtx; } SJoinOperatorInfo; @@ -265,6 +266,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -292,9 +295,22 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } +static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { + void* p = NULL; + int32_t iter = 0; + + while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + + tSimpleHashCleanup(pBuildTable); +} + void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; if (pJoinOperator->pTagEqualConditions != NULL) { + mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); taosArrayDestroy(pJoinOperator->rightTagCols); @@ -420,10 +436,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } -static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) { - int32_t buildTableCap = TMIN(taosArrayGetSize(rightRowLocations), 256); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); +static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj* buildTable) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); @@ -436,13 +449,12 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right taosArrayPush(*ppRows, rightRow); } } - *ppHashObj = buildTable; return TSDB_CODE_SUCCESS; } static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, const SArray* leftRowLocations, int32_t leftRowIdx, - int32_t rightRowIdx, SSHashObj* rightTableHash, SArray* rightRowLocations, bool* pReachThreshold) { + int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { *pReachThreshold = false; uint32_t limitRowNum = pOperator->resultInfo.threshold; SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -453,9 +465,9 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SArray* pRightRows = NULL; - if (rightTableHash != NULL) { + if (useBuildTableTSRange) { int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); - SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); + SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen); if (!ppRightRows) { continue; } @@ -488,20 +500,8 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* return TSDB_CODE_SUCCESS; } -static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { - void* p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { - SArray* rows = (*(SArray**)p); - taosArrayDestroy(rows); - } - - tSimpleHashCleanup(pBuildTable); -} - static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, - SArray* rightCreatedBlocks, SSHashObj* rightTableHash, SArray* rightRowLocations) { + SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); blockDataDestroy(pBlock); @@ -514,8 +514,8 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef if (rightRowLocations != NULL) { taosArrayDestroy(rightRowLocations); } - if (rightTableHash != NULL) { - mergeJoinDestoryBuildTable(rightTableHash); + if (rightUseBuildTable) { + tSimpleHashClear(pJoinInfo->rightBuildTable); } taosArrayDestroy(leftCreatedBlocks); @@ -525,7 +525,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef pJoinInfo->rowCtx.leftRowLocations = NULL; pJoinInfo->rowCtx.leftCreatedBlocks = NULL; pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - pJoinInfo->rowCtx.buildTableTSRange = NULL; + pJoinInfo->rowCtx.rightUseBuildTable = false; pJoinInfo->rowCtx.rightRowLocations = NULL; } @@ -540,11 +540,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t int32_t leftRowIdx = 0; int32_t rightRowIdx = 0; SSHashObj* rightTableHash = NULL; + bool rightUseBuildTable = false; if (pJoinInfo->rowCtx.rowRemains) { leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; - rightTableHash = pJoinInfo->rowCtx.buildTableTSRange; + rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable; rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; @@ -561,7 +562,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { - mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); + mergeJoinFillBuildTable(pJoinInfo, rightRowLocations, pJoinInfo->rightBuildTable); + rightUseBuildTable = true; taosArrayDestroy(rightRowLocations); rightRowLocations = NULL; } @@ -578,12 +580,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t if (code == TSDB_CODE_SUCCESS) { mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, rightTableHash, rightRowLocations, &reachThreshold); + rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold); } if (!reachThreshold) { mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - rightTableHash, rightRowLocations); + rightUseBuildTable, rightRowLocations); } else { pJoinInfo->rowCtx.rowRemains = true; @@ -591,7 +593,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - pJoinInfo->rowCtx.buildTableTSRange = rightTableHash; + pJoinInfo->rowCtx.rightUseBuildTable = true; pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; } return TSDB_CODE_SUCCESS; From d7f8757844da2a8db1d9a756d2d9884a3b57cb91 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 18 May 2023 11:23:50 +0800 Subject: [PATCH 08/13] fix: memory leak --- source/libs/executor/src/joinoperator.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index c90c18e6f5..0add034673 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -515,6 +515,12 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef taosArrayDestroy(rightRowLocations); } if (rightUseBuildTable) { + void* p = NULL; + int32_t iter = 0; + while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } tSimpleHashClear(pJoinInfo->rightBuildTable); } From 7102c107ebd4884541bfa47d428b73020b5a7d43 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 May 2023 13:19:52 +0800 Subject: [PATCH 09/13] fix: remove parameter of filling build table --- source/libs/executor/src/joinoperator.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 0add034673..23bd7c5700 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -436,15 +436,16 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } -static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj* buildTable) { +static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { + tSimpleHashClear(pInfo->rightBuildTable); SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); - SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen); + SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen); if (!ppRows) { SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); taosArrayPush(rows, rightRow); - tSimpleHashPut(buildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); + tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); } else { taosArrayPush(*ppRows, rightRow); } @@ -568,7 +569,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { - mergeJoinFillBuildTable(pJoinInfo, rightRowLocations, pJoinInfo->rightBuildTable); + mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); rightUseBuildTable = true; taosArrayDestroy(rightRowLocations); rightRowLocations = NULL; From 51cb79fc1d93eec01fba74a513c7e7993c3c8e98 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 May 2023 13:28:46 +0800 Subject: [PATCH 10/13] fix: set rightUseBuildTable correctly --- source/libs/executor/src/joinoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 23bd7c5700..700a59ccd8 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -600,7 +600,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - pJoinInfo->rowCtx.rightUseBuildTable = true; + pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable; pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; } return TSDB_CODE_SUCCESS; From 4db88e8ffe49e5dbbb56caf25b51d157dc13042e Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 May 2023 15:45:58 +0800 Subject: [PATCH 11/13] fix: fix ci comments --- source/libs/executor/src/joinoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 700a59ccd8..9dee58367c 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -438,7 +438,6 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { - tSimpleHashClear(pInfo->rightBuildTable); SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen); From 99b424b181d527a3ce368b199dcf8be0a142d70d Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 May 2023 16:26:20 +0800 Subject: [PATCH 12/13] fix: add test case --- tests/script/output.txt | 6 ++ tests/script/sh/l2norm2.c | 71 ++++++++++++++++++++ tests/script/tsim/parser/join_manyblocks.sim | 8 +-- 3 files changed, 81 insertions(+), 4 deletions(-) create mode 100644 tests/script/output.txt create mode 100644 tests/script/sh/l2norm2.c diff --git a/tests/script/output.txt b/tests/script/output.txt new file mode 100644 index 0000000000..0c7c386bbc --- /dev/null +++ b/tests/script/output.txt @@ -0,0 +1,6 @@ +[03/30 08:08:52.140115] ERROR: failed to connect native (null):6030, code: 0x8000000b, reason: Unable to establish connection +[03/30 08:08:52.140136] ERROR: insert test process failed +[03/30 08:09:02.860424] ERROR: failed to connect native (null):6030, code: 0x8000000b, reason: Unable to establish connection +[03/30 08:09:02.860445] ERROR: insert test process failed +[03/30 08:10:57.082603] ERROR: failed to connect native (null):6030, code: 0x8000000b, reason: Unable to establish connection +[03/30 08:10:57.082626] ERROR: insert test process failed diff --git a/tests/script/sh/l2norm2.c b/tests/script/sh/l2norm2.c new file mode 100644 index 0000000000..0739f3a23b --- /dev/null +++ b/tests/script/sh/l2norm2.c @@ -0,0 +1,71 @@ +#include +#include +#include +#include + +#include "taosudf.h" + +DLL_EXPORT int32_t l2norm2_init() { + return 0; +} + +DLL_EXPORT int32_t l2norm2_destroy() { + return 0; +} + +DLL_EXPORT int32_t l2norm2_start(SUdfInterBuf *buf) { + *(int64_t*)(buf->buf) = 0; + buf->bufLen = sizeof(double); + buf->numOfResult = 1; + return 0; +} + +DLL_EXPORT int32_t l2norm2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { + double sumSquares = *(double*)interBuf->buf; + int8_t numNotNull = 0; + for (int32_t i = 0; i < block->numOfCols; ++i) { + SUdfColumn* col = block->udfCols[i]; + if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || + col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { + return TSDB_CODE_UDF_INVALID_INPUT; + } + } + for (int32_t i = 0; i < block->numOfCols; ++i) { + for (int32_t j = 0; j < block->numOfRows; ++j) { + SUdfColumn* col = block->udfCols[i]; + if (udfColDataIsNull(col, j)) { + continue; + } + switch (col->colMeta.type) { + case TSDB_DATA_TYPE_INT: { + char* cell = udfColDataGetData(col, j); + int32_t num = *(int32_t*)cell; + sumSquares += (double)num * num; + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + char* cell = udfColDataGetData(col, j); + double num = *(double*)cell; + sumSquares += num * num; + break; + } + default: + break; + } + ++numNotNull; + } + } + + *(double*)(newInterBuf->buf) = sumSquares; + newInterBuf->bufLen = sizeof(double); + newInterBuf->numOfResult = 1; + return 0; +} + +DLL_EXPORT int32_t l2norm2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { + double sumSquares = *(double*)(buf->buf); + *(double*)(resultData->buf) = sumSquares; + resultData->bufLen = sizeof(double); + resultData->numOfResult = 1; + return 0; +} diff --git a/tests/script/tsim/parser/join_manyblocks.sim b/tests/script/tsim/parser/join_manyblocks.sim index a40a75f50c..7fd0df21b3 100644 --- a/tests/script/tsim/parser/join_manyblocks.sim +++ b/tests/script/tsim/parser/join_manyblocks.sim @@ -6,8 +6,8 @@ sql connect $dbPrefix = join_m_db $tbPrefix = join_tb $mtPrefix = join_mt -$tbNum = 3 -$rowNum = 2000 +$tbNum = 20 +$rowNum = 200 $totalNum = $tbNum * $rowNum print =============== join_manyBlocks.sim @@ -78,8 +78,8 @@ print ==============> td-3313 sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; print $row -if $row != 6000 then - print expect 6000, actual: $row +if $row != 4000 then + print expect 4000, actual: $row return -1 endi From 3e02bb8faa96dfb7c23fe34357d47231e1033dd9 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 May 2023 16:26:52 +0800 Subject: [PATCH 13/13] fix: remove files --- tests/script/output.txt | 6 ---- tests/script/sh/l2norm2.c | 71 --------------------------------------- 2 files changed, 77 deletions(-) delete mode 100644 tests/script/output.txt delete mode 100644 tests/script/sh/l2norm2.c diff --git a/tests/script/output.txt b/tests/script/output.txt deleted file mode 100644 index 0c7c386bbc..0000000000 --- a/tests/script/output.txt +++ /dev/null @@ -1,6 +0,0 @@ -[03/30 08:08:52.140115] ERROR: failed to connect native (null):6030, code: 0x8000000b, reason: Unable to establish connection -[03/30 08:08:52.140136] ERROR: insert test process failed -[03/30 08:09:02.860424] ERROR: failed to connect native (null):6030, code: 0x8000000b, reason: Unable to establish connection -[03/30 08:09:02.860445] ERROR: insert test process failed -[03/30 08:10:57.082603] ERROR: failed to connect native (null):6030, code: 0x8000000b, reason: Unable to establish connection -[03/30 08:10:57.082626] ERROR: insert test process failed diff --git a/tests/script/sh/l2norm2.c b/tests/script/sh/l2norm2.c deleted file mode 100644 index 0739f3a23b..0000000000 --- a/tests/script/sh/l2norm2.c +++ /dev/null @@ -1,71 +0,0 @@ -#include -#include -#include -#include - -#include "taosudf.h" - -DLL_EXPORT int32_t l2norm2_init() { - return 0; -} - -DLL_EXPORT int32_t l2norm2_destroy() { - return 0; -} - -DLL_EXPORT int32_t l2norm2_start(SUdfInterBuf *buf) { - *(int64_t*)(buf->buf) = 0; - buf->bufLen = sizeof(double); - buf->numOfResult = 1; - return 0; -} - -DLL_EXPORT int32_t l2norm2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { - double sumSquares = *(double*)interBuf->buf; - int8_t numNotNull = 0; - for (int32_t i = 0; i < block->numOfCols; ++i) { - SUdfColumn* col = block->udfCols[i]; - if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || - col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { - return TSDB_CODE_UDF_INVALID_INPUT; - } - } - for (int32_t i = 0; i < block->numOfCols; ++i) { - for (int32_t j = 0; j < block->numOfRows; ++j) { - SUdfColumn* col = block->udfCols[i]; - if (udfColDataIsNull(col, j)) { - continue; - } - switch (col->colMeta.type) { - case TSDB_DATA_TYPE_INT: { - char* cell = udfColDataGetData(col, j); - int32_t num = *(int32_t*)cell; - sumSquares += (double)num * num; - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - char* cell = udfColDataGetData(col, j); - double num = *(double*)cell; - sumSquares += num * num; - break; - } - default: - break; - } - ++numNotNull; - } - } - - *(double*)(newInterBuf->buf) = sumSquares; - newInterBuf->bufLen = sizeof(double); - newInterBuf->numOfResult = 1; - return 0; -} - -DLL_EXPORT int32_t l2norm2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { - double sumSquares = *(double*)(buf->buf); - *(double*)(resultData->buf) = sumSquares; - resultData->bufLen = sizeof(double); - resultData->numOfResult = 1; - return 0; -}