diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e4038b27de..a96f737cc2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -213,6 +213,214 @@ typedef enum _mgmt_table { #define TD_REQ_FROM_APP 0 #define TD_REQ_FROM_TAOX 1 +typedef enum ENodeType { + // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, + // VALUE, OPERATOR, FUNCTION and so on. + QUERY_NODE_COLUMN = 1, + QUERY_NODE_VALUE, + QUERY_NODE_OPERATOR, + QUERY_NODE_LOGIC_CONDITION, + QUERY_NODE_FUNCTION, + QUERY_NODE_REAL_TABLE, + QUERY_NODE_TEMP_TABLE, + QUERY_NODE_JOIN_TABLE, + QUERY_NODE_GROUPING_SET, + QUERY_NODE_ORDER_BY_EXPR, + QUERY_NODE_LIMIT, + QUERY_NODE_STATE_WINDOW, + QUERY_NODE_SESSION_WINDOW, + QUERY_NODE_INTERVAL_WINDOW, + QUERY_NODE_NODE_LIST, + QUERY_NODE_FILL, + QUERY_NODE_RAW_EXPR, // Only be used in parser module. + QUERY_NODE_TARGET, + QUERY_NODE_DATABLOCK_DESC, + QUERY_NODE_SLOT_DESC, + QUERY_NODE_COLUMN_DEF, + QUERY_NODE_DOWNSTREAM_SOURCE, + QUERY_NODE_DATABASE_OPTIONS, + QUERY_NODE_TABLE_OPTIONS, + QUERY_NODE_INDEX_OPTIONS, + QUERY_NODE_EXPLAIN_OPTIONS, + QUERY_NODE_STREAM_OPTIONS, + QUERY_NODE_LEFT_VALUE, + QUERY_NODE_COLUMN_REF, + QUERY_NODE_WHEN_THEN, + QUERY_NODE_CASE_WHEN, + QUERY_NODE_EVENT_WINDOW, + + // Statement nodes are used in parser and planner module. + QUERY_NODE_SET_OPERATOR = 100, + QUERY_NODE_SELECT_STMT, + QUERY_NODE_VNODE_MODIFY_STMT, + QUERY_NODE_CREATE_DATABASE_STMT, + QUERY_NODE_DROP_DATABASE_STMT, + QUERY_NODE_ALTER_DATABASE_STMT, + QUERY_NODE_FLUSH_DATABASE_STMT, + QUERY_NODE_TRIM_DATABASE_STMT, + QUERY_NODE_CREATE_TABLE_STMT, + QUERY_NODE_CREATE_SUBTABLE_CLAUSE, + QUERY_NODE_CREATE_MULTI_TABLES_STMT, + QUERY_NODE_DROP_TABLE_CLAUSE, + QUERY_NODE_DROP_TABLE_STMT, + QUERY_NODE_DROP_SUPER_TABLE_STMT, + QUERY_NODE_ALTER_TABLE_STMT, + QUERY_NODE_ALTER_SUPER_TABLE_STMT, + QUERY_NODE_CREATE_USER_STMT, + QUERY_NODE_ALTER_USER_STMT, + QUERY_NODE_DROP_USER_STMT, + QUERY_NODE_USE_DATABASE_STMT, + QUERY_NODE_CREATE_DNODE_STMT, + QUERY_NODE_DROP_DNODE_STMT, + QUERY_NODE_ALTER_DNODE_STMT, + QUERY_NODE_CREATE_INDEX_STMT, + QUERY_NODE_DROP_INDEX_STMT, + QUERY_NODE_CREATE_QNODE_STMT, + QUERY_NODE_DROP_QNODE_STMT, + QUERY_NODE_CREATE_BNODE_STMT, + QUERY_NODE_DROP_BNODE_STMT, + QUERY_NODE_CREATE_SNODE_STMT, + QUERY_NODE_DROP_SNODE_STMT, + QUERY_NODE_CREATE_MNODE_STMT, + QUERY_NODE_DROP_MNODE_STMT, + QUERY_NODE_CREATE_TOPIC_STMT, + QUERY_NODE_DROP_TOPIC_STMT, + QUERY_NODE_DROP_CGROUP_STMT, + QUERY_NODE_ALTER_LOCAL_STMT, + QUERY_NODE_EXPLAIN_STMT, + QUERY_NODE_DESCRIBE_STMT, + QUERY_NODE_RESET_QUERY_CACHE_STMT, + QUERY_NODE_COMPACT_DATABASE_STMT, + QUERY_NODE_CREATE_FUNCTION_STMT, + QUERY_NODE_DROP_FUNCTION_STMT, + QUERY_NODE_CREATE_STREAM_STMT, + QUERY_NODE_DROP_STREAM_STMT, + QUERY_NODE_BALANCE_VGROUP_STMT, + QUERY_NODE_MERGE_VGROUP_STMT, + QUERY_NODE_REDISTRIBUTE_VGROUP_STMT, + QUERY_NODE_SPLIT_VGROUP_STMT, + QUERY_NODE_SYNCDB_STMT, + QUERY_NODE_GRANT_STMT, + QUERY_NODE_REVOKE_STMT, + QUERY_NODE_SHOW_DNODES_STMT, + QUERY_NODE_SHOW_MNODES_STMT, + QUERY_NODE_SHOW_MODULES_STMT, + QUERY_NODE_SHOW_QNODES_STMT, + QUERY_NODE_SHOW_SNODES_STMT, + QUERY_NODE_SHOW_BNODES_STMT, + QUERY_NODE_SHOW_CLUSTER_STMT, + QUERY_NODE_SHOW_DATABASES_STMT, + QUERY_NODE_SHOW_FUNCTIONS_STMT, + QUERY_NODE_SHOW_INDEXES_STMT, + QUERY_NODE_SHOW_STABLES_STMT, + QUERY_NODE_SHOW_STREAMS_STMT, + QUERY_NODE_SHOW_TABLES_STMT, + QUERY_NODE_SHOW_TAGS_STMT, + QUERY_NODE_SHOW_USERS_STMT, + QUERY_NODE_SHOW_LICENCES_STMT, + QUERY_NODE_SHOW_VGROUPS_STMT, + QUERY_NODE_SHOW_TOPICS_STMT, + QUERY_NODE_SHOW_CONSUMERS_STMT, + QUERY_NODE_SHOW_CONNECTIONS_STMT, + QUERY_NODE_SHOW_QUERIES_STMT, + QUERY_NODE_SHOW_APPS_STMT, + QUERY_NODE_SHOW_VARIABLES_STMT, + QUERY_NODE_SHOW_DNODE_VARIABLES_STMT, + QUERY_NODE_SHOW_TRANSACTIONS_STMT, + QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT, + QUERY_NODE_SHOW_VNODES_STMT, + QUERY_NODE_SHOW_USER_PRIVILEGES_STMT, + QUERY_NODE_SHOW_CREATE_DATABASE_STMT, + QUERY_NODE_SHOW_CREATE_TABLE_STMT, + QUERY_NODE_SHOW_CREATE_STABLE_STMT, + QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT, + QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT, + QUERY_NODE_SHOW_SCORES_STMT, + QUERY_NODE_SHOW_TABLE_TAGS_STMT, + QUERY_NODE_KILL_CONNECTION_STMT, + QUERY_NODE_KILL_QUERY_STMT, + QUERY_NODE_KILL_TRANSACTION_STMT, + QUERY_NODE_DELETE_STMT, + QUERY_NODE_INSERT_STMT, + QUERY_NODE_QUERY, + QUERY_NODE_SHOW_DB_ALIVE_STMT, + QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT, + QUERY_NODE_BALANCE_VGROUP_LEADER_STMT, + QUERY_NODE_RESTORE_DNODE_STMT, + QUERY_NODE_RESTORE_QNODE_STMT, + QUERY_NODE_RESTORE_MNODE_STMT, + QUERY_NODE_RESTORE_VNODE_STMT, + QUERY_NODE_PAUSE_STREAM_STMT, + QUERY_NODE_RESUME_STREAM_STMT, + + // logic plan node + QUERY_NODE_LOGIC_PLAN_SCAN = 1000, + QUERY_NODE_LOGIC_PLAN_JOIN, + QUERY_NODE_LOGIC_PLAN_AGG, + QUERY_NODE_LOGIC_PLAN_PROJECT, + QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY, + QUERY_NODE_LOGIC_PLAN_EXCHANGE, + QUERY_NODE_LOGIC_PLAN_MERGE, + QUERY_NODE_LOGIC_PLAN_WINDOW, + QUERY_NODE_LOGIC_PLAN_FILL, + QUERY_NODE_LOGIC_PLAN_SORT, + QUERY_NODE_LOGIC_PLAN_PARTITION, + QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC, + QUERY_NODE_LOGIC_PLAN_INTERP_FUNC, + QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, + QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, + QUERY_NODE_LOGIC_SUBPLAN, + QUERY_NODE_LOGIC_PLAN, + + // physical plan node + QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, + QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, + QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, + QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, + QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, + QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, + QUERY_NODE_PHYSICAL_PLAN_PROJECT, + QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, + QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, + QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, + QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, + QUERY_NODE_PHYSICAL_PLAN_MERGE, + QUERY_NODE_PHYSICAL_PLAN_SORT, + QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, + QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_FILL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, + QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, + QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION, + QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION, + QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, + QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, + QUERY_NODE_PHYSICAL_PLAN_PARTITION, + QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, + QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, + QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, + QUERY_NODE_PHYSICAL_PLAN_DISPATCH, + QUERY_NODE_PHYSICAL_PLAN_INSERT, + QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, + QUERY_NODE_PHYSICAL_PLAN_DELETE, + QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, + QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, + QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, + QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, + QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, + QUERY_NODE_PHYSICAL_SUBPLAN, + QUERY_NODE_PHYSICAL_PLAN +} ENodeType; + + typedef struct { int32_t vgId; char* dbFName; @@ -1842,6 +2050,11 @@ typedef struct { int32_t tversion; } SResReadyRsp; + +typedef struct SOperatorParam { + SArray* pOpParams; //SArray +} SOperatorParam; + typedef struct SOperatorSpecParam { int32_t opType; void* value; @@ -1851,9 +2064,6 @@ typedef struct SOperatorBaseParam { SOperatorParam* pChild; } SOperatorBaseParam; -typedef struct SOperatorParam { - SArray* pOpParams; //SArray -} SOperatorParam; typedef struct STableScanOperatorParam { SOperatorParam* pChild; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5cc8eec39e..26d46638d8 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -134,6 +134,8 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI bool qIsDynamicExecTask(qTaskInfo_t tinfo); +void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam); + /** * Create the exec task object according to task json * @param readHandle diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 59840b5a1e..6e36ea7514 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "tdef.h" +#include "tmsg.h" #define nodeType(nodeptr) (((const SNode*)(nodeptr))->type) #define setNodeType(nodeptr, nodetype) (((SNode*)(nodeptr))->type = (nodetype)) @@ -78,213 +79,6 @@ extern "C" { (list) = NULL; \ } while (0) -typedef enum ENodeType { - // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, - // VALUE, OPERATOR, FUNCTION and so on. - QUERY_NODE_COLUMN = 1, - QUERY_NODE_VALUE, - QUERY_NODE_OPERATOR, - QUERY_NODE_LOGIC_CONDITION, - QUERY_NODE_FUNCTION, - QUERY_NODE_REAL_TABLE, - QUERY_NODE_TEMP_TABLE, - QUERY_NODE_JOIN_TABLE, - QUERY_NODE_GROUPING_SET, - QUERY_NODE_ORDER_BY_EXPR, - QUERY_NODE_LIMIT, - QUERY_NODE_STATE_WINDOW, - QUERY_NODE_SESSION_WINDOW, - QUERY_NODE_INTERVAL_WINDOW, - QUERY_NODE_NODE_LIST, - QUERY_NODE_FILL, - QUERY_NODE_RAW_EXPR, // Only be used in parser module. - QUERY_NODE_TARGET, - QUERY_NODE_DATABLOCK_DESC, - QUERY_NODE_SLOT_DESC, - QUERY_NODE_COLUMN_DEF, - QUERY_NODE_DOWNSTREAM_SOURCE, - QUERY_NODE_DATABASE_OPTIONS, - QUERY_NODE_TABLE_OPTIONS, - QUERY_NODE_INDEX_OPTIONS, - QUERY_NODE_EXPLAIN_OPTIONS, - QUERY_NODE_STREAM_OPTIONS, - QUERY_NODE_LEFT_VALUE, - QUERY_NODE_COLUMN_REF, - QUERY_NODE_WHEN_THEN, - QUERY_NODE_CASE_WHEN, - QUERY_NODE_EVENT_WINDOW, - - // Statement nodes are used in parser and planner module. - QUERY_NODE_SET_OPERATOR = 100, - QUERY_NODE_SELECT_STMT, - QUERY_NODE_VNODE_MODIFY_STMT, - QUERY_NODE_CREATE_DATABASE_STMT, - QUERY_NODE_DROP_DATABASE_STMT, - QUERY_NODE_ALTER_DATABASE_STMT, - QUERY_NODE_FLUSH_DATABASE_STMT, - QUERY_NODE_TRIM_DATABASE_STMT, - QUERY_NODE_CREATE_TABLE_STMT, - QUERY_NODE_CREATE_SUBTABLE_CLAUSE, - QUERY_NODE_CREATE_MULTI_TABLES_STMT, - QUERY_NODE_DROP_TABLE_CLAUSE, - QUERY_NODE_DROP_TABLE_STMT, - QUERY_NODE_DROP_SUPER_TABLE_STMT, - QUERY_NODE_ALTER_TABLE_STMT, - QUERY_NODE_ALTER_SUPER_TABLE_STMT, - QUERY_NODE_CREATE_USER_STMT, - QUERY_NODE_ALTER_USER_STMT, - QUERY_NODE_DROP_USER_STMT, - QUERY_NODE_USE_DATABASE_STMT, - QUERY_NODE_CREATE_DNODE_STMT, - QUERY_NODE_DROP_DNODE_STMT, - QUERY_NODE_ALTER_DNODE_STMT, - QUERY_NODE_CREATE_INDEX_STMT, - QUERY_NODE_DROP_INDEX_STMT, - QUERY_NODE_CREATE_QNODE_STMT, - QUERY_NODE_DROP_QNODE_STMT, - QUERY_NODE_CREATE_BNODE_STMT, - QUERY_NODE_DROP_BNODE_STMT, - QUERY_NODE_CREATE_SNODE_STMT, - QUERY_NODE_DROP_SNODE_STMT, - QUERY_NODE_CREATE_MNODE_STMT, - QUERY_NODE_DROP_MNODE_STMT, - QUERY_NODE_CREATE_TOPIC_STMT, - QUERY_NODE_DROP_TOPIC_STMT, - QUERY_NODE_DROP_CGROUP_STMT, - QUERY_NODE_ALTER_LOCAL_STMT, - QUERY_NODE_EXPLAIN_STMT, - QUERY_NODE_DESCRIBE_STMT, - QUERY_NODE_RESET_QUERY_CACHE_STMT, - QUERY_NODE_COMPACT_DATABASE_STMT, - QUERY_NODE_CREATE_FUNCTION_STMT, - QUERY_NODE_DROP_FUNCTION_STMT, - QUERY_NODE_CREATE_STREAM_STMT, - QUERY_NODE_DROP_STREAM_STMT, - QUERY_NODE_BALANCE_VGROUP_STMT, - QUERY_NODE_MERGE_VGROUP_STMT, - QUERY_NODE_REDISTRIBUTE_VGROUP_STMT, - QUERY_NODE_SPLIT_VGROUP_STMT, - QUERY_NODE_SYNCDB_STMT, - QUERY_NODE_GRANT_STMT, - QUERY_NODE_REVOKE_STMT, - QUERY_NODE_SHOW_DNODES_STMT, - QUERY_NODE_SHOW_MNODES_STMT, - QUERY_NODE_SHOW_MODULES_STMT, - QUERY_NODE_SHOW_QNODES_STMT, - QUERY_NODE_SHOW_SNODES_STMT, - QUERY_NODE_SHOW_BNODES_STMT, - QUERY_NODE_SHOW_CLUSTER_STMT, - QUERY_NODE_SHOW_DATABASES_STMT, - QUERY_NODE_SHOW_FUNCTIONS_STMT, - QUERY_NODE_SHOW_INDEXES_STMT, - QUERY_NODE_SHOW_STABLES_STMT, - QUERY_NODE_SHOW_STREAMS_STMT, - QUERY_NODE_SHOW_TABLES_STMT, - QUERY_NODE_SHOW_TAGS_STMT, - QUERY_NODE_SHOW_USERS_STMT, - QUERY_NODE_SHOW_LICENCES_STMT, - QUERY_NODE_SHOW_VGROUPS_STMT, - QUERY_NODE_SHOW_TOPICS_STMT, - QUERY_NODE_SHOW_CONSUMERS_STMT, - QUERY_NODE_SHOW_CONNECTIONS_STMT, - QUERY_NODE_SHOW_QUERIES_STMT, - QUERY_NODE_SHOW_APPS_STMT, - QUERY_NODE_SHOW_VARIABLES_STMT, - QUERY_NODE_SHOW_DNODE_VARIABLES_STMT, - QUERY_NODE_SHOW_TRANSACTIONS_STMT, - QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT, - QUERY_NODE_SHOW_VNODES_STMT, - QUERY_NODE_SHOW_USER_PRIVILEGES_STMT, - QUERY_NODE_SHOW_CREATE_DATABASE_STMT, - QUERY_NODE_SHOW_CREATE_TABLE_STMT, - QUERY_NODE_SHOW_CREATE_STABLE_STMT, - QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT, - QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT, - QUERY_NODE_SHOW_SCORES_STMT, - QUERY_NODE_SHOW_TABLE_TAGS_STMT, - QUERY_NODE_KILL_CONNECTION_STMT, - QUERY_NODE_KILL_QUERY_STMT, - QUERY_NODE_KILL_TRANSACTION_STMT, - QUERY_NODE_DELETE_STMT, - QUERY_NODE_INSERT_STMT, - QUERY_NODE_QUERY, - QUERY_NODE_SHOW_DB_ALIVE_STMT, - QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT, - QUERY_NODE_BALANCE_VGROUP_LEADER_STMT, - QUERY_NODE_RESTORE_DNODE_STMT, - QUERY_NODE_RESTORE_QNODE_STMT, - QUERY_NODE_RESTORE_MNODE_STMT, - QUERY_NODE_RESTORE_VNODE_STMT, - QUERY_NODE_PAUSE_STREAM_STMT, - QUERY_NODE_RESUME_STREAM_STMT, - - // logic plan node - QUERY_NODE_LOGIC_PLAN_SCAN = 1000, - QUERY_NODE_LOGIC_PLAN_JOIN, - QUERY_NODE_LOGIC_PLAN_AGG, - QUERY_NODE_LOGIC_PLAN_PROJECT, - QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY, - QUERY_NODE_LOGIC_PLAN_EXCHANGE, - QUERY_NODE_LOGIC_PLAN_MERGE, - QUERY_NODE_LOGIC_PLAN_WINDOW, - QUERY_NODE_LOGIC_PLAN_FILL, - QUERY_NODE_LOGIC_PLAN_SORT, - QUERY_NODE_LOGIC_PLAN_PARTITION, - QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC, - QUERY_NODE_LOGIC_PLAN_INTERP_FUNC, - QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, - QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, - QUERY_NODE_LOGIC_SUBPLAN, - QUERY_NODE_LOGIC_PLAN, - - // physical plan node - QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, - QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, - QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, - QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, - QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, - QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, - QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, - QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, - QUERY_NODE_PHYSICAL_PLAN_PROJECT, - QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, - QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, - QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, - QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, - QUERY_NODE_PHYSICAL_PLAN_MERGE, - QUERY_NODE_PHYSICAL_PLAN_SORT, - QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, - QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_FILL, - QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, - QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, - QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, - QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION, - QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION, - QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, - QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, - QUERY_NODE_PHYSICAL_PLAN_PARTITION, - QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, - QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, - QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, - QUERY_NODE_PHYSICAL_PLAN_DISPATCH, - QUERY_NODE_PHYSICAL_PLAN_INSERT, - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, - QUERY_NODE_PHYSICAL_PLAN_DELETE, - QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, - QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, - QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, - QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, - QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, - QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN -} ENodeType; - /** * The first field of a node of any type is guaranteed to be the ENodeType. * Hence the type of any node can be gotten by casting it to SNode. diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 74bca00cb8..6ef6ccbb4d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5530,7 +5530,7 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { } int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam, int32_t specNum) { - pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam)) + pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam)); if (NULL == pOpParam->pOpParams) return -1; SOperatorSpecParam specParam; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index b828f5abf2..afd9f52823 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -65,7 +65,7 @@ typedef struct SGroupColsInfo { } SGroupColsInfo; typedef struct SGcSessionCtx { - SOperatorInfo* pDownstream; + int32_t downstreamIdx; bool cacheHit; bool needCache; SGcBlkBufInfo* pLastBlk; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 59e57e6060..ec27f38254 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -65,6 +65,7 @@ typedef struct SOperatorInfo { uint16_t operatorType; int16_t resultDataBlockId; bool blocking; // block operator or not + bool transparent; uint8_t status; // denote if current operator is completed char* name; // name, for debug purpose void* info; // extension attribution @@ -165,7 +166,9 @@ void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo, SExecTaskInfo* pTaskInfo); int32_t optrDefaultBufFn(SOperatorInfo* pOperator); -SSDataBlock* optrDefaultGetExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); +SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); +SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx); +int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx); SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser, const char* dbname); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 9083608e83..036023fc3f 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -478,7 +478,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas req.queryId = pTaskInfo->id.queryId; req.execId = pSource->execId; if (pDataInfo->pSrcUidList) { - int32_t code = buildTableScanOperatorParam(&req.opParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType); + int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType); + taosArrayDestroy(pDataInfo->pSrcUidList); + pDataInfo->pSrcUidList = NULL; if (TSDB_CODE_SUCCESS != code) { pTaskInfo->code = code; taosMemoryFree(pWrapper); @@ -766,6 +768,8 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { dataInfo.pSrcUidList = taosArrayDup(pParam->uidList, NULL); dataInfo.srcOpType = pParam->srcOpType; taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); + + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index bb715ed959..6da37b6029 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -516,6 +516,15 @@ bool qIsDynamicExecTask(qTaskInfo_t tinfo) { return ((SExecTaskInfo*)tinfo)->dynamicTask; } +void destroyOperatorParam(SOperatorParam* pParam) { + if (NULL == pParam) { + return; + } + + //TODO +} + + void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) { destroyOperatorParam(((SExecTaskInfo*)tinfo)->pOpParam); ((SExecTaskInfo*)tinfo)->pOpParam = pParam; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index b882145d92..06062faa20 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -120,16 +120,6 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { return addPageToGroupCacheBuf(pInfo->pBlkBufs); } -static int32_t initGroupCacheDownstreamInfo(SGroupCachePhysiNode* pPhyciNode, SOperatorInfo** pDownstream, int32_t numOfDownstream, SGcDownstreamInfo* pInfo) { - pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES); - if (NULL == pInfo->ppDownStream) { - return TSDB_CODE_OUT_OF_MEMORY; - } - memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES); - - return TSDB_CODE_SUCCESS; -} - static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { SGcSessionCtx ctx = {0}; SGroupCacheOperatorInfo* pGCache = pOperator->info; @@ -138,7 +128,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato ctx.cacheHit = true; ctx.pLastBlk = pGroup->blks; } else { - ctx.pDownstream = pOperator->pDownstream[pParam->downstreamIdx]; + ctx.downstreamIdx = pParam->downstreamIdx; ctx.needCache = pParam->needCache; } @@ -210,7 +200,7 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { } while (true) { - SSDataBlock* pBlock = pSession->pDownstream->fpSet.getNextExtFn(pSession->pDownstream, param); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pSession->downstreamIdx); if (NULL == pBlock) { setCurrentGroupCacheDone(pOperator); break; @@ -237,6 +227,8 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } + pOperator->transparent = true; + setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 1e3b78719c..d9f96ae83f 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -639,7 +639,7 @@ static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; while (true) { - pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx) + pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx); if (NULL == pBlock) { break; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 653503fb39..ad9238c833 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -43,6 +43,7 @@ typedef struct SMJoinOperatorInfo { SSDataBlock* pRes; int32_t joinType; int32_t inputOrder; + int16_t downstreamResBlkId[2]; SSDataBlock* pLeft; int32_t leftPos; @@ -72,8 +73,7 @@ static void destroyMergeJoinOperator(void* param); static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); -static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, - SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { +static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond; if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) { qError("not support this in join operator, %s", idStr); @@ -89,13 +89,13 @@ static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDow leftTsCol = col1; rightTsCol = col2; } else { - if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) { - ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId); + if (col1->dataBlockId == pInfo->downstreamResBlkId[0]) { + ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[1]); leftTsCol = col1; rightTsCol = col2; } else { - ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId); - ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId); + ASSERT(col1->dataBlockId == pInfo->downstreamResBlkId[1]); + ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[0]); leftTsCol = col2; rightTsCol = col1; } @@ -104,11 +104,11 @@ static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDow setJoinColumnInfo(&pInfo->rightCol, rightTsCol); } -static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, +static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorNode* pOperNode, SColumn* pLeft, SColumn* pRight) { SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; - if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) { + if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pInfo->downstreamResBlkId[0]) { *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); } else { @@ -117,7 +117,7 @@ static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorI } } -static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pEqualOnCondNode, +static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SNode* pEqualOnCondNode, SArray* leftTagEqCols, SArray* rightTagEqCols) { SColumn left = {0}; SColumn right = {0}; @@ -125,7 +125,7 @@ static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pD SNode* pNode = NULL; FOREACH(pNode, ((SLogicConditionNode*)pEqualOnCondNode)->pParameterList) { SOperatorNode* pOperNode = (SOperatorNode*)pNode; - extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); taosArrayPush(leftTagEqCols, &left); taosArrayPush(rightTagEqCols, &right); } @@ -134,7 +134,7 @@ static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pD if (nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) { SOperatorNode* pOperNode = (SOperatorNode*)pEqualOnCondNode; - extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); taosArrayPush(leftTagEqCols, &left); taosArrayPush(rightTagEqCols, &right); } @@ -198,17 +198,41 @@ static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t return (int32_t)(pStart - (char*)pKey); } +SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { + SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); + if (p) { + p[0] = pDownstream[0]; + p[1] = pDownstream[0]; + pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); + pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 1); + } + + return p; +} + + SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - + bool newDownstreams = false; + int32_t code = TSDB_CODE_SUCCESS; if (pOperator == NULL || pInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } + if (1 == numOfDownstream) { + newDownstreams = true; + pDownstream = buildMergeJoinDownstreams(pInfo, pDownstream); + if (NULL == pDownstream) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + numOfDownstream = 2; + } + int32_t numOfCols = 0; pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); @@ -220,7 +244,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfCols; - extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo)); + extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo)); if (pJoinNode->pOtherOnCond != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); @@ -265,7 +289,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t if (pInfo->pColEqualOnConditions != NULL) { pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); - extractEqualOnCondCols(pInfo, pDownstream, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols); + extractEqualOnCondCols(pInfo, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols); initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols); initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -283,6 +307,9 @@ _error: if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } + if (newDownstreams) { + taosMemoryFree(pDownstream); + } taosMemoryFree(pOperator); pTaskInfo->code = code; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index ba2f4aa2a5..eaf51bf238 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -94,16 +94,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { } } -SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { - pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0); - int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); - } - return pOperator->fpSet.getNextFn(pOperator); -} - static int64_t getQuerySupportBufSize(size_t numOfTables) { size_t s1 = sizeof(STableQueryInfo); // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb @@ -656,7 +646,7 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara return TSDB_CODE_SUCCESS; } -FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { +SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) { return pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]); } @@ -664,10 +654,22 @@ FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOper return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); } -void destroyOperatorParam(SOperatorParam* pParam) { - if (NULL == pParam) { - return; + +SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { + pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0); + int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); } + return pOperator->fpSet.getNextFn(pOperator); +} + +int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) { + if (pOperator->transparent) { + return getOperatorResultBlockId(pOperator->pDownstream[idx], 0); + } + return pOperator->resultDataBlockId; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 11c1536c62..26ef9aae4b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -789,14 +789,17 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = 0; STableListInfo* pListInfo = pInfo->base.pTableListInfo; - int32_t num = taosArrayGetSize(pOperator->pOperatorParam->pUidList); + STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam; + int32_t num = taosArrayGetSize(pParam->pUidList); if (num <= 0) { qError("empty table scan uid list"); return TSDB_CODE_INVALID_PARA; } + qDebug("add total %d dynamic tables to scan", num); + for (int32_t i = 0; i < num; ++i) { - uint64_t* pUid = taosArrayGet(pOperator->pOperatorParam->pUidList, i); + uint64_t* pUid = taosArrayGet(pParam->pUidList, i); STableKeyInfo info = {.uid = *pUid, .groupId = 0}; void* p = taosArrayPush(pListInfo->pTableList, &info); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index baba3480d5..433fd02943 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5259,7 +5259,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); - destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); pInfo->statestore = pTaskInfo->storageAPI.stateStore; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index eac96faf5d..4f43f31226 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -396,6 +396,7 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); void qwFreeTaskCtx(SQWTaskCtx *ctx); +int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 3a9bfbc81e..303eaf38ff 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -401,6 +401,7 @@ _return: QW_RET(code); } + int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); @@ -416,9 +417,9 @@ int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { return TSDB_CODE_SUCCESS; } - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC)); - qwHandleTaskComplete(QW_FPARAMS(), ctx); + QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 54fabc00af..06becf99d2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -95,6 +95,7 @@ static void freeItem(void *param) { taosMemoryFree(pInfo->verboseInfo); } + int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; @@ -492,7 +493,7 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg return TSDB_CODE_ACTION_IN_PROGRESS; } - qUpdateOperatorParam(ctx->taskHandle); + qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); atomic_store_8((int8_t *)&ctx->queryInQueue, 1); @@ -755,7 +756,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { //qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); ctx->level = plan->level; - ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo) + ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);