feat: super table order by primary key optimization
This commit is contained in:
parent
3158f705fc
commit
015193e816
|
@ -104,6 +104,7 @@ typedef struct SJoinLogicNode {
|
|||
SNode* pMergeCondition;
|
||||
SNode* pOnConditions;
|
||||
bool isSingleTableJoin;
|
||||
EOrder inputTsOrder;
|
||||
} SJoinLogicNode;
|
||||
|
||||
typedef struct SAggLogicNode {
|
||||
|
@ -201,6 +202,7 @@ typedef struct SWindowLogicNode {
|
|||
int64_t watermark;
|
||||
int8_t igExpired;
|
||||
EWindowAlgorithm windowAlgo;
|
||||
EOrder inputTsOrder;
|
||||
} SWindowLogicNode;
|
||||
|
||||
typedef struct SFillLogicNode {
|
||||
|
@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode {
|
|||
SNode* pTimeSeries; // SColumnNode
|
||||
} SInterpFuncPhysiNode;
|
||||
|
||||
typedef struct SJoinPhysiNode {
|
||||
typedef struct SSortMergeJoinPhysiNode {
|
||||
SPhysiNode node;
|
||||
EJoinType joinType;
|
||||
SNode* pMergeCondition;
|
||||
SNode* pOnConditions;
|
||||
SNodeList* pTargets;
|
||||
} SJoinPhysiNode;
|
||||
|
||||
typedef SJoinPhysiNode SSortMergeJoinPhysiNode;
|
||||
EOrder inputTsOrder;
|
||||
} SSortMergeJoinPhysiNode;
|
||||
|
||||
typedef struct SAggPhysiNode {
|
||||
SPhysiNode node;
|
||||
|
|
|
@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
|
||||
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
|
||||
SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
|
||||
pPhysiChildren = pJoinNode->node.pChildren;
|
||||
break;
|
||||
}
|
||||
|
@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
|
||||
STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT : EXPLAIN_TBL_SCAN_FORMAT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT
|
||||
: EXPLAIN_TBL_SCAN_FORMAT,
|
||||
pTblScanNode->scan.tableName.tname);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
|
@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
if (pSTblScanNode->scan.pScanPseudoCols) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
|
||||
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
|
||||
SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
|
@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
if (pDistScanNode->pScanPseudoCols) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
|
||||
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
|
@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
default:
|
||||
qError("not supported physical node type %d", pNode->type);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
|
|
|
@ -321,6 +321,49 @@ typedef struct STableScanInfo {
|
|||
int8_t noTable;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
STableListInfo* tableListInfo;
|
||||
int32_t tableStartIndex;
|
||||
int32_t tableEndIndex;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
SArray* dataReaders; // array of tsdbReaderT*
|
||||
SReadHandle readHandle;
|
||||
int32_t bufPageSize;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
SArray* pSortInfo;
|
||||
SSortHandle* pSortHandle;
|
||||
|
||||
SSDataBlock* pSortInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
SArray* sortSourceParams;
|
||||
|
||||
SFileBlockLoadRecorder readRecorder;
|
||||
int64_t numOfRows;
|
||||
SScanInfo scanInfo;
|
||||
int32_t scanTimes;
|
||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
|
||||
SResultRowInfo* pResultRowInfo;
|
||||
int32_t* rowEntryInfoOffset;
|
||||
SExprInfo* pExpr;
|
||||
SSDataBlock* pResBlock;
|
||||
SArray* pColMatchInfo;
|
||||
int32_t numOfOutput;
|
||||
|
||||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
SqlFunctionCtx* pPseudoCtx;
|
||||
|
||||
SQueryTableDataCond cond;
|
||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||
int32_t dataBlockLoadFlag;
|
||||
// if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||
// window to check if current data block needs to be loaded.
|
||||
SInterval interval;
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
} STableMergeScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
SColumnInfo *pCols;
|
||||
SSDataBlock *pRes;
|
||||
|
@ -881,7 +924,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
|||
|
||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
|
||||
|
|
|
@ -1356,7 +1356,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
|
|||
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
||||
|
||||
if (pColMatchInfo != NULL) {
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
|
||||
SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i);
|
||||
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId);
|
||||
|
@ -2885,11 +2885,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
*order = TSDB_ORDER_ASC;
|
||||
*scanFlag = MAIN_SCAN;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
*order = pTableScanInfo->cond.order;
|
||||
*scanFlag = pTableScanInfo->scanFlag;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
|
||||
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||
*order = pTableScanInfo->cond.order;
|
||||
*scanFlag = pTableScanInfo->scanFlag;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -3307,9 +3312,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
|
||||
SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
while (1) {
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
|
@ -3326,7 +3331,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pLimitInfo->remainGroupOffset > 0) {
|
||||
if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group
|
||||
if (pLimitInfo->currentGroupId == 0 ||
|
||||
pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group
|
||||
pLimitInfo->currentGroupId = pBlock->info.groupId;
|
||||
continue;
|
||||
} else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
|
||||
|
@ -4265,7 +4271,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
|||
}
|
||||
|
||||
// this the tags and pseudo function columns, we only keep the tag columns
|
||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);
|
||||
|
||||
int32_t type = nodeType(pNode->pExpr);
|
||||
|
@ -4381,7 +4387,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
int32_t groupNum = 0;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId);
|
||||
int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -4701,7 +4707,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
|
||||
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
|
||||
pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
|
||||
pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
|
||||
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
||||
|
|
|
@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
|
|||
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
|
||||
static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
|
||||
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
||||
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pOperator == NULL || pInfo == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pOperator->name = "MergeJoinOperator";
|
||||
pInfo->pRes = pResBlock;
|
||||
pOperator->name = "MergeJoinOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
SNode* pMergeCondition = pJoinNode->pMergeCondition;
|
||||
if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) {
|
||||
|
@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
|
|||
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
|
||||
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
|
||||
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
|
||||
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
|
|
@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
} else {
|
||||
qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st));
|
||||
qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
char tagTypeStr[VARSTR_HEADER_SIZE + 32];
|
||||
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
|
||||
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
|
||||
if (tagType == TSDB_DATA_TYPE_VARCHAR) {
|
||||
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
|
||||
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
|
||||
} else if (tagType == TSDB_DATA_TYPE_NCHAR) {
|
||||
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
tagTypeLen +=
|
||||
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
|
||||
(int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||
}
|
||||
varDataSetLen(tagTypeStr, tagTypeLen);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
|
||||
|
@ -2527,49 +2530,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
STableListInfo* tableListInfo;
|
||||
int32_t tableStartIndex;
|
||||
int32_t tableEndIndex;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
SArray* dataReaders; // array of tsdbReaderT*
|
||||
SReadHandle readHandle;
|
||||
int32_t bufPageSize;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
SArray* pSortInfo;
|
||||
SSortHandle* pSortHandle;
|
||||
|
||||
SSDataBlock* pSortInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
SArray* sortSourceParams;
|
||||
|
||||
SFileBlockLoadRecorder readRecorder;
|
||||
int64_t numOfRows;
|
||||
SScanInfo scanInfo;
|
||||
int32_t scanTimes;
|
||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
|
||||
SResultRowInfo* pResultRowInfo;
|
||||
int32_t* rowEntryInfoOffset;
|
||||
SExprInfo* pExpr;
|
||||
SSDataBlock* pResBlock;
|
||||
SArray* pColMatchInfo;
|
||||
int32_t numOfOutput;
|
||||
|
||||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
SqlFunctionCtx* pPseudoCtx;
|
||||
|
||||
SQueryTableDataCond cond;
|
||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||
int32_t dataBlockLoadFlag;
|
||||
// if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||
// window to check if current data block needs to be loaded.
|
||||
SInterval interval;
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
} STableMergeScanInfo;
|
||||
|
||||
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
const char* idStr) {
|
||||
|
@ -2975,6 +2935,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
||||
|
||||
taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
|
|
@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
|
|||
static const char* jkJoinPhysiPlanTargets = "Targets";
|
||||
|
||||
static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj;
|
||||
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
|
||||
|
||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
|
|||
}
|
||||
|
||||
static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
|
||||
SJoinPhysiNode* pNode = (SJoinPhysiNode*)pObj;
|
||||
SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj;
|
||||
|
||||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
|
||||
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode;
|
||||
SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext);
|
||||
|
|
|
@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
|
||||
return makeNode(type, sizeof(SJoinPhysiNode));
|
||||
return makeNode(type, sizeof(SSortMergeJoinPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
|
||||
return makeNode(type, sizeof(SAggPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
|
||||
|
@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
|
||||
SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode;
|
||||
SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode;
|
||||
destroyPhysiNode((SPhysiNode*)pPhyNode);
|
||||
nodesDestroyNode(pPhyNode->pMergeCondition);
|
||||
nodesDestroyNode(pPhyNode->pOnConditions);
|
||||
|
|
|
@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) {
|
||||
static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
|
||||
SArray* tagName, uint8_t tagNum) {
|
||||
pTbReq->type = TD_CHILD_TABLE;
|
||||
pTbReq->name = strdup(tname);
|
||||
pTbReq->ctb.suid = suid;
|
||||
pTbReq->ctb.tagNum = tagNum;
|
||||
if(sname) pTbReq->ctb.name = strdup(sname);
|
||||
if (sname) pTbReq->ctb.name = strdup(sname);
|
||||
pTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
pTbReq->ctb.tagName = taosArrayDup(tagName);
|
||||
pTbReq->commentLen = -1;
|
||||
|
@ -969,7 +970,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
|||
}
|
||||
|
||||
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column
|
||||
code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
|
@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
|||
goto end;
|
||||
}
|
||||
|
||||
buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags);
|
||||
buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName,
|
||||
pCxt->pTableMeta->tableInfo.numOfTags);
|
||||
|
||||
end:
|
||||
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
|
||||
|
@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
|
|||
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
|
||||
SName name;
|
||||
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
|
||||
CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
|
||||
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
|
||||
|
@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
|||
return ret;
|
||||
}
|
||||
|
||||
buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags);
|
||||
buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
|
||||
pTableMeta->tableInfo.numOfTags);
|
||||
taosArrayDestroy(tagName);
|
||||
|
||||
smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1);
|
||||
|
|
|
@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
case TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG:
|
||||
return "sliding value no larger than the interval value";
|
||||
case TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL:
|
||||
return "sliding value can not less than 1% of interval value";
|
||||
return "sliding value can not less than 1%% of interval value";
|
||||
case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG:
|
||||
return "Only one tag if there is a json tag";
|
||||
case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL:
|
||||
|
|
|
@ -993,22 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
|
|||
}
|
||||
|
||||
static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN: {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
if (NULL != pScan->pGroupTags) {
|
||||
*pNotOptimize = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return nodesListMakeAppend(pScanNodes, (SNode*)pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
code =
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN: {
|
||||
int32_t code =
|
||||
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code =
|
||||
sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
*pNotOptimize = true;
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1034,6 +1040,18 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
|
|||
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
|
||||
}
|
||||
|
||||
static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) {
|
||||
if (NULL == pNode) {
|
||||
return;
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) {
|
||||
((SWindowLogicNode*)pNode)->inputTsOrder = order;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
|
||||
((SJoinLogicNode*)pNode)->inputTsOrder = order;
|
||||
}
|
||||
sortPriKeyOptSetParentOrder(pNode->pParent, order);
|
||||
}
|
||||
|
||||
static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort,
|
||||
SNodeList* pScanNodes) {
|
||||
EOrder order = sortPriKeyOptGetPriKeyOrder(pSort);
|
||||
|
@ -1048,6 +1066,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
|
|||
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL;
|
||||
pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
|
||||
}
|
||||
sortPriKeyOptSetParentOrder(pScan->node.pParent, order);
|
||||
}
|
||||
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0);
|
||||
|
@ -1613,10 +1632,10 @@ static void alignProjectionWithTarget(SLogicNode* pNode) {
|
|||
}
|
||||
|
||||
SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode;
|
||||
SNode* pProjection = NULL;
|
||||
SNode* pProjection = NULL;
|
||||
FOREACH(pProjection, pProjectNode->pProjections) {
|
||||
SNode* pTarget = NULL;
|
||||
bool keep = false;
|
||||
bool keep = false;
|
||||
FOREACH(pTarget, pNode->pTargets) {
|
||||
if (0 == strcmp(((SColumnNode*)pProjection)->node.aliasName, ((SColumnNode*)pTarget)->colName)) {
|
||||
keep = true;
|
||||
|
|
|
@ -621,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
|
|||
|
||||
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
SJoinPhysiNode* pJoin =
|
||||
(SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
|
||||
SSortMergeJoinPhysiNode* pJoin =
|
||||
(SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
|
||||
if (NULL == pJoin) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) {
|
||||
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
|
||||
SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||
if (NULL == pMergeKey) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList**
|
|||
nodesDestroyNode((SNode*)pMergeKey);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMergeKey->order = ORDER_ASC;
|
||||
pMergeKey->order = order;
|
||||
pMergeKey->nullOrder = NULL_ORDER_FIRST;
|
||||
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
|
||||
}
|
||||
|
@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
|||
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
|
||||
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
|
||||
SNodeList* pMergeKeys = NULL;
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk,
|
||||
((SWindowLogicNode*)pInfo->pSplitNode)->inputTsOrder, &pMergeKeys);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
|
||||
}
|
||||
|
@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
|
|||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
|
||||
|
||||
SNodeList* pMergeKeys = NULL;
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk,
|
||||
((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
|
||||
|
@ -950,7 +952,8 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu
|
|||
pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
|
||||
pMergeScan->node.pChildren = pChildren;
|
||||
splSetParent((SLogicNode*)pMergeScan);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), &pMergeKeys);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
|
||||
pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1020,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
|
|||
}
|
||||
|
||||
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
|
||||
SNode* pPrimaryKey =
|
||||
nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0)));
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
|
||||
SNode* pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan));
|
||||
if (NULL == pPrimaryKey) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT, "Cannot use 'year' as
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG, "Interval offset should be shorter than interval")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_UNIT, "Does not support sliding when interval is natural month/year")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG, "sliding value no larger than the interval value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1% of interval value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1%% of interval value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_ONLY_ONE_JSON_TAG, "Only one tag if there is a json tag")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_NUM_OF_COL, "Query block has incorrect number of result columns")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL, "Incorrect TIMESTAMP value")
|
||||
|
|
Loading…
Reference in New Issue