Merge pull request #21328 from taosdata/szhou/fix-ts3405

fix: join improvement
This commit is contained in:
dapan1121 2023-05-19 08:51:06 +08:00 committed by GitHub
commit 386b8484ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 387 additions and 78 deletions

View File

@ -112,6 +112,7 @@ typedef struct SJoinLogicNode {
SNode* pOnConditions;
bool isSingleTableJoin;
EOrder inputTsOrder;
SNode* pTagEqualConditions;
} SJoinLogicNode;
typedef struct SAggLogicNode {
@ -405,6 +406,7 @@ typedef struct SSortMergeJoinPhysiNode {
SNode* pOnConditions;
SNodeList* pTargets;
EOrder inputTsOrder;
SNode* pTagEqualCondtions;
} SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode {

View File

@ -30,11 +30,13 @@ typedef struct SJoinRowCtx {
bool rowRemains;
int64_t ts;
SArray* leftRowLocations;
SArray* rightRowLocations;
SArray* leftCreatedBlocks;
SArray* rightCreatedBlocks;
int32_t leftRowIdx;
int32_t rightRowIdx;
bool rightUseBuildTable;
SArray* rightRowLocations;
} SJoinRowCtx;
typedef struct SJoinOperatorInfo {
@ -50,7 +52,17 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
SNode* pTagEqualConditions;
SArray* leftTagCols;
char* leftTagKeyBuf;
int32_t leftTagKeyLen;
SArray* rightTagCols;
char* rightTagKeyBuf;
int32_t rightTagKeyLen;
SSHashObj* rightBuildTable;
SJoinRowCtx rowCtx;
} SJoinOperatorInfo;
@ -92,6 +104,100 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown
setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
}
static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode,
SColumn* pLeft, SColumn* pRight) {
SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft;
SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight;
if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
} else {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
}
}
static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pTagEqualNode,
SArray* leftTagEqCols, SArray* rightTagEqCols) {
SColumn left = {0};
SColumn right = {0};
if (nodeType(pTagEqualNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pTagEqualNode)->condType == LOGIC_COND_TYPE_AND) {
SNode* pNode = NULL;
FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) {
SOperatorNode* pOperNode = (SOperatorNode*)pNode;
extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
return;
}
if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode;
extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
}
static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; // actual data + null_flag
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
(*keyLen) += nullFlagSize;
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
if ((*keyBuf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) {
SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pCols);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*) taosArrayGet(pCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
// valid range check. todo: return error code.
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) {
continue;
}
if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
isNull[i] = 1;
} else {
isNull[i] = 0;
char* val = colDataGetData(pColInfoData, rowIndex);
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(val);
memcpy(pStart, val, dataLen);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
varDataCopy(pStart, val);
pStart += varDataTLen(val);
} else {
memcpy(pStart, val, pCol->bytes);
pStart += pCol->bytes;
}
}
}
return (int32_t)(pStart - (char*)pKey);
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
@ -153,6 +259,16 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->inputOrder = TSDB_ORDER_DESC;
}
pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions;
if (pInfo->pTagEqualConditions != NULL) {
pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn));
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
@ -179,8 +295,28 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->scale = pColumnNode->node.resType.scale;
}
static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
void* p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) {
SArray* rows = (*(SArray**)p);
taosArrayDestroy(rows);
}
tSimpleHashCleanup(pBuildTable);
}
void destroyMergeJoinOperator(void* param) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
if (pJoinOperator->pTagEqualConditions != NULL) {
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
taosArrayDestroy(pJoinOperator->rightTagCols);
taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf);
taosArrayDestroy(pJoinOperator->leftTagCols);
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
@ -300,21 +436,122 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
return 0;
}
static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) {
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf);
SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen);
if (!ppRows) {
SArray* rows = taosArrayInit(4, sizeof(SRowLocation));
taosArrayPush(rows, rightRow);
tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES);
} else {
taosArrayPush(*ppRows, rightRow);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
const SArray* leftRowLocations, int32_t leftRowIdx,
int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) {
*pReachThreshold = false;
uint32_t limitRowNum = pOperator->resultInfo.threshold;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
int32_t i,j;
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SArray* pRightRows = NULL;
if (useBuildTableTSRange) {
int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf);
SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen);
if (!ppRightRows) {
continue;
}
pRightRows = *ppRightRows;
} else {
pRightRows = rightRowLocations;
}
size_t rightRowsSize = taosArrayGetSize(pRightRows);
for (j = rightRowIdx; j < rightRowsSize; ++j) {
if (*nRows >= limitRowNum) {
*pReachThreshold = true;
break;
}
SRowLocation* rightRow = taosArrayGet(pRightRows, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
if (*pReachThreshold) {
break;
}
}
if (*pReachThreshold) {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
}
return TSDB_CODE_SUCCESS;
}
static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
if (rightRowLocations != NULL) {
taosArrayDestroy(rightRowLocations);
}
if (rightUseBuildTable) {
void* p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) {
SArray* rows = (*(SArray**)p);
taosArrayDestroy(rows);
}
tSimpleHashClear(pJoinInfo->rightBuildTable);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightUseBuildTable = false;
pJoinInfo->rowCtx.rightRowLocations = NULL;
}
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
int32_t code = TSDB_CODE_SUCCESS;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = NULL;
SArray* leftCreatedBlocks = NULL;
SArray* rightRowLocations = NULL;
SArray* leftCreatedBlocks = NULL;
SArray* rightCreatedBlocks = NULL;
int32_t leftRowIdx = 0;
int32_t rightRowIdx = 0;
int32_t i, j;
SSHashObj* rightTableHash = NULL;
bool rightUseBuildTable = false;
if (pJoinInfo->rowCtx.rowRemains) {
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable;
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
@ -330,78 +567,40 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
mergeJoinFillBuildTable(pJoinInfo, rightRowLocations);
rightUseBuildTable = true;
taosArrayDestroy(rightRowLocations);
rightRowLocations = NULL;
}
}
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
uint32_t limitRowNum = maxRowNum;
if (maxRowNum > pOperator->resultInfo.threshold) {
limitRowNum = pOperator->resultInfo.threshold;
if (!pJoinInfo->rowCtx.rowRemains) {
code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin);
}
bool reachThreshold = false;
if (code == TSDB_CODE_SUCCESS) {
mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold);
}
if (!reachThreshold) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
rightUseBuildTable, rightRowLocations);
} else {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
}
}
code = blockDataEnsureCapacity(pRes, limitRowNum);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin, rightNumJoin);
}
if (code == TSDB_CODE_SUCCESS) {
bool done = false;
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
for (j = rightRowIdx; j < rightNumJoin; ++j) {
if (*nRows >= limitRowNum) {
done = true;
break;
}
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
if (done) {
break;
}
}
if (maxRowNum > pOperator->resultInfo.threshold) {
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
}
}
if (maxRowNum <= pOperator->resultInfo.threshold) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
taosArrayDestroy(rightRowLocations);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
if (pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.rightRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
}
pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -401,6 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_SCALAR_FIELD(joinType);
CLONE_NODE_FIELD(pMergeCondition);
CLONE_NODE_FIELD(pOnConditions);
CLONE_NODE_FIELD(pTagEqualConditions);
COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(inputTsOrder);
return TSDB_CODE_SUCCESS;

View File

@ -1416,6 +1416,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
static const char* jkJoinLogicPlanMergeCondition = "MergeConditions";
static const char* jkJoinLogicPlanTagEqualConditions = "TagEqualConditions";
static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj;
@ -1430,7 +1431,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions);
}
return code;
}
@ -1447,7 +1450,9 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions);
}
return code;
}
@ -1878,6 +1883,7 @@ static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static const char* jkJoinPhysiPlanTargets = "Targets";
static const char* jkJoinPhysiPlanTagEqualConditions = "TagEqualConditions";
static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
@ -1898,7 +1904,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions);
}
return code;
}
@ -1921,7 +1929,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions);
}
return code;
}

View File

@ -2317,7 +2317,8 @@ enum {
PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION,
PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS,
PHY_SORT_MERGE_JOIN_CODE_TARGETS,
PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER
PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER,
PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS
};
static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2339,7 +2340,9 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions);
}
return code;
}
@ -2368,6 +2371,9 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder));
break;
case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions);
break;
default:
break;
}

View File

@ -1072,6 +1072,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyNode(pLogicNode->pMergeCondition);
nodesDestroyNode(pLogicNode->pOnConditions);
nodesDestroyNode(pLogicNode->pTagEqualConditions);
break;
}
case QUERY_NODE_LOGIC_PLAN_AGG: {
@ -1204,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pMergeCondition);
nodesDestroyNode(pPhyNode->pOnConditions);
nodesDestroyList(pPhyNode->pTargets);
nodesDestroyNode(pPhyNode->pTagEqualCondtions);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {

View File

@ -740,6 +740,88 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin
return code;
}
static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pNode;
if (COLUMN_TYPE_TAG != pCol->colType) {
return false;
}
return pushDownCondOptBelongThisTable(pNode, pTableCols);
}
static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false;
}
SOperatorNode* pOper = (SOperatorNode*)pCond;
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) {
return false;
}
SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft);
SColumnNode* pRight = (SColumnNode*)(pOper->pRight);
//TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) {
return pushDownCondOptIsTag(pOper->pRight, pRightCols);
} else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) {
return pushDownCondOptIsTag(pOper->pRight, pLeftCols);
}
return false;
}
static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pTagEqualConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) {
code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond));
}
}
SNode* pTempTagEqCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempTagEqCond, &pTagEqualConds);
}
if (TSDB_CODE_SUCCESS == code) {
pJoin->pTagEqualConditions = pTempTagEqCond;
return TSDB_CODE_SUCCESS;
} else {
nodesDestroyList(pTagEqualConds);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) {
pJoin->pTagEqualConditions = NULL;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) {
return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin);
}
if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) {
pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions);
}
return TSDB_CODE_SUCCESS;
}
static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
@ -774,6 +856,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;

View File

@ -705,6 +705,9 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
}

View File

@ -6,8 +6,8 @@ sql connect
$dbPrefix = join_m_db
$tbPrefix = join_tb
$mtPrefix = join_mt
$tbNum = 3
$rowNum = 2000
$tbNum = 20
$rowNum = 200
$totalNum = $tbNum * $rowNum
print =============== join_manyBlocks.sim
@ -78,8 +78,8 @@ print ==============> td-3313
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1;
print $row
if $row != 6000 then
print expect 6000, actual: $row
if $row != 4000 then
print expect 4000, actual: $row
return -1
endi