fix: join condition push down issue

This commit is contained in:
dapan1121 2023-08-16 14:28:39 +08:00
parent 8fc0f1bbd6
commit 91d6734b34
19 changed files with 3047 additions and 2838 deletions

View File

@ -276,87 +276,85 @@
#define TK_JOIN 258
#define TK_INNER 259
#define TK_SELECT 260
#define TK_NO_BATCH_SCAN 261
#define TK_BATCH_SCAN 262
#define TK_NK_HINT_BEGIN 263
#define TK_NK_HINT_END 264
#define TK_DISTINCT 265
#define TK_WHERE 266
#define TK_PARTITION 267
#define TK_BY 268
#define TK_SESSION 269
#define TK_STATE_WINDOW 270
#define TK_EVENT_WINDOW 271
#define TK_SLIDING 272
#define TK_FILL 273
#define TK_VALUE 274
#define TK_VALUE_F 275
#define TK_NONE 276
#define TK_PREV 277
#define TK_NULL_F 278
#define TK_LINEAR 279
#define TK_NEXT 280
#define TK_HAVING 281
#define TK_RANGE 282
#define TK_EVERY 283
#define TK_ORDER 284
#define TK_SLIMIT 285
#define TK_SOFFSET 286
#define TK_LIMIT 287
#define TK_OFFSET 288
#define TK_ASC 289
#define TK_NULLS 290
#define TK_ABORT 291
#define TK_AFTER 292
#define TK_ATTACH 293
#define TK_BEFORE 294
#define TK_BEGIN 295
#define TK_BITAND 296
#define TK_BITNOT 297
#define TK_BITOR 298
#define TK_BLOCKS 299
#define TK_CHANGE 300
#define TK_COMMA 301
#define TK_CONCAT 302
#define TK_CONFLICT 303
#define TK_COPY 304
#define TK_DEFERRED 305
#define TK_DELIMITERS 306
#define TK_DETACH 307
#define TK_DIVIDE 308
#define TK_DOT 309
#define TK_EACH 310
#define TK_FAIL 311
#define TK_FILE 312
#define TK_FOR 313
#define TK_GLOB 314
#define TK_ID 315
#define TK_IMMEDIATE 316
#define TK_IMPORT 317
#define TK_INITIALLY 318
#define TK_INSTEAD 319
#define TK_ISNULL 320
#define TK_KEY 321
#define TK_MODULES 322
#define TK_NK_BITNOT 323
#define TK_NK_SEMI 324
#define TK_NOTNULL 325
#define TK_OF 326
#define TK_PLUS 327
#define TK_PRIVILEGE 328
#define TK_RAISE 329
#define TK_RESTRICT 330
#define TK_ROW 331
#define TK_SEMI 332
#define TK_STAR 333
#define TK_STATEMENT 334
#define TK_STRICT 335
#define TK_STRING 336
#define TK_TIMES 337
#define TK_VALUES 338
#define TK_VARIABLE 339
#define TK_VIEW 340
#define TK_WAL 341
#define TK_NK_HINT 261
#define TK_DISTINCT 262
#define TK_WHERE 263
#define TK_PARTITION 264
#define TK_BY 265
#define TK_SESSION 266
#define TK_STATE_WINDOW 267
#define TK_EVENT_WINDOW 268
#define TK_SLIDING 269
#define TK_FILL 270
#define TK_VALUE 271
#define TK_VALUE_F 272
#define TK_NONE 273
#define TK_PREV 274
#define TK_NULL_F 275
#define TK_LINEAR 276
#define TK_NEXT 277
#define TK_HAVING 278
#define TK_RANGE 279
#define TK_EVERY 280
#define TK_ORDER 281
#define TK_SLIMIT 282
#define TK_SOFFSET 283
#define TK_LIMIT 284
#define TK_OFFSET 285
#define TK_ASC 286
#define TK_NULLS 287
#define TK_ABORT 288
#define TK_AFTER 289
#define TK_ATTACH 290
#define TK_BEFORE 291
#define TK_BEGIN 292
#define TK_BITAND 293
#define TK_BITNOT 294
#define TK_BITOR 295
#define TK_BLOCKS 296
#define TK_CHANGE 297
#define TK_COMMA 298
#define TK_CONCAT 299
#define TK_CONFLICT 300
#define TK_COPY 301
#define TK_DEFERRED 302
#define TK_DELIMITERS 303
#define TK_DETACH 304
#define TK_DIVIDE 305
#define TK_DOT 306
#define TK_EACH 307
#define TK_FAIL 308
#define TK_FILE 309
#define TK_FOR 310
#define TK_GLOB 311
#define TK_ID 312
#define TK_IMMEDIATE 313
#define TK_IMPORT 314
#define TK_INITIALLY 315
#define TK_INSTEAD 316
#define TK_ISNULL 317
#define TK_KEY 318
#define TK_MODULES 319
#define TK_NK_BITNOT 320
#define TK_NK_SEMI 321
#define TK_NOTNULL 322
#define TK_OF 323
#define TK_PLUS 324
#define TK_PRIVILEGE 325
#define TK_RAISE 326
#define TK_RESTRICT 327
#define TK_ROW 328
#define TK_SEMI 329
#define TK_STAR 330
#define TK_STATEMENT 331
#define TK_STRICT 332
#define TK_STRING 333
#define TK_TIMES 334
#define TK_VALUES 335
#define TK_VARIABLE 336
#define TK_VIEW 337
#define TK_WAL 338
#define TK_NK_SPACE 600
@ -365,6 +363,9 @@
#define TK_NK_HEX 603 // hex number 0x123
#define TK_NK_OCT 604 // oct number
#define TK_NK_BIN 605 // bin format data 0b111
#define TK_BATCH_SCAN 606
#define TK_NO_BATCH_SCAN 607
#define TK_NK_NIL 65535

View File

@ -43,10 +43,12 @@ typedef enum EGroupAction {
typedef struct SLogicNode {
ENodeType type;
bool dynamicOp;
bool stmtRoot;
SNodeList* pTargets; // SColumnNode
SNode* pConditions;
SNodeList* pChildren;
struct SLogicNode* pParent;
SNodeList* pHint;
int32_t optimizedFlag;
uint8_t precision;
SNode* pLimit;

View File

@ -124,7 +124,6 @@ typedef enum EHintOption {
typedef struct SHintNode {
ENodeType type;
EHintOption option;
char* literal;
void* value;
} SHintNode;

View File

@ -162,7 +162,7 @@ typedef struct SLimitInfo {
} SLimitInfo;
typedef struct SSortMergeJoinOperatorParam {
int32_t initDownstreamNum;
bool initDownstream;
} SSortMergeJoinOperatorParam;
typedef struct SExchangeOperatorBasicParam {

View File

@ -306,7 +306,7 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara
return TSDB_CODE_OUT_OF_MEMORY;
}
pJoin->initDownstreamNum = initParam ? 2 : 0;
pJoin->initDownstream = initParam;
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
(*ppRes)->value = pJoin;

View File

@ -143,11 +143,32 @@ static void destroyGroupCacheDownstreamCtx(SGroupCacheOperatorInfo* pGrpCacheOpe
taosMemoryFree(pGrpCacheOperator->pDownstreams);
}
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
taosMemoryFreeClear(p->pData);
if (IS_VAR_DATA_TYPE(p->info.type)) {
taosMemoryFreeClear(p->varmeta.offset);
p->varmeta.length = 0;
p->varmeta.allocLen = 0;
} else {
taosMemoryFreeClear(p->nullbitmap);
}
}
pDataBlock->info.capacity = 0;
pDataBlock->info.rows = 0;
}
static void destroySGcBlkCacheInfo(SGcBlkCacheInfo* pBlkCache) {
taosHashCleanup(pBlkCache->pDirtyBlk);
void* p = NULL;
while (p = taosHashIterate(pBlkCache->pReadBlk, p)) {
blockDataDeepCleanup(*(SSDataBlock**)p);
freeGcBlockInList(p);
}
@ -432,25 +453,6 @@ void blockDataDeepClear(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0;
}
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
taosMemoryFreeClear(p->pData);
if (IS_VAR_DATA_TYPE(p->info.type)) {
taosMemoryFreeClear(p->varmeta.offset);
p->varmeta.length = 0;
p->varmeta.allocLen = 0;
} else {
taosMemoryFreeClear(p->nullbitmap);
}
}
pDataBlock->info.capacity = 0;
pDataBlock->info.rows = 0;
}
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
*ppDst = taosMemoryMalloc(sizeof(*pSrc));
if (NULL == *ppDst) {
@ -708,7 +710,7 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int
SGroupCacheFileInfo newFile = {0};
newFile.groupNum = 1;
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
pTmp = &newFile;
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
} else {
pTmp->groupNum++;
}

View File

@ -43,6 +43,7 @@ typedef struct SMJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
bool downstreamInitDone[2];
bool downstreamFetchDone[2];
int16_t downstreamResBlkId[2];
@ -671,6 +672,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
if (!pJoinInfo->downstreamFetchDone[0]) {
pJoinInfo->pLeft = getNextBlockFromDownstreamRemain(pOperator, 0);
pJoinInfo->downstreamInitDone[0] = true;
pJoinInfo->leftPos = 0;
qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0);
@ -679,9 +681,8 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
}
if (pJoinInfo->pLeft == NULL) {
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) {
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstream && !pJoinInfo->downstreamInitDone[1]) {
leftEmpty = true;
((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--;
} else {
setMergeJoinDone(pOperator);
return false;
@ -692,10 +693,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
if (!pJoinInfo->downstreamFetchDone[1]) {
pJoinInfo->pRight = getNextBlockFromDownstreamRemain(pOperator, 1);
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) {
((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--;
}
pJoinInfo->downstreamInitDone[1] = true;
pJoinInfo->rightPos = 0;
qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0);
@ -783,6 +781,8 @@ void resetMergeJoinOperator(struct SOperatorInfo* pOperator) {
pJoinInfo->rightPos = 0;
pJoinInfo->downstreamFetchDone[0] = false;
pJoinInfo->downstreamFetchDone[1] = false;
pJoinInfo->downstreamInitDone[0] = false;
pJoinInfo->downstreamInitDone[1] = false;
pJoinInfo->resRows = 0;
pOperator->status = OP_OPENED;
}

View File

@ -348,6 +348,24 @@ static int32_t caseWhenNodeCopy(const SCaseWhenNode* pSrc, SCaseWhenNode* pDst)
return TSDB_CODE_SUCCESS;
}
static int32_t copyHintValue(const SHintNode* pSrc, SHintNode* pDst) {
if (NULL == pSrc->value) {
pDst->value = NULL;
return TSDB_CODE_SUCCESS;
}
switch (pSrc->option) {
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t hintNodeCopy(const SHintNode* pSrc, SHintNode* pDst) {
COPY_SCALAR_FIELD(type);
COPY_SCALAR_FIELD(option);
return copyHintValue(pSrc, pDst);
}
static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
CLONE_NODE_LIST_FIELD(pTargets);
CLONE_NODE_FIELD(pConditions);
@ -363,6 +381,7 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(outputTsOrder);
COPY_SCALAR_FIELD(dynamicOp);
COPY_SCALAR_FIELD(forceCreateNonBlockingOptr);
CLONE_NODE_LIST_FIELD(pHint);
return TSDB_CODE_SUCCESS;
}
@ -703,6 +722,7 @@ static int32_t selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) {
COPY_SCALAR_FIELD(timeLineResMode);
COPY_SCALAR_FIELD(hasAggFuncs);
COPY_SCALAR_FIELD(hasRepeatScanFuncs);
CLONE_NODE_LIST_FIELD(pHint);
return TSDB_CODE_SUCCESS;
}
@ -791,6 +811,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_CASE_WHEN:
code = caseWhenNodeCopy((const SCaseWhenNode*)pNode, (SCaseWhenNode*)pDst);
break;
case QUERY_NODE_HINT:
code = hintNodeCopy((const SHintNode*)pNode, (SHintNode*)pDst);
break;
case QUERY_NODE_SELECT_STMT:
code = selectStmtCopy((const SSelectStmt*)pNode, (SSelectStmt*)pDst);
break;

View File

@ -610,6 +610,7 @@ static void destroyLogicNode(SLogicNode* pNode) {
nodesDestroyList(pNode->pChildren);
nodesDestroyNode(pNode->pLimit);
nodesDestroyNode(pNode->pSlimit);
nodesDestroyList(pNode->pHint);
}
static void destroyPhysiNode(SPhysiNode* pNode) {
@ -827,7 +828,6 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_HINT: {
SHintNode* pHint = (SHintNode*)pNode;
taosMemoryFreeClear(pHint->literal);
destroyHintValue(pHint->option, pHint->value);
break;
}

View File

@ -101,7 +101,7 @@ SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode
SNode* createColumnNode(SAstCreateContext* pCxt, SToken* pTableAlias, SToken* pColumnName);
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
SNode* createHintNode(SAstCreateContext* pCxt, EHintOption option, const SToken* pLiteral);
SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* createIdentifierValueNode(SAstCreateContext* pCxt, SToken* pLiteral);
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt);

View File

@ -1025,8 +1025,8 @@ query_specification(A) ::=
%type hint_list { SNodeList* }
%destructor hint_list { nodesDestroyList($$); }
hint_list(A) ::= . { createHintNodeList(pCxt, NULL); }
hint_list(A) ::= NK_HINT(B). { createHintNodeList(pCxt, &B); }
hint_list(A) ::= . { A = createHintNodeList(pCxt, NULL); }
hint_list(A) ::= NK_HINT(B). { A = createHintNodeList(pCxt, &B); }
%type set_quantifier_opt { bool }
%destructor set_quantifier_opt { }

View File

@ -348,15 +348,121 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
return (SNode*)val;
}
SNode* createHintNode(SAstCreateContext* pCxt, EHintOption option, const SToken* pLiteral) {
CHECK_PARSER_STATUS(pCxt);
bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOption opt, SToken* paramList, int32_t paramNum) {
void* value = NULL;
switch (opt) {
case HINT_BATCH_SCAN:
case HINT_NO_BATCH_SCAN: {
if (paramNum > 0) {
return true;
}
break;
}
default:
return true;
}
SHintNode* hint = (SHintNode*)nodesMakeNode(QUERY_NODE_HINT);
CHECK_OUT_OF_MEM(hint);
hint->option = option;
hint->literal = strndup(pLiteral->z, pLiteral->n);
trimString(pLiteral->z, pLiteral->n, hint->literal, pLiteral->n);
CHECK_OUT_OF_MEM(hint->literal);
return (SNode*)hint;
hint->option = opt;
hint->value = value;
if (NULL == *ppHintList) {
*ppHintList = nodesMakeList();
CHECK_OUT_OF_MEM(*ppHintList);
}
pCxt->errCode = nodesListStrictAppend(*ppHintList, (SNode*)hint);
if (pCxt->errCode) {
return true;
}
return false;
}
SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
CHECK_PARSER_STATUS(pCxt);
if (NULL == pLiteral || pLiteral->n <= 5) {
return NULL;
}
SNodeList* pHintList = NULL;
char* hint = strndup(pLiteral->z + 3, pLiteral->n - 5);
int32_t i = 0;
bool quit = false;
bool inParamList = false;
bool lastComma = false;
EHintOption opt = 0;
int32_t paramNum = 0;
SToken paramList[10];
while (!quit) {
SToken t0 = {0};
if (hint[i] == 0) {
break;
}
t0.n = tGetToken(&hint[i], &t0.type);
t0.z = hint + i;
i += t0.n;
switch (t0.type) {
case TK_BATCH_SCAN:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_BATCH_SCAN;
break;
case TK_NO_BATCH_SCAN:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_NO_BATCH_SCAN;
break;
case TK_NK_LP:
lastComma = false;
if (0 == opt || inParamList) {
quit = true;
}
inParamList = true;
break;
case TK_NK_RP:
lastComma = false;
if (0 == opt || !inParamList) {
quit = true;
} else {
quit = addHintNodeToList(pCxt, &pHintList, opt, paramList, paramNum);
inParamList = false;
paramNum = 0;
opt = 0;
}
break;
case TK_NK_ID:
lastComma = false;
if (0 == opt || !inParamList) {
quit = true;
} else {
paramList[paramNum++] = t0;
}
break;
case TK_NK_COMMA:
if (lastComma) {
quit = true;
}
lastComma = true;
break;
case TK_NK_SPACE:
break;
default:
lastComma = false;
quit = true;
break;
}
}
taosMemoryFree(hint);
return pHintList;
}
SNode* createIdentifierValueNode(SAstCreateContext* pCxt, SToken* pLiteral) {

View File

@ -392,10 +392,6 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
return 1;
}
case '*': {
if (z[1] == '/') {
*tokenId = TK_NK_HINT_END;
return 2;
}
*tokenId = TK_NK_STAR;
return 1;
}
@ -406,13 +402,12 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
}
bool isHint = false;
if (z[2] == '+') {
*tokenId = TK_NK_HINT_BEGIN;
return 3;
isHint = true;
}
for (i = 3; z[i] && (z[i] != '/' || z[i - 1] != '*'); i++) {
}
if (z[i]) i++;
*tokenId = TK_NK_COMMENT;
*tokenId = isHint ? TK_NK_HINT : TK_NK_COMMENT;
return i;
}
case '%': {

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,7 @@ extern "C" {
#endif
#include "planner.h"
#include "tsimplehash.h"
#include "taoserror.h"
#define planFatal(param, ...) qFatal("PLAN: " param, ##__VA_ARGS__)
@ -46,6 +47,8 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryP
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
bool getBatchScanOptionFromHint(SNodeList* pList);
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
#ifdef __cplusplus
}

View File

@ -478,10 +478,34 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set the output
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pColList = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, NULL, COLLECT_COL_TYPE_ALL, &pColList);
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pLeft)) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((SRealTableNode*)pJoinTable->pLeft)->table.tableAlias, COLLECT_COL_TYPE_ALL, &pColList);
} else {
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
if (NULL == pJoin->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && NULL != pColList) {
code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets);
}
}
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pColList = NULL;
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight)) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((SRealTableNode*)pJoinTable->pRight)->table.tableAlias, COLLECT_COL_TYPE_ALL, &pColList);
} else {
if (pJoin->node.pTargets) {
nodesListStrictAppendList(pJoin->node.pTargets, nodesCloneList(pRight->pTargets));
} else {
pJoin->node.pTargets = nodesCloneList(pRight->pTargets);
if (NULL == pJoin->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pColList) {
code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets);
}
}
@ -1277,11 +1301,17 @@ static int32_t createSelectFromLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
}
static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pSelect->pFromTable) {
return createSelectWithoutFromLogicNode(pCxt, pSelect, pLogicNode);
code = createSelectWithoutFromLogicNode(pCxt, pSelect, pLogicNode);
} else {
return createSelectFromLogicNode(pCxt, pSelect, pLogicNode);
code = createSelectFromLogicNode(pCxt, pSelect, pLogicNode);
}
if (TSDB_CODE_SUCCESS == code && NULL != *pLogicNode) {
(*pLogicNode)->stmtRoot = true;
TSWAP((*pLogicNode)->pHint, pSelect->pHint);
}
return code;
}
static int32_t createSetOpRootLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, FCreateSetOpLogicNode func,

View File

@ -50,8 +50,8 @@ typedef struct SOsdInfo {
} SOsdInfo;
typedef struct SCpdIsMultiTableCondCxt {
SNodeList* pLeftCols;
SNodeList* pRightCols;
SSHashObj* pLeftTbls;
SSHashObj* pRightTbls;
bool havaLeftCol;
bool haveRightCol;
} SCpdIsMultiTableCondCxt;
@ -505,12 +505,21 @@ static bool pushDownCondOptBelongThisTable(SNode* pCondCol, SNodeList* pTableCol
return false;
}
static bool pushDownCondOptColInTableList(SNode* pCondCol, SSHashObj* pTables) {
SColumnNode* pTableCol = (SColumnNode*)pCondCol;
if (NULL == tSimpleHashGet(pTables, pTableCol->tableAlias, strlen(pTableCol->tableAlias))) {
return false;
}
return true;
}
static EDealRes pushDownCondOptIsCrossTableCond(SNode* pNode, void* pContext) {
SCpdIsMultiTableCondCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (pushDownCondOptBelongThisTable(pNode, pCxt->pLeftCols)) {
if (pushDownCondOptColInTableList(pNode, pCxt->pLeftTbls)) {
pCxt->havaLeftCol = true;
} else if (pushDownCondOptBelongThisTable(pNode, pCxt->pRightCols)) {
} else if (pushDownCondOptColInTableList(pNode, pCxt->pRightTbls)) {
pCxt->haveRightCol = true;
}
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
@ -518,10 +527,10 @@ static EDealRes pushDownCondOptIsCrossTableCond(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static ECondAction pushDownCondOptGetCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols,
static ECondAction pushDownCondOptGetCondAction(EJoinType joinType, SSHashObj* pLeftTbls, SSHashObj* pRightTbls,
SNode* pNode) {
SCpdIsMultiTableCondCxt cxt = {
.pLeftCols = pLeftCols, .pRightCols = pRightCols, .havaLeftCol = false, .haveRightCol = false};
.pLeftTbls = pLeftTbls, .pRightTbls = pRightTbls, .havaLeftCol = false, .haveRightCol = false};
nodesWalkExpr(pNode, pushDownCondOptIsCrossTableCond, &cxt);
return (JOIN_TYPE_INNER != joinType
? COND_ACTION_STAY
@ -537,9 +546,11 @@ static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCo
return TSDB_CODE_SUCCESS;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pLeftTables = NULL;
SSHashObj* pRightTables = NULL;
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pLeftTables);
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 1), &pRightTables);
SNodeList* pOnConds = NULL;
SNodeList* pLeftChildConds = NULL;
@ -547,7 +558,7 @@ static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCo
SNodeList* pRemainConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
ECondAction condAction = pushDownCondOptGetCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond);
ECondAction condAction = pushDownCondOptGetCondAction(pJoin->joinType, pLeftTables, pRightTables, pCond);
if (COND_ACTION_PUSH_JOIN == condAction) {
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
} else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) {
@ -562,6 +573,9 @@ static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCo
}
}
tSimpleHashCleanup(pLeftTables);
tSimpleHashCleanup(pRightTables);
SNode* pTempOnCond = NULL;
SNode* pTempLeftChildCond = NULL;
SNode* pTempRightChildCond = NULL;
@ -601,10 +615,17 @@ static int32_t pushDownCondOptPartLogicCond(SJoinLogicNode* pJoin, SNode** pOnCo
static int32_t pushDownCondOptPartOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond,
SNode** pRightChildCond) {
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
SSHashObj* pLeftTables = NULL;
SSHashObj* pRightTables = NULL;
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pLeftTables);
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 1), &pRightTables);
ECondAction condAction =
pushDownCondOptGetCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions);
pushDownCondOptGetCondAction(pJoin->joinType, pLeftTables, pRightTables, pJoin->node.pConditions);
tSimpleHashCleanup(pLeftTables);
tSimpleHashCleanup(pRightTables);
if (COND_ACTION_STAY == condAction) {
return TSDB_CODE_SUCCESS;
} else if (COND_ACTION_PUSH_JOIN == condAction) {
@ -930,7 +951,6 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
if (pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) {
return TSDB_CODE_SUCCESS;
}
pJoin->joinAlgo = JOIN_ALGO_MERGE;
if (NULL == pJoin->node.pConditions) {
int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin);
@ -1320,6 +1340,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
}
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_PARTITION:
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
*pNotOptimize = true;
return TSDB_CODE_SUCCESS;
default:
@ -3160,7 +3181,11 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) {
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2
|| pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_JOIN)) {
if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) {
pJoin->joinAlgo = JOIN_ALGO_MERGE;
}
return false;
}
@ -3329,7 +3354,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
return code;
}
static int32_t stbJoinOptCreateGroupCacheNode(SOptimizeContext* pCxt, SNodeList* pChildren, SLogicNode** ppLogic) {
static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChildren, SLogicNode** ppLogic) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheLogicNode* pGrpCache = (SGroupCacheLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE);
if (NULL == pGrpCache) {
@ -3339,7 +3364,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SOptimizeContext* pCxt, SNodeList*
//pGrpCache->node.dynamicOp = true;
pGrpCache->grpColsMayBeNull = false;
pGrpCache->grpByUid = true;
pGrpCache->batchFetch = getBatchScanOptionFromHint(((SSelectStmt*)pCxt->pPlanCxt->pAstRoot)->pHint);
pGrpCache->batchFetch = getBatchScanOptionFromHint(pRoot->pHint);
pGrpCache->node.pChildren = pChildren;
pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) {
@ -3436,7 +3461,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi
return TSDB_CODE_SUCCESS;
}
static int32_t stbJoinOptCreateDynQueryCtrlNode(SOptimizeContext* pCxt, SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, SLogicNode** ppDynNode) {
static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, SLogicNode** ppDynNode) {
int32_t code = TSDB_CODE_SUCCESS;
SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL);
if (NULL == pDynCtrl) {
@ -3444,7 +3469,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SOptimizeContext* pCxt, SLogicNo
}
pDynCtrl->qType = DYN_QTYPE_STB_HASH;
pDynCtrl->stbJoin.batchFetch = getBatchScanOptionFromHint(((SSelectStmt*)pCxt->pPlanCxt->pAstRoot)->pHint);
pDynCtrl->stbJoin.batchFetch = getBatchScanOptionFromHint(pRoot->pHint);
memcpy(pDynCtrl->stbJoin.srcScan, srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
if (TSDB_CODE_SUCCESS == code) {
@ -3510,13 +3535,13 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p
code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes, srcScan);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateGroupCacheNode(pCxt, pTbScanNodes, &pGrpCacheNode);
code = stbJoinOptCreateGroupCacheNode(getLogicNodeRootNode(pJoin), pTbScanNodes, &pGrpCacheNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateDynQueryCtrlNode(pCxt, pHJoinNode, pMJoinNode, srcScan, &pDynNode);
code = stbJoinOptCreateDynQueryCtrlNode(getLogicNodeRootNode(pJoin), pHJoinNode, pMJoinNode, srcScan, &pDynNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode);
@ -3542,12 +3567,12 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
{.pName = "PushDownCondition", .optimizeFunc = pushDownCondOptimize},
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
{.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},

View File

@ -108,6 +108,7 @@ int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList) {
}
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
pNew->stmtRoot = pOld->stmtRoot;
if (NULL == pOld->pParent) {
pSubplan->pNode = (SLogicNode*)pNew;
pNew->pParent = NULL;
@ -125,6 +126,19 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode*
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr) {
while (pCurr) {
if (pCurr->stmtRoot || NULL == pCurr->pParent) {
return pCurr;
}
pCurr = pCurr->pParent;
}
return NULL;
}
static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) {
if ((SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) ||
DATA_ORDER_LEVEL_GLOBAL == pScan->node.requireDataOrder) {
@ -391,4 +405,30 @@ bool getBatchScanOptionFromHint(SNodeList* pList) {
return batchScan;
}
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SLogicNode* pCurr = (SLogicNode*)pNode;
FOREACH(pNode, pCurr->pTargets) {
SColumnNode* pCol = (SColumnNode*)pNode;
if (NULL == *ppRes) {
*ppRes = tSimpleHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
tSimpleHashPut(*ppRes, pCol->tableAlias, strlen(pCol->tableAlias), NULL, 0);
}
FOREACH(pNode, pCurr->pChildren) {
code = collectTableAliasFromNodes(pNode, ppRes);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -406,7 +406,7 @@ sql insert into tbb_5 values('2021-03-03 05:00:00.000', 99115,99115.0,'b5');
sql insert into tbb_5 values('2021-03-04 05:00:00.000', 99115,99115.0,'b5');
sql insert into tbb_5 values('2021-03-05 05:00:00.000', 99115,99115.0,'b5');
sql select * from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1;
sql select * from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 order by st0.ts;
if $rows != 25 then
return -1
endi
@ -442,7 +442,7 @@ if $data09 != @21-03-01 01:00:00.000@ then
return -1
endi
sql select * from st0, st1 where st0.ts=st1.ts and st0.id2=st1.id2;
sql select * from st0, st1 where st0.ts=st1.ts and st0.id2=st1.id2 order by st0.ts;
if $rows != 25 then
return -1
endi
@ -478,7 +478,7 @@ if $data09 != @21-03-01 01:00:00.000@ then
return -1
endi
sql select * from st0, st1 where st0.id3=st1.id3 and st1.ts=st0.ts;
sql select * from st0, st1 where st0.id3=st1.id3 and st1.ts=st0.ts order by st0.ts;
if $rows != 25 then
return -1
endi
@ -514,7 +514,7 @@ if $data09 != @21-03-01 01:00:00.000@ then
return -1
endi
sql select * from st0, st1 where st1.id5=st0.id5 and st0.ts=st1.ts;
sql select * from st0, st1 where st1.id5=st0.id5 and st0.ts=st1.ts order by st0.ts;
if $rows != 25 then
return -1
endi
@ -550,7 +550,7 @@ if $data09 != @21-03-01 01:00:00.000@ then
return -1
endi
sql select st0.* from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1;
sql select st0.* from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 order by st0.ts;
if $rows != 25 then
return -1
endi
@ -583,7 +583,7 @@ if $data08 != 3 then
return -1
endi
sql select st0.* from st0, st1 where st0.ts=st1.ts and st1.id2=st0.id2;
sql select st0.* from st0, st1 where st0.ts=st1.ts and st1.id2=st0.id2 order by st0.ts;
if $rows != 25 then
return -1
endi
@ -616,7 +616,7 @@ if $data08 != 3 then
return -1
endi
sql select st0.* from st0, st1 where st0.id3=st1.id3 and st1.ts=st0.ts;
sql select st0.* from st0, st1 where st0.id3=st1.id3 and st1.ts=st0.ts order by st0.ts;
if $rows != 25 then
return -1
endi
@ -649,7 +649,7 @@ if $data08 != 3 then
return -1
endi
sql select st1.* from st0, st1 where st1.id5=st0.id5 and st0.ts=st1.ts;
sql select st1.* from st0, st1 where st1.id5=st0.id5 and st0.ts=st1.ts order by st1.ts;
if $rows != 25 then
return -1
endi
@ -711,7 +711,7 @@ if $data31 != 9911 then
return -1
endi
sql select st0.ts,st1.ts from st0, st1 where st0.ts=st1.ts and st1.id2=st0.id2;
sql select st0.ts,st1.ts from st0, st1 where st0.ts=st1.ts and st1.id2=st0.id2 order by st1.ts;
if $rows != 25 then
return -1
endi
@ -728,7 +728,7 @@ if $data51 != @21-03-02 01:00:00.000@ then
return -1
endi
sql select st1.ts,st0.ts,st0.id3,st1.id3,st0.f3,st1.f3 from st0, st1 where st0.id3=st1.id3 and st1.ts=st0.ts;
sql select st1.ts,st0.ts,st0.id3,st1.id3,st0.f3,st1.f3 from st0, st1 where st0.id3=st1.id3 and st1.ts=st0.ts order by st1.ts;
if $rows != 25 then
return -1
endi
@ -751,7 +751,7 @@ if $data05 != 11 then
return -1
endi
sql select st0.ts,st0.f2,st1.f3,st1.f2,st0.f3 from st0, st1 where st1.id5=st0.id5 and st0.ts=st1.ts;
sql select st0.ts,st0.f2,st1.f3,st1.f2,st0.f3 from st0, st1 where st1.id5=st0.id5 and st0.ts=st1.ts order by st1.ts;
if $rows != 25 then
return -1
endi
@ -850,7 +850,7 @@ if $data08 != 15 then
return -1
endi
sql select st0.*,st1.* from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1;
sql select st0.*,st1.* from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1 order by st0.ts;
if $rows != 25 then
return -1
endi
@ -1085,7 +1085,7 @@ endi
# return -1
#endi
sql select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9 where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1;
sql select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9 where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1 order by st1.ts;
if $rows != 25 then
return -1
endi