enh: optimize join performance
This commit is contained in:
parent
26a93847c3
commit
0fb672d8a6
|
@ -65,6 +65,7 @@ typedef struct SGcDownstreamCtx {
|
||||||
SHashObj* pSessions;
|
SHashObj* pSessions;
|
||||||
SHashObj* pWaitSessions;
|
SHashObj* pWaitSessions;
|
||||||
SGcFileCacheCtx fileCtx;
|
SGcFileCacheCtx fileCtx;
|
||||||
|
bool fetchDone;
|
||||||
} SGcDownstreamCtx;
|
} SGcDownstreamCtx;
|
||||||
|
|
||||||
typedef struct SGcVgroupCtx {
|
typedef struct SGcVgroupCtx {
|
||||||
|
|
|
@ -821,6 +821,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
|
||||||
|
|
||||||
taosArrayDestroy(pParam->pChildren);
|
taosArrayDestroy(pParam->pChildren);
|
||||||
pParam->pChildren = NULL;
|
pParam->pChildren = NULL;
|
||||||
|
pCtx->fetchDone = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -910,6 +911,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes
|
||||||
while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) {
|
while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) {
|
||||||
handleGroupFetchDone(pGroup);
|
handleGroupFetchDone(pGroup);
|
||||||
}
|
}
|
||||||
|
pCtx->fetchDone = true;
|
||||||
} else {
|
} else {
|
||||||
int32_t uidNum = 0;
|
int32_t uidNum = 0;
|
||||||
SGcVgroupCtx* pVgCtx = NULL;
|
SGcVgroupCtx* pVgCtx = NULL;
|
||||||
|
@ -979,33 +981,41 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
|
||||||
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
||||||
*got = true;
|
*got = true;
|
||||||
|
|
||||||
if (pSession->pGroupData->needCache) {
|
if (NULL != pSession->pGroupData) {
|
||||||
SGcBlkList* pBlkList = &pSession->pGroupData->blkList;
|
if (pSession->pGroupData->needCache) {
|
||||||
taosRLockLatch(&pBlkList->lock);
|
SGcBlkList* pBlkList = &pSession->pGroupData->blkList;
|
||||||
int64_t blkNum = taosArrayGetSize(pBlkList->pList);
|
taosRLockLatch(&pBlkList->lock);
|
||||||
if (pSession->lastBlkId < 0) {
|
int64_t blkNum = taosArrayGetSize(pBlkList->pList);
|
||||||
if (blkNum > 0) {
|
if (pSession->lastBlkId < 0) {
|
||||||
SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0);
|
if (blkNum > 0) {
|
||||||
|
SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0);
|
||||||
|
taosRUnLockLatch(&pBlkList->lock);
|
||||||
|
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
|
||||||
|
pSession->lastBlkId = 0;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else if ((pSession->lastBlkId + 1) < blkNum) {
|
||||||
|
SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1);
|
||||||
taosRUnLockLatch(&pBlkList->lock);
|
taosRUnLockLatch(&pBlkList->lock);
|
||||||
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
|
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
|
||||||
pSession->lastBlkId = 0;
|
pSession->lastBlkId++;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else if ((pSession->lastBlkId + 1) < blkNum) {
|
|
||||||
SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1);
|
|
||||||
taosRUnLockLatch(&pBlkList->lock);
|
taosRUnLockLatch(&pBlkList->lock);
|
||||||
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
|
} else if (pSession->pGroupData->pBlock) {
|
||||||
pSession->lastBlkId++;
|
*ppRes = pSession->pGroupData->pBlock;
|
||||||
|
pSession->pGroupData->pBlock = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
|
||||||
|
*ppRes = NULL;
|
||||||
|
qDebug("sessionId: %" PRIu64 " fetch done", sessionId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pBlkList->lock);
|
} else {
|
||||||
} else if (pSession->pGroupData->pBlock) {
|
|
||||||
*ppRes = pSession->pGroupData->pBlock;
|
|
||||||
pSession->pGroupData->pBlock = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
|
|
||||||
*ppRes = NULL;
|
*ppRes = NULL;
|
||||||
|
qDebug("sessionId: %" PRIu64 " fetch done since downstream fetch done", sessionId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1136,13 +1146,17 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
|
||||||
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
|
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
|
||||||
|
|
||||||
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
|
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
|
||||||
if (NULL == pGroup) {
|
if (NULL == pGroup && NULL != pParam->pChildren && !pCtx->fetchDone) {
|
||||||
code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId, pGcParam->tbUid);
|
code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId, pGcParam->tbUid);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL == pGroup) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
initGroupCacheSessionCtx(&ctx, pGcParam, pGroup);
|
initGroupCacheSessionCtx(&ctx, pGcParam, pGroup);
|
||||||
|
|
||||||
code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
|
code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx));
|
||||||
|
@ -1166,6 +1180,10 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
if (NULL == pSession) {
|
||||||
|
qDebug("session %" PRId64 " in downstream %d total got 0 rows since downtream fetch done", pGcParam->sessionId, pCtx->id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
} else if (pSession->pGroupData->needCache) {
|
} else if (pSession->pGroupData->needCache) {
|
||||||
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||||
if (ppBlock) {
|
if (ppBlock) {
|
||||||
|
|
|
@ -66,7 +66,7 @@ enum {
|
||||||
};
|
};
|
||||||
|
|
||||||
#define COL_DISPLAY_WIDTH 18
|
#define COL_DISPLAY_WIDTH 18
|
||||||
#define JT_MAX_LOOP 1000000
|
#define JT_MAX_LOOP 100000
|
||||||
|
|
||||||
#define LEFT_BLK_ID 0
|
#define LEFT_BLK_ID 0
|
||||||
#define RIGHT_BLK_ID 1
|
#define RIGHT_BLK_ID 1
|
||||||
|
@ -206,7 +206,7 @@ typedef struct {
|
||||||
|
|
||||||
|
|
||||||
SJoinTestCtx jtCtx = {0};
|
SJoinTestCtx jtCtx = {0};
|
||||||
SJoinTestCtrl jtCtrl = {1, 1, 1, 0, 0};
|
SJoinTestCtrl jtCtrl = {0, 0, 0, 0, 0};
|
||||||
SJoinTestStat jtStat = {0};
|
SJoinTestStat jtStat = {0};
|
||||||
SJoinTestResInfo jtRes = {0};
|
SJoinTestResInfo jtRes = {0};
|
||||||
|
|
||||||
|
@ -2862,7 +2862,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
|
||||||
bool contLoop = true;
|
bool contLoop = true;
|
||||||
|
|
||||||
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
|
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
|
||||||
createDummyBlkList(20, 20, 20, 20, 3);
|
createDummyBlkList(10000, 10000, 10000, 10000, 4096);
|
||||||
|
|
||||||
while (contLoop) {
|
while (contLoop) {
|
||||||
rerunBlockedHere();
|
rerunBlockedHere();
|
||||||
|
@ -2895,7 +2895,7 @@ void handleCaseEnd() {
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(innerJoin, noCondTest) {
|
TEST(innerJoin, noCondTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
|
@ -3006,7 +3006,7 @@ TEST(innerJoin, fullCondTest) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(leftOuterJoin, noCondTest) {
|
TEST(leftOuterJoin, noCondTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
|
@ -3117,7 +3117,7 @@ TEST(leftOuterJoin, fullCondTest) {
|
||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(fullOuterJoin, noCondTest) {
|
TEST(fullOuterJoin, noCondTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
|
@ -3229,7 +3229,7 @@ TEST(fullOuterJoin, fullCondTest) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
#if 0
|
#if 1
|
||||||
TEST(leftSemiJoin, noCondTest) {
|
TEST(leftSemiJoin, noCondTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
char* caseName = "leftSemiJoin:noCondTest";
|
char* caseName = "leftSemiJoin:noCondTest";
|
||||||
|
@ -3339,7 +3339,7 @@ TEST(leftSemiJoin, fullCondTest) {
|
||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(leftAntiJoin, noCondTest) {
|
TEST(leftAntiJoin, noCondTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
|
@ -3450,7 +3450,7 @@ TEST(leftAntiJoin, fullCondTest) {
|
||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(leftAsofJoin, noCondGreaterThanTest) {
|
TEST(leftAsofJoin, noCondGreaterThanTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
|
@ -3610,7 +3610,7 @@ TEST(leftAsofJoin, noCondLowerEqTest) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(leftWinJoin, noCondProjectionTest) {
|
TEST(leftWinJoin, noCondProjectionTest) {
|
||||||
SJoinTestParam param;
|
SJoinTestParam param;
|
||||||
|
|
|
@ -1036,6 +1036,11 @@ static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColum
|
||||||
strcpy(pCol->tableAlias, pTable->table.tableAlias);
|
strcpy(pCol->tableAlias, pTable->table.tableAlias);
|
||||||
pCol->isPrimTs = isPrimaryKeyImpl((SNode*)pExpr);
|
pCol->isPrimTs = isPrimaryKeyImpl((SNode*)pExpr);
|
||||||
pCol->colId = pCol->isPrimTs ? PRIMARYKEY_TIMESTAMP_COL_ID : 0;
|
pCol->colId = pCol->isPrimTs ? PRIMARYKEY_TIMESTAMP_COL_ID : 0;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
||||||
|
pCol->colType = ((SColumnNode*)pExpr)->colType;
|
||||||
|
strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName);
|
||||||
|
strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName);
|
||||||
|
}
|
||||||
strcpy(pCol->colName, pExpr->aliasName);
|
strcpy(pCol->colName, pExpr->aliasName);
|
||||||
if ('\0' == pCol->node.aliasName[0]) {
|
if ('\0' == pCol->node.aliasName[0]) {
|
||||||
strcpy(pCol->node.aliasName, pCol->colName);
|
strcpy(pCol->node.aliasName, pCol->colName);
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
|
|
||||||
#define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0)
|
#define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0)
|
||||||
#define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1)
|
#define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1)
|
||||||
|
#define OPTIMIZE_FLAG_STB_JOIN OPTIMIZE_FLAG_MASK(2)
|
||||||
|
|
||||||
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
||||||
#define OPTIMIZE_FLAG_CLEAR_MASK(val, mask) (val) &= (~(mask))
|
#define OPTIMIZE_FLAG_CLEAR_MASK(val, mask) (val) &= (~(mask))
|
||||||
|
@ -108,8 +109,8 @@ static SJoinOptimizeOpt gJoinWhereOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE]
|
||||||
static SJoinOptimizeOpt gJoinOnOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
|
static SJoinOptimizeOpt gJoinOnOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
|
||||||
/* NONE OUTER SEMI ANTI ASOF WINDOW */
|
/* NONE OUTER SEMI ANTI ASOF WINDOW */
|
||||||
/*INNER*/ {{PUSH_DONW_FLT_COND}, {0}, {0}, {0}, {0}, {0}},
|
/*INNER*/ {{PUSH_DONW_FLT_COND}, {0}, {0}, {0}, {0}, {0}},
|
||||||
/*LEFT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {0}, {0}, {0}},
|
/*LEFT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DONW_FLT_COND}, {PUSH_DOWN_RIGHT_FLT}, {0}, {0}},
|
||||||
/*RIGHT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {0}, {0}, {0}},
|
/*RIGHT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DONW_FLT_COND}, {PUSH_DOWN_LEFT_FLT}, {0}, {0}},
|
||||||
/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}},
|
/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -544,6 +545,14 @@ static int32_t pdcDealScan(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
|
||||||
static bool pdcColBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
|
static bool pdcColBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
|
||||||
SNode* pTableCol = NULL;
|
SNode* pTableCol = NULL;
|
||||||
FOREACH(pTableCol, pTableCols) {
|
FOREACH(pTableCol, pTableCols) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pCondCol) && QUERY_NODE_COLUMN == nodeType(pTableCol)) {
|
||||||
|
SColumnNode* pCondColNode = (SColumnNode*)pCondCol;
|
||||||
|
SColumnNode* pTblColNode = (SColumnNode*)pTableCol;
|
||||||
|
if (0 == strcmp(pCondColNode->tableAlias, pTblColNode->tableAlias) && 0 == strcmp(pCondColNode->colName, pTblColNode->colName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (nodesEqualNode(pCondCol, pTableCol)) {
|
if (nodesEqualNode(pCondCol, pTableCol)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1427,9 +1436,15 @@ static int32_t pdcRewriteTypeBasedOnConds(SOptimizeContext* pCxt, SJoinLogicNode
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case JOIN_TYPE_FULL:
|
case JOIN_TYPE_FULL:
|
||||||
if (tableCondTypes[0] && !tableCondTypes[1] && tableCondTypes[2] && !tableCondTypes[3]) {
|
if (tableCondTypes[0] && !tableCondTypes[1]) {
|
||||||
pJoin->joinType = JOIN_TYPE_INNER;
|
if (tableCondTypes[2] && !tableCondTypes[3]) {
|
||||||
pJoin->subType = JOIN_STYPE_NONE;
|
pJoin->joinType = JOIN_TYPE_INNER;
|
||||||
|
pJoin->subType = JOIN_STYPE_NONE;
|
||||||
|
} else {
|
||||||
|
pJoin->joinType = JOIN_TYPE_LEFT;
|
||||||
|
}
|
||||||
|
} else if (tableCondTypes[2] && !tableCondTypes[3]) {
|
||||||
|
pJoin->joinType = JOIN_TYPE_RIGHT;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -1454,16 +1469,18 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
SNode* pRightChildCond = NULL;
|
SNode* pRightChildCond = NULL;
|
||||||
int32_t code = pdcJoinCheckAllCond(pCxt, pJoin);
|
int32_t code = pdcJoinCheckAllCond(pCxt, pJoin);
|
||||||
while (true) {
|
while (true) {
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinWhereOpt[t][s].pushDownFlag) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions) {
|
||||||
code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true);
|
if (0 != gJoinWhereOpt[t][s].pushDownFlag) {
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
|
code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true);
|
||||||
code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond);
|
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
|
||||||
}
|
code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
}
|
||||||
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
||||||
}
|
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) {
|
}
|
||||||
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
|
if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) {
|
||||||
|
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions) {
|
||||||
code = pdcRewriteTypeBasedOnConds(pCxt, pJoin);
|
code = pdcRewriteTypeBasedOnConds(pCxt, pJoin);
|
||||||
|
@ -1478,7 +1495,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
s = pJoin->subType;
|
s = pJoin->subType;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond && 0 != gJoinOnOpt[t][s].pushDownFlag) {
|
||||||
code = pdcJoinSplitCond(pJoin, &pJoin->pFullOnCond, NULL, &pLeftChildCond, &pRightChildCond, false);
|
code = pdcJoinSplitCond(pJoin, &pJoin->pFullOnCond, NULL, &pLeftChildCond, &pRightChildCond, false);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
||||||
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
||||||
|
@ -2207,6 +2224,69 @@ static int32_t sortForJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
return sortForJoinOptimizeImpl(pCxt, pLogicSubplan, pJoin);
|
return sortForJoinOptimizeImpl(pCxt, pLogicSubplan, pJoin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool joinCondMayBeOptimized(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
|
||||||
|
if (pNode->pChildren->length != 2 || JOIN_ALGO_HASH == pJoin->joinAlgo || JOIN_TYPE_FULL == pJoin->joinType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
|
||||||
|
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
|
||||||
|
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SScanLogicNode* pLScan = (SScanLogicNode*)pLeft;
|
||||||
|
SScanLogicNode* pRScan = (SScanLogicNode*)pRight;
|
||||||
|
|
||||||
|
if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pLScan->scanRange)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void joinCondMergeScanRand(STimeWindow* pDst, STimeWindow* pSrc) {
|
||||||
|
if (pSrc->skey > pDst->skey) {
|
||||||
|
pDst->skey = pSrc->skey;
|
||||||
|
}
|
||||||
|
if (pSrc->ekey < pDst->ekey) {
|
||||||
|
pDst->ekey = pSrc->ekey;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t joinCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
|
SJoinLogicNode* pJoin = (SJoinLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, joinCondMayBeOptimized);
|
||||||
|
if (NULL == pJoin) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SScanLogicNode* pLScan = (SScanLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
|
||||||
|
SScanLogicNode* pRScan = (SScanLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
|
||||||
|
|
||||||
|
switch (pJoin->joinType) {
|
||||||
|
case JOIN_TYPE_INNER:
|
||||||
|
joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange);
|
||||||
|
pRScan->scanRange.skey = pLScan->scanRange.skey;
|
||||||
|
pRScan->scanRange.ekey = pLScan->scanRange.ekey;
|
||||||
|
break;
|
||||||
|
case JOIN_TYPE_LEFT:
|
||||||
|
joinCondMergeScanRand(&pRScan->scanRange, &pLScan->scanRange);
|
||||||
|
break;
|
||||||
|
case JOIN_TYPE_RIGHT:
|
||||||
|
joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static bool smaIndexOptMayBeOptimized(SLogicNode* pNode) {
|
static bool smaIndexOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent ||
|
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent ||
|
||||||
|
@ -2666,15 +2746,19 @@ static bool eliminateProjOptCheckProjColumnNames(SProjectLogicNode* pProjectNode
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
|
static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
// TODO: enable this optimization after new mechanising that map projection and targets of project node
|
// Super table scan requires project operator to merge packets to improve performance.
|
||||||
if (NULL != pNode->pParent) {
|
if (NULL == pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
|
(QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) &&
|
||||||
|
TSDB_SUPER_TABLE == ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->tableType))) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Super table scan requires project operator to merge packets to improve performance.
|
if (NULL != pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)))) {
|
||||||
(QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) &&
|
return false;
|
||||||
TSDB_SUPER_TABLE == ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->tableType)) {
|
}
|
||||||
|
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2729,6 +2813,7 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild,
|
||||||
nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||||
if (!cxt.canUse) return false;
|
if (!cxt.canUse) return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->joinAlgo != JOIN_ALGO_UNKNOWN) {
|
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->joinAlgo != JOIN_ALGO_UNKNOWN) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2763,29 +2848,46 @@ static void alignProjectionWithTarget(SLogicNode* pNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes eliminateProjOptRewriteScanTableAlias(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
strcpy(pCol->tableAlias, (char*)pContext);
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan,
|
static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan,
|
||||||
SProjectLogicNode* pProjectNode) {
|
SProjectLogicNode* pProjectNode) {
|
||||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0);
|
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0);
|
||||||
SNodeList* pNewChildTargets = nodesMakeList();
|
SNodeList* pNewChildTargets = nodesMakeList();
|
||||||
|
|
||||||
SNode* pProjection = NULL;
|
if (NULL == pProjectNode->node.pParent) {
|
||||||
FOREACH(pProjection, pProjectNode->pProjections) {
|
SNode* pProjection = NULL;
|
||||||
SNode* pChildTarget = NULL;
|
FOREACH(pProjection, pProjectNode->pProjections) {
|
||||||
FOREACH(pChildTarget, pChild->pTargets) {
|
SNode* pChildTarget = NULL;
|
||||||
if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
|
FOREACH(pChildTarget, pChild->pTargets) {
|
||||||
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
|
if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
|
||||||
break;
|
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) {
|
|
||||||
nodesDestroyList(pChild->pTargets);
|
|
||||||
pChild->pTargets = pNewChildTargets;
|
|
||||||
} else {
|
|
||||||
nodesDestroyList(pNewChildTargets);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) {
|
||||||
|
nodesDestroyList(pChild->pTargets);
|
||||||
|
pChild->pTargets = pNewChildTargets;
|
||||||
|
} else {
|
||||||
|
nodesDestroyList(pNewChildTargets);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SScanLogicNode* pScan = (SScanLogicNode*)pChild;
|
||||||
|
nodesWalkExprs(pScan->pScanCols, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName);
|
||||||
|
nodesWalkExprs(pScan->pScanPseudoCols, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName);
|
||||||
|
nodesWalkExpr(pScan->node.pConditions, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName);
|
||||||
|
nodesWalkExprs(pChild->pTargets, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
||||||
if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint);
|
if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -4427,16 +4529,23 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
|
static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode) || OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_STB_JOIN)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
|
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
|
||||||
|
if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) {
|
||||||
|
pJoin->joinAlgo = JOIN_ALGO_MERGE;
|
||||||
|
}
|
||||||
|
|
||||||
if (JOIN_STYPE_NONE != pJoin->subType || pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2
|
if (JOIN_STYPE_NONE != pJoin->subType || pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2
|
||||||
|| pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || pJoin->isLowLevelJoin) {
|
|| pJoin->isLowLevelJoin) {
|
||||||
if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) {
|
return false;
|
||||||
pJoin->joinAlgo = JOIN_ALGO_MERGE;
|
}
|
||||||
}
|
|
||||||
|
SNode* pLeft = nodesListGetNode(pJoin->node.pChildren, 0);
|
||||||
|
SNode* pRight = nodesListGetNode(pJoin->node.pChildren, 1);
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4582,7 +4691,8 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh
|
||||||
nodesDestroyList(pCols);
|
nodesDestroyList(pCols);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*ppLogic = (SLogicNode*)pJoin;
|
*ppLogic = (SLogicNode*)pJoin;
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN);
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyNode((SNode*)pJoin);
|
nodesDestroyNode((SNode*)pJoin);
|
||||||
}
|
}
|
||||||
|
@ -4729,6 +4839,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pChild->pParent = (SLogicNode*)pJoin;
|
pChild->pParent = (SLogicNode*)pJoin;
|
||||||
*ppLogic = (SLogicNode*)pJoin;
|
*ppLogic = (SLogicNode*)pJoin;
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN);
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyNode((SNode*)pJoin);
|
nodesDestroyNode((SNode*)pJoin);
|
||||||
}
|
}
|
||||||
|
@ -5089,6 +5200,7 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
static const SOptimizeRule optimizeRuleSet[] = {
|
static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
|
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
|
||||||
{.pName = "PushDownCondition", .optimizeFunc = pdcOptimize},
|
{.pName = "PushDownCondition", .optimizeFunc = pdcOptimize},
|
||||||
|
{.pName = "JoinCondOptimize", .optimizeFunc = joinCondOptimize},
|
||||||
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
|
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
|
||||||
{.pName = "GroupJoin", .optimizeFunc = groupJoinOptimize},
|
{.pName = "GroupJoin", .optimizeFunc = groupJoinOptimize},
|
||||||
{.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
|
{.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
|
||||||
|
|
Loading…
Reference in New Issue