diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 2319643b09..f3ec0d85e0 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -246,6 +246,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, + QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_MERGE, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 02459ed951..61fe0d0b0f 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -410,6 +410,17 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pColEqualOnConditions; } SSortMergeJoinPhysiNode; +typedef struct SHashJoinPhysiNode { + SPhysiNode node; + EJoinType joinType; + SNodeList* pOnLeft; + SNodeList* pOnRight; + SNode* pOnConditions; + SNode* pFilterConditions; + SNodeList* pTargets; + SQueryStat inputStat[2]; +} SHashJoinPhysiNode; + typedef struct SAggPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index de3eba599d..e0325bebd6 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -212,6 +212,11 @@ typedef struct SQueryNodeStat { int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; +typedef struct SQueryStat { + int64_t inputRowNum; + int32_t inputRowSize; +} SQueryStat; + int32_t initTaskQueue(); int32_t cleanupTaskQueue(); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 30911c6061..a0a15d32c1 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -162,7 +162,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); -SArray* extractPartitionColInfo(SNodeList* pNodeList); +SArray* makeColumnArrayFromList(SNodeList* pNodeList); int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type, SColMatchInfo* pMatchInfo); diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h new file mode 100755 index 0000000000..b1c1052c88 --- /dev/null +++ b/source/libs/executor/inc/hashjoin.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_HASHJOIN_H +#define TDENGINE_HASHJOIN_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SHJoinRowCtx { + bool rowRemains; + int64_t ts; + SArray* leftRowLocations; + SArray* leftCreatedBlocks; + SArray* rightCreatedBlocks; + int32_t leftRowIdx; + int32_t rightRowIdx; + + bool rightUseBuildTable; + SArray* rightRowLocations; +} SHJoinRowCtx; + +typedef struct SRowLocation { + SSDataBlock* pDataBlock; + int32_t pos; +} SRowLocation; + +typedef struct SColBufInfo { + int32_t slotId; + bool vardata; + int32_t* offset; + int32_t bytes; + char* data; +} SColBufInfo; + +typedef struct SJoinTableInfo { + int32_t keyNum; + SColBufInfo* keyCols; + char* keyBuf; + int32_t valNum; + SColBufInfo* valCols; + char* valBuf; +} SJoinTableInfo; + +typedef struct SHJoinOperatorInfo { + SSDataBlock* pRes; + int32_t joinType; + + SJoinTableInfo tbs[2]; + + SJoinTableInfo* pBuild; + SJoinTableInfo* pProbe; + + int32_t pLeftKeyNum; + SColBufInfo* pLeftKeyInfo; + char* pLeftKeyBuf; + int32_t pLeftValNum; + SColBufInfo* pLeftValInfo; + char* pLeftValBuf; + + int32_t pRightKeyNum; + SColBufInfo* pRightKeyInfo; + char* pRightKeyBuf; + int32_t pRightValNum; + SColBufInfo* pRightValInfo; + char* pRightValBuf; + + SNode* pCondAfterJoin; + + SSHashObj* pKeyHash; + + SQueryStat inputStat[2]; + + SHJoinRowCtx rowCtx; +} SHJoinOperatorInfo; +static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator); +static void destroyHashJoinOperator(void* param); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_HASHJOIN_H diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 331a2fa7ab..9d7d906467 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1246,7 +1246,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, return TSDB_CODE_SUCCESS; } -SArray* extractPartitionColInfo(SNodeList* pNodeList) { +SArray* makeColumnArrayFromList(SNodeList* pNodeList) { if (!pNodeList) { return NULL; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c448ea0160..2c11e460f9 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -846,7 +846,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); - pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); + pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); if (pPartNode->pExprs != NULL) { int32_t num = 0; @@ -1240,7 +1240,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr goto _error; } - pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys); + pInfo->partitionSup.pGroupCols = makeColumnArrayFromList(pPartNode->part.pPartitionKeys); if (pPartNode->part.pExprs != NULL) { int32_t num = 0; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c new file mode 100755 index 0000000000..6886de3bd5 --- /dev/null +++ b/source/libs/executor/src/hashjoinoperator.c @@ -0,0 +1,320 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "operator.h" +#include "os.h" +#include "querynodes.h" +#include "querytask.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "thash.h" +#include "tmsg.h" +#include "ttypes.h" +#include "hashjoin.h" + +int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pList, char** ppBuf) { + *colNum = LIST_LENGTH(pList); + + (*ppInfo) = taosMemoryMalloc((*colNum) * sizeof(SColBufInfo)); + if (NULL == (*ppInfo)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int64_t bufSize = 0; + for (int32_t i = 0; i < *colNum; ++i) { + SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pList, i); + + (*ppInfo)->slotId = pColNode->slotId; + (*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + (*ppInfo)->bytes = pColNode->node.resType.bytes; + bufSize += pColNode->node.resType.bytes; + } + + *ppBuf = taosMemoryMalloc(bufSize); + if (NULL == *ppBuf) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SNodeList* pKeyList, int32_t idx) { + SJoinTableInfo* pTable = &pJoin->tbs[idx]; + + int32_t code = initJoinKeyBufInfo(&pTable->keyCols, &pTable->keyNum, pKeyList, &pTable->keyBuf); + if (code) { + return code; + } +} + +SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { + SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + int32_t code = TSDB_CODE_SUCCESS; + if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + int32_t numOfCols = 0; + pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); + initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + + setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + + initJoinTableInfo(pJoinNode, pJoinNode->pOnLeft, 0); + initJoinTableInfo(pJoinNode, pJoinNode->pOnRight, 1); + + code = initJoinColBufInfo(&pInfo->pRightKeyInfo, &pInfo->pRightKeyNum, pJoinNode->pOnRight, &pInfo->pRightKeyBuf); + if (code) { + goto _error; + } + + memcpy(pInfo->inputStat, pJoinNode->inputStat, sizeof(pJoinNode->inputStat)); + + size_t hashCap = pInfo->inputStat[1].inputRowNum > 0 ? (pInfo->inputStat[1].inputRowNum * 1.5) : 1024; + pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (pInfo->pKeyHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterJoin = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (pInfo->pCondAfterJoin == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterJoin); + pLogicCond->pParameterList = nodesMakeList(); + if (pLogicCond->pParameterList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFilterConditions)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); + pLogicCond->condType = LOGIC_COND_TYPE_AND; + } else if (pJoinNode->pFilterConditions != NULL) { + pInfo->pCondAfterJoin = nodesCloneNode(pJoinNode->pFilterConditions); + } else if (pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterJoin = nodesCloneNode(pJoinNode->node.pConditions); + } else { + pInfo->pCondAfterJoin = NULL; + } + + code = filterInitFromNode(pInfo->pCondAfterJoin, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroHashJoinOperator, optrDefaultBufFn, NULL); + code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyMergeJoinOperator(pInfo); + } + + taosMemoryFree(pOperator); + pTaskInfo->code = code; + return NULL; +} + +void destroHashJoinOperator(void* param) { + SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; + if (pJoinOperator->pColEqualOnConditions != NULL) { + mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); + taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->rightEqOnCondCols); + + taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->leftEqOnCondCols); + } + nodesDestroyNode(pJoinOperator->pCondAfterMerge); + + taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); + taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); + + pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); + taosMemoryFreeClear(param); +} + +static void doHashJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + SHJoinOperatorInfo* pJoinInfo = pOperator->info; + + int32_t nrows = pRes->info.rows; + + bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false; + + while (1) { + int64_t leftTs = 0; + int64_t rightTs = 0; + if (pJoinInfo->rowCtx.rowRemains) { + leftTs = pJoinInfo->rowCtx.ts; + rightTs = pJoinInfo->rowCtx.ts; + } else { + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + if (!hasNextTs) { + break; + } + } + + if (leftTs == rightTs) { + mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); + } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { + pJoinInfo->leftPos += 1; + + if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { + pJoinInfo->rightPos += 1; + if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + } + + // the pDataBlock are always the same one, no need to call this again + pRes->info.rows = nrows; + pRes->info.dataLoad = 1; + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } +} + +int32_t setColBufInfo(SSDataBlock* pBlock, int32_t colNum, SColBufInfo* pColList) { + for (int32_t i = 0; i < colNum; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pColList[i].slotId); + if (pColList[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pColList[i].slotId, pCol->info.type, pColList[i].vardata); + return TSDB_CODE_INVALID_PARA; + } + if (pColList[i].bytes != IS_VAR_DATA_TYPE(pCol->info.bytes)) { + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pColList[i].slotId, pCol->info.bytes, pColList[i].bytes); + return TSDB_CODE_INVALID_PARA; + } + pColList[i].data = pCol->pData; + if (pColList[i].vardata) { + pColList[i].offset = pCol->varmeta.offset; + } + } + + return TSDB_CODE_SUCCESS; +} + +FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* pColList, char* pBuf, size_t *bufLen) { + char *pData = NULL; + + *bufLen = 0; + for (int32_t i = 0; i < colNum; ++i) { + if (pColList[i].vardata) { + pData = pColList[i].data + pColList[i].offset[rowIdx]; + memcpy(pBuf + *bufLen, pData, varDataTLen(pData)); + *bufLen += varDataTLen(pData); + } + } +} + + +int32_t addBlockRowsKeyToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { + int32_t code = setColBufInfo(pBlock, pJoin->pRightKeyNum, pJoin->pRightKeyInfo); + if (code) { + return code; + } + + size_t bufLen = 0; + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + copyColDataToBuf(pJoin->pRightKeyNum, i, pJoin->pRightKeyInfo, pJoin->pRightKeyBuf, &bufLen); + } + + tSimpleHashPut(pJoin->pKeyHash, pJoin->pRightKeyBuf, bufLen, NULL, 0); +} + +int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SSDataBlock* pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + + while (true) { + pBlock = pOperator->pDownstream[1]->fpSet.getNextFn(pOperator->pDownstream[1]); + if (NULL == pBlock) { + break; + } + + code = addBlockRowsKeyToHash(pBlock, pJoin); + if (code) { + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoinInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + + if (NULL == pJoinInfo->pKeyHash) { + code = buildJoinKeyHash(pJoinInfo); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + + if (tSimpleHashGetSize(pJoinInfo->pKeyHash) <= 0) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + return NULL; + } + } + + while (true) { + int32_t numOfRowsBefore = pRes->info.rows; + doHashJoinImpl(pOperator, pRes); + int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; + if (numOfNewRows == 0) { + break; + } + if (pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + } + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + return (pRes->info.rows > 0) ? pRes : NULL; +} diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/mergejoinoperator.c similarity index 95% rename from source/libs/executor/src/joinoperator.c rename to source/libs/executor/src/mergejoinoperator.c index 73143fdba7..a7cda47a7c 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -26,7 +26,7 @@ #include "tmsg.h" #include "ttypes.h" -typedef struct SJoinRowCtx { +typedef struct SMJoinRowCtx { bool rowRemains; int64_t ts; SArray* leftRowLocations; @@ -37,9 +37,9 @@ typedef struct SJoinRowCtx { bool rightUseBuildTable; SArray* rightRowLocations; -} SJoinRowCtx; +} SMJoinRowCtx; -typedef struct SJoinOperatorInfo { +typedef struct SMJoinOperatorInfo { SSDataBlock* pRes; int32_t joinType; int32_t inputOrder; @@ -63,16 +63,16 @@ typedef struct SJoinOperatorInfo { int32_t rightEqOnCondKeyLen; SSHashObj* rightBuildTable; - SJoinRowCtx rowCtx; -} SJoinOperatorInfo; + SMJoinRowCtx rowCtx; +} SMJoinOperatorInfo; static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param); -static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, +static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); -static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, +static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { SNode* pMergeCondition = pJoinNode->pMergeCondition; if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) { @@ -104,7 +104,7 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown setJoinColumnInfo(&pInfo->rightCol, rightTsCol); } -static void extractEqualOnCondColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, +static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, SColumn* pLeft, SColumn* pRight) { SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; @@ -117,7 +117,7 @@ static void extractEqualOnCondColsFromOper(SJoinOperatorInfo* pInfo, SOperatorIn } } -static void extractEqualOnCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pEqualOnCondNode, +static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pEqualOnCondNode, SArray* leftTagEqCols, SArray* rightTagEqCols) { SColumn left = {0}; SColumn right = {0}; @@ -200,7 +200,7 @@ static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { - SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); + SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); int32_t code = TSDB_CODE_SUCCESS; @@ -308,7 +308,7 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { } void destroyMergeJoinOperator(void* param) { - SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; if (pJoinOperator->pColEqualOnConditions != NULL) { mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); @@ -331,7 +331,7 @@ void destroyMergeJoinOperator(void* param) { static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); @@ -408,7 +408,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator SArray* createdBlocks) { ASSERT(whichChild == 0 || whichChild == 1); - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; int32_t endPos = -1; SSDataBlock* dataBlock = startDataBlock; mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); @@ -441,7 +441,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } -static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) { +static int32_t mergeJoinFillBuildTable(SMJoinOperatorInfo* pInfo, SArray* rightRowLocations) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf); @@ -462,7 +462,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { *pReachThreshold = false; uint32_t limitRowNum = pOperator->resultInfo.threshold; - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; size_t leftNumJoin = taosArrayGetSize(leftRowLocations); int32_t i,j; @@ -505,7 +505,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* return TSDB_CODE_SUCCESS; } -static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, +static void mergeJoinDestroyTSRangeCtx(SMJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); @@ -543,7 +543,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, int32_t* nRows) { int32_t code = TSDB_CODE_SUCCESS; - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; SArray* leftRowLocations = NULL; SArray* rightRowLocations = NULL; SArray* leftCreatedBlocks = NULL; @@ -611,7 +611,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t } static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { SOperatorInfo* ds1 = pOperator->pDownstream[0]; @@ -647,7 +647,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs } static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; int32_t nrows = pRes->info.rows; @@ -691,7 +691,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) } SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { - SJoinOperatorInfo* pJoinInfo = pOperator->info; + SMJoinOperatorInfo* pJoinInfo = pOperator->info; SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes);