fix: compile issue and merge join crash issue

This commit is contained in:
dapan1121 2023-07-06 10:52:28 +08:00
parent 4cb849267a
commit b371b7e6be
17 changed files with 310 additions and 262 deletions

View File

@ -213,6 +213,214 @@ typedef enum _mgmt_table {
#define TD_REQ_FROM_APP 0
#define TD_REQ_FROM_TAOX 1
typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
// VALUE, OPERATOR, FUNCTION and so on.
QUERY_NODE_COLUMN = 1,
QUERY_NODE_VALUE,
QUERY_NODE_OPERATOR,
QUERY_NODE_LOGIC_CONDITION,
QUERY_NODE_FUNCTION,
QUERY_NODE_REAL_TABLE,
QUERY_NODE_TEMP_TABLE,
QUERY_NODE_JOIN_TABLE,
QUERY_NODE_GROUPING_SET,
QUERY_NODE_ORDER_BY_EXPR,
QUERY_NODE_LIMIT,
QUERY_NODE_STATE_WINDOW,
QUERY_NODE_SESSION_WINDOW,
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
QUERY_NODE_TARGET,
QUERY_NODE_DATABLOCK_DESC,
QUERY_NODE_SLOT_DESC,
QUERY_NODE_COLUMN_DEF,
QUERY_NODE_DOWNSTREAM_SOURCE,
QUERY_NODE_DATABASE_OPTIONS,
QUERY_NODE_TABLE_OPTIONS,
QUERY_NODE_INDEX_OPTIONS,
QUERY_NODE_EXPLAIN_OPTIONS,
QUERY_NODE_STREAM_OPTIONS,
QUERY_NODE_LEFT_VALUE,
QUERY_NODE_COLUMN_REF,
QUERY_NODE_WHEN_THEN,
QUERY_NODE_CASE_WHEN,
QUERY_NODE_EVENT_WINDOW,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_VNODE_MODIFY_STMT,
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_DROP_DATABASE_STMT,
QUERY_NODE_ALTER_DATABASE_STMT,
QUERY_NODE_FLUSH_DATABASE_STMT,
QUERY_NODE_TRIM_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
QUERY_NODE_CREATE_MULTI_TABLES_STMT,
QUERY_NODE_DROP_TABLE_CLAUSE,
QUERY_NODE_DROP_TABLE_STMT,
QUERY_NODE_DROP_SUPER_TABLE_STMT,
QUERY_NODE_ALTER_TABLE_STMT,
QUERY_NODE_ALTER_SUPER_TABLE_STMT,
QUERY_NODE_CREATE_USER_STMT,
QUERY_NODE_ALTER_USER_STMT,
QUERY_NODE_DROP_USER_STMT,
QUERY_NODE_USE_DATABASE_STMT,
QUERY_NODE_CREATE_DNODE_STMT,
QUERY_NODE_DROP_DNODE_STMT,
QUERY_NODE_ALTER_DNODE_STMT,
QUERY_NODE_CREATE_INDEX_STMT,
QUERY_NODE_DROP_INDEX_STMT,
QUERY_NODE_CREATE_QNODE_STMT,
QUERY_NODE_DROP_QNODE_STMT,
QUERY_NODE_CREATE_BNODE_STMT,
QUERY_NODE_DROP_BNODE_STMT,
QUERY_NODE_CREATE_SNODE_STMT,
QUERY_NODE_DROP_SNODE_STMT,
QUERY_NODE_CREATE_MNODE_STMT,
QUERY_NODE_DROP_MNODE_STMT,
QUERY_NODE_CREATE_TOPIC_STMT,
QUERY_NODE_DROP_TOPIC_STMT,
QUERY_NODE_DROP_CGROUP_STMT,
QUERY_NODE_ALTER_LOCAL_STMT,
QUERY_NODE_EXPLAIN_STMT,
QUERY_NODE_DESCRIBE_STMT,
QUERY_NODE_RESET_QUERY_CACHE_STMT,
QUERY_NODE_COMPACT_DATABASE_STMT,
QUERY_NODE_CREATE_FUNCTION_STMT,
QUERY_NODE_DROP_FUNCTION_STMT,
QUERY_NODE_CREATE_STREAM_STMT,
QUERY_NODE_DROP_STREAM_STMT,
QUERY_NODE_BALANCE_VGROUP_STMT,
QUERY_NODE_MERGE_VGROUP_STMT,
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT,
QUERY_NODE_SPLIT_VGROUP_STMT,
QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
QUERY_NODE_SHOW_QNODES_STMT,
QUERY_NODE_SHOW_SNODES_STMT,
QUERY_NODE_SHOW_BNODES_STMT,
QUERY_NODE_SHOW_CLUSTER_STMT,
QUERY_NODE_SHOW_DATABASES_STMT,
QUERY_NODE_SHOW_FUNCTIONS_STMT,
QUERY_NODE_SHOW_INDEXES_STMT,
QUERY_NODE_SHOW_STABLES_STMT,
QUERY_NODE_SHOW_STREAMS_STMT,
QUERY_NODE_SHOW_TABLES_STMT,
QUERY_NODE_SHOW_TAGS_STMT,
QUERY_NODE_SHOW_USERS_STMT,
QUERY_NODE_SHOW_LICENCES_STMT,
QUERY_NODE_SHOW_VGROUPS_STMT,
QUERY_NODE_SHOW_TOPICS_STMT,
QUERY_NODE_SHOW_CONSUMERS_STMT,
QUERY_NODE_SHOW_CONNECTIONS_STMT,
QUERY_NODE_SHOW_QUERIES_STMT,
QUERY_NODE_SHOW_APPS_STMT,
QUERY_NODE_SHOW_VARIABLES_STMT,
QUERY_NODE_SHOW_DNODE_VARIABLES_STMT,
QUERY_NODE_SHOW_TRANSACTIONS_STMT,
QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT,
QUERY_NODE_SHOW_VNODES_STMT,
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_TABLE_TAGS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_MERGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT,
QUERY_NODE_LOGIC_PLAN_PARTITION,
QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
} ENodeType;
typedef struct {
int32_t vgId;
char* dbFName;
@ -1842,6 +2050,11 @@ typedef struct {
int32_t tversion;
} SResReadyRsp;
typedef struct SOperatorParam {
SArray* pOpParams; //SArray<SOperatorSpecParam>
} SOperatorParam;
typedef struct SOperatorSpecParam {
int32_t opType;
void* value;
@ -1851,9 +2064,6 @@ typedef struct SOperatorBaseParam {
SOperatorParam* pChild;
} SOperatorBaseParam;
typedef struct SOperatorParam {
SArray* pOpParams; //SArray<SOperatorSpecParam>
} SOperatorParam;
typedef struct STableScanOperatorParam {
SOperatorParam* pChild;

View File

@ -134,6 +134,8 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
bool qIsDynamicExecTask(qTaskInfo_t tinfo);
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam);
/**
* Create the exec task object according to task json
* @param readHandle

View File

@ -21,6 +21,7 @@ extern "C" {
#endif
#include "tdef.h"
#include "tmsg.h"
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
#define setNodeType(nodeptr, nodetype) (((SNode*)(nodeptr))->type = (nodetype))
@ -78,213 +79,6 @@ extern "C" {
(list) = NULL; \
} while (0)
typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
// VALUE, OPERATOR, FUNCTION and so on.
QUERY_NODE_COLUMN = 1,
QUERY_NODE_VALUE,
QUERY_NODE_OPERATOR,
QUERY_NODE_LOGIC_CONDITION,
QUERY_NODE_FUNCTION,
QUERY_NODE_REAL_TABLE,
QUERY_NODE_TEMP_TABLE,
QUERY_NODE_JOIN_TABLE,
QUERY_NODE_GROUPING_SET,
QUERY_NODE_ORDER_BY_EXPR,
QUERY_NODE_LIMIT,
QUERY_NODE_STATE_WINDOW,
QUERY_NODE_SESSION_WINDOW,
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
QUERY_NODE_TARGET,
QUERY_NODE_DATABLOCK_DESC,
QUERY_NODE_SLOT_DESC,
QUERY_NODE_COLUMN_DEF,
QUERY_NODE_DOWNSTREAM_SOURCE,
QUERY_NODE_DATABASE_OPTIONS,
QUERY_NODE_TABLE_OPTIONS,
QUERY_NODE_INDEX_OPTIONS,
QUERY_NODE_EXPLAIN_OPTIONS,
QUERY_NODE_STREAM_OPTIONS,
QUERY_NODE_LEFT_VALUE,
QUERY_NODE_COLUMN_REF,
QUERY_NODE_WHEN_THEN,
QUERY_NODE_CASE_WHEN,
QUERY_NODE_EVENT_WINDOW,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_VNODE_MODIFY_STMT,
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_DROP_DATABASE_STMT,
QUERY_NODE_ALTER_DATABASE_STMT,
QUERY_NODE_FLUSH_DATABASE_STMT,
QUERY_NODE_TRIM_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
QUERY_NODE_CREATE_MULTI_TABLES_STMT,
QUERY_NODE_DROP_TABLE_CLAUSE,
QUERY_NODE_DROP_TABLE_STMT,
QUERY_NODE_DROP_SUPER_TABLE_STMT,
QUERY_NODE_ALTER_TABLE_STMT,
QUERY_NODE_ALTER_SUPER_TABLE_STMT,
QUERY_NODE_CREATE_USER_STMT,
QUERY_NODE_ALTER_USER_STMT,
QUERY_NODE_DROP_USER_STMT,
QUERY_NODE_USE_DATABASE_STMT,
QUERY_NODE_CREATE_DNODE_STMT,
QUERY_NODE_DROP_DNODE_STMT,
QUERY_NODE_ALTER_DNODE_STMT,
QUERY_NODE_CREATE_INDEX_STMT,
QUERY_NODE_DROP_INDEX_STMT,
QUERY_NODE_CREATE_QNODE_STMT,
QUERY_NODE_DROP_QNODE_STMT,
QUERY_NODE_CREATE_BNODE_STMT,
QUERY_NODE_DROP_BNODE_STMT,
QUERY_NODE_CREATE_SNODE_STMT,
QUERY_NODE_DROP_SNODE_STMT,
QUERY_NODE_CREATE_MNODE_STMT,
QUERY_NODE_DROP_MNODE_STMT,
QUERY_NODE_CREATE_TOPIC_STMT,
QUERY_NODE_DROP_TOPIC_STMT,
QUERY_NODE_DROP_CGROUP_STMT,
QUERY_NODE_ALTER_LOCAL_STMT,
QUERY_NODE_EXPLAIN_STMT,
QUERY_NODE_DESCRIBE_STMT,
QUERY_NODE_RESET_QUERY_CACHE_STMT,
QUERY_NODE_COMPACT_DATABASE_STMT,
QUERY_NODE_CREATE_FUNCTION_STMT,
QUERY_NODE_DROP_FUNCTION_STMT,
QUERY_NODE_CREATE_STREAM_STMT,
QUERY_NODE_DROP_STREAM_STMT,
QUERY_NODE_BALANCE_VGROUP_STMT,
QUERY_NODE_MERGE_VGROUP_STMT,
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT,
QUERY_NODE_SPLIT_VGROUP_STMT,
QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
QUERY_NODE_SHOW_QNODES_STMT,
QUERY_NODE_SHOW_SNODES_STMT,
QUERY_NODE_SHOW_BNODES_STMT,
QUERY_NODE_SHOW_CLUSTER_STMT,
QUERY_NODE_SHOW_DATABASES_STMT,
QUERY_NODE_SHOW_FUNCTIONS_STMT,
QUERY_NODE_SHOW_INDEXES_STMT,
QUERY_NODE_SHOW_STABLES_STMT,
QUERY_NODE_SHOW_STREAMS_STMT,
QUERY_NODE_SHOW_TABLES_STMT,
QUERY_NODE_SHOW_TAGS_STMT,
QUERY_NODE_SHOW_USERS_STMT,
QUERY_NODE_SHOW_LICENCES_STMT,
QUERY_NODE_SHOW_VGROUPS_STMT,
QUERY_NODE_SHOW_TOPICS_STMT,
QUERY_NODE_SHOW_CONSUMERS_STMT,
QUERY_NODE_SHOW_CONNECTIONS_STMT,
QUERY_NODE_SHOW_QUERIES_STMT,
QUERY_NODE_SHOW_APPS_STMT,
QUERY_NODE_SHOW_VARIABLES_STMT,
QUERY_NODE_SHOW_DNODE_VARIABLES_STMT,
QUERY_NODE_SHOW_TRANSACTIONS_STMT,
QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT,
QUERY_NODE_SHOW_VNODES_STMT,
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_TABLE_TAGS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_MERGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT,
QUERY_NODE_LOGIC_PLAN_PARTITION,
QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
} ENodeType;
/**
* The first field of a node of any type is guaranteed to be the ENodeType.
* Hence the type of any node can be gotten by casting it to SNode.

View File

@ -5530,7 +5530,7 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) {
}
int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam, int32_t specNum) {
pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam))
pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam));
if (NULL == pOpParam->pOpParams) return -1;
SOperatorSpecParam specParam;

View File

@ -65,7 +65,7 @@ typedef struct SGroupColsInfo {
} SGroupColsInfo;
typedef struct SGcSessionCtx {
SOperatorInfo* pDownstream;
int32_t downstreamIdx;
bool cacheHit;
bool needCache;
SGcBlkBufInfo* pLastBlk;

View File

@ -65,6 +65,7 @@ typedef struct SOperatorInfo {
uint16_t operatorType;
int16_t resultDataBlockId;
bool blocking; // block operator or not
bool transparent;
uint8_t status; // denote if current operator is completed
char* name; // name, for debug purpose
void* info; // extension attribution
@ -165,7 +166,9 @@ void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
void* pInfo, SExecTaskInfo* pTaskInfo);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
SSDataBlock* optrDefaultGetExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx);
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx);
SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname);

View File

@ -478,7 +478,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
req.queryId = pTaskInfo->id.queryId;
req.execId = pSource->execId;
if (pDataInfo->pSrcUidList) {
int32_t code = buildTableScanOperatorParam(&req.opParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
taosArrayDestroy(pDataInfo->pSrcUidList);
pDataInfo->pSrcUidList = NULL;
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
taosMemoryFree(pWrapper);
@ -766,6 +768,8 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
dataInfo.pSrcUidList = taosArrayDup(pParam->uidList, NULL);
dataInfo.srcOpType = pParam->srcOpType;
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
return TSDB_CODE_SUCCESS;
}

View File

@ -516,6 +516,15 @@ bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
return ((SExecTaskInfo*)tinfo)->dynamicTask;
}
void destroyOperatorParam(SOperatorParam* pParam) {
if (NULL == pParam) {
return;
}
//TODO
}
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
destroyOperatorParam(((SExecTaskInfo*)tinfo)->pOpParam);
((SExecTaskInfo*)tinfo)->pOpParam = pParam;

View File

@ -120,16 +120,6 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
return addPageToGroupCacheBuf(pInfo->pBlkBufs);
}
static int32_t initGroupCacheDownstreamInfo(SGroupCachePhysiNode* pPhyciNode, SOperatorInfo** pDownstream, int32_t numOfDownstream, SGcDownstreamInfo* pInfo) {
pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES);
if (NULL == pInfo->ppDownStream) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES);
return TSDB_CODE_SUCCESS;
}
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) {
SGcSessionCtx ctx = {0};
SGroupCacheOperatorInfo* pGCache = pOperator->info;
@ -138,7 +128,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato
ctx.cacheHit = true;
ctx.pLastBlk = pGroup->blks;
} else {
ctx.pDownstream = pOperator->pDownstream[pParam->downstreamIdx];
ctx.downstreamIdx = pParam->downstreamIdx;
ctx.needCache = pParam->needCache;
}
@ -210,7 +200,7 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
}
while (true) {
SSDataBlock* pBlock = pSession->pDownstream->fpSet.getNextExtFn(pSession->pDownstream, param);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pSession->downstreamIdx);
if (NULL == pBlock) {
setCurrentGroupCacheDone(pOperator);
break;
@ -237,6 +227,8 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
pOperator->transparent = true;
setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);

View File

@ -639,7 +639,7 @@ static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
while (true) {
pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx)
pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx);
if (NULL == pBlock) {
break;
}

View File

@ -43,6 +43,7 @@ typedef struct SMJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
int16_t downstreamResBlkId[2];
SSDataBlock* pLeft;
int32_t leftPos;
@ -72,8 +73,7 @@ static void destroyMergeJoinOperator(void* param);
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num,
SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond;
if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) {
qError("not support this in join operator, %s", idStr);
@ -89,13 +89,13 @@ static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDow
leftTsCol = col1;
rightTsCol = col2;
} else {
if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
if (col1->dataBlockId == pInfo->downstreamResBlkId[0]) {
ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[1]);
leftTsCol = col1;
rightTsCol = col2;
} else {
ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
ASSERT(col1->dataBlockId == pInfo->downstreamResBlkId[1]);
ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[0]);
leftTsCol = col2;
rightTsCol = col1;
}
@ -104,11 +104,11 @@ static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDow
setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
}
static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode,
static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorNode* pOperNode,
SColumn* pLeft, SColumn* pRight) {
SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft;
SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight;
if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) {
if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pInfo->downstreamResBlkId[0]) {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
} else {
@ -117,7 +117,7 @@ static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorI
}
}
static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pEqualOnCondNode,
static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SNode* pEqualOnCondNode,
SArray* leftTagEqCols, SArray* rightTagEqCols) {
SColumn left = {0};
SColumn right = {0};
@ -125,7 +125,7 @@ static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pD
SNode* pNode = NULL;
FOREACH(pNode, ((SLogicConditionNode*)pEqualOnCondNode)->pParameterList) {
SOperatorNode* pOperNode = (SOperatorNode*)pNode;
extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
@ -134,7 +134,7 @@ static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SOperatorInfo** pD
if (nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pOperNode = (SOperatorNode*)pEqualOnCondNode;
extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right);
extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
@ -198,17 +198,41 @@ static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t
return (int32_t)(pStart - (char*)pKey);
}
SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
if (p) {
p[0] = pDownstream[0];
p[1] = pDownstream[0];
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 1);
}
return p;
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
bool newDownstreams = false;
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (1 == numOfDownstream) {
newDownstreams = true;
pDownstream = buildMergeJoinDownstreams(pInfo, pDownstream);
if (NULL == pDownstream) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
numOfDownstream = 2;
}
int32_t numOfCols = 0;
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
@ -220,7 +244,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo));
extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo));
if (pJoinNode->pOtherOnCond != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
@ -265,7 +289,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
if (pInfo->pColEqualOnConditions != NULL) {
pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
extractEqualOnCondCols(pInfo, pDownstream, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols);
extractEqualOnCondCols(pInfo, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols);
initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols);
initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -283,6 +307,9 @@ _error:
if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo);
}
if (newDownstreams) {
taosMemoryFree(pDownstream);
}
taosMemoryFree(pOperator);
pTaskInfo->code = code;

View File

@ -94,16 +94,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
}
}
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
return pOperator->fpSet.getNextFn(pOperator);
}
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
@ -656,7 +646,7 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara
return TSDB_CODE_SUCCESS;
}
FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) {
return pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]);
}
@ -664,10 +654,22 @@ FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOper
return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
}
void destroyOperatorParam(SOperatorParam* pParam) {
if (NULL == pParam) {
return;
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
return pOperator->fpSet.getNextFn(pOperator);
}
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
if (pOperator->transparent) {
return getOperatorResultBlockId(pOperator->pDownstream[idx], 0);
}
return pOperator->resultDataBlockId;
}

View File

@ -789,14 +789,17 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = 0;
STableListInfo* pListInfo = pInfo->base.pTableListInfo;
int32_t num = taosArrayGetSize(pOperator->pOperatorParam->pUidList);
STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam;
int32_t num = taosArrayGetSize(pParam->pUidList);
if (num <= 0) {
qError("empty table scan uid list");
return TSDB_CODE_INVALID_PARA;
}
qDebug("add total %d dynamic tables to scan", num);
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pOperator->pOperatorParam->pUidList, i);
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
STableKeyInfo info = {.uid = *pUid, .groupId = 0};
void* p = taosArrayPush(pListInfo->pTableList, &info);

View File

@ -5259,7 +5259,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
pInfo->statestore = pTaskInfo->storageAPI.stateStore;

View File

@ -396,6 +396,7 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(SQWTaskCtx *ctx);
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);

View File

@ -401,6 +401,7 @@ _return:
QW_RET(code);
}
int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
@ -416,9 +417,9 @@ int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
return TSDB_CODE_SUCCESS;
}
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC));
qwHandleTaskComplete(QW_FPARAMS(), ctx);
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
return TSDB_CODE_SUCCESS;
}

View File

@ -95,6 +95,7 @@ static void freeItem(void *param) {
taosMemoryFree(pInfo->verboseInfo);
}
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t taskHandle = ctx->taskHandle;
@ -492,7 +493,7 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
return TSDB_CODE_ACTION_IN_PROGRESS;
}
qUpdateOperatorParam(ctx->taskHandle);
qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
@ -755,7 +756,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
//qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
ctx->level = plan->level;
ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo)
ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo);
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);