Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
e0428e45a4
|
@ -67,7 +67,6 @@ enum {
|
|||
|
||||
enum {
|
||||
#endif
|
||||
// Requests handled by DNODE
|
||||
TD_NEW_MSG_SEG(TDMT_DND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL)
|
||||
|
@ -83,7 +82,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
|
||||
|
||||
// Requests handled by MNODE
|
||||
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL)
|
||||
|
@ -95,7 +93,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DNODE, "alter-dnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL)
|
||||
|
@ -103,6 +100,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL)
|
||||
|
@ -115,52 +113,53 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "retrieve-func", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "drop-func", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "create-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "drop-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "drop-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "systable-retrieve", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mq-ask-ep", SMqAskEpReq, SMqAskEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mq-consumer-lost", SMqConsumerLostMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mq-consumer-recover", SMqConsumerRecoverMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mq-do-rebalance", SMqDoRebalanceMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "alter-stream", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "apply-msg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
|
||||
|
||||
// Requests handled by VNODE
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "query-continue", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "fetch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL)
|
||||
|
@ -182,67 +181,38 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||
// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
|
||||
// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "vnode-sync-client-request", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "vnode-sync-client-request-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "vnode-sync-request-vote", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "vnode-sync-request-vote-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "vnode-sync-append-entries", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "vnode-sync-append-entries-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "vnode-sync-noop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp)
|
||||
|
||||
// Requests handled by QNODE
|
||||
TD_NEW_MSG_SEG(TDMT_QND_MSG)
|
||||
|
||||
// Requests handled by SNODE
|
||||
TD_NEW_MSG_SEG(TDMT_SND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||
//TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
|
||||
//TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
|
||||
//TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL)
|
||||
|
||||
// Requests handled by SCHEDULER
|
||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "scheduler-link-broken", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
|
||||
|
||||
// Monitor info exchange between processes
|
||||
TD_NEW_MSG_SEG(TDMT_MON_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL)
|
||||
|
@ -253,6 +223,22 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
|
||||
|
||||
#if defined(TD_MSG_NUMBER_)
|
||||
TDMT_MAX
|
||||
#endif
|
||||
|
|
|
@ -229,6 +229,7 @@ typedef struct STableScanPhysiNode {
|
|||
double ratio;
|
||||
int32_t dataRequired;
|
||||
SNodeList* pDynamicScanFuncs;
|
||||
SNodeList* pPartitionKeys;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
|
|
|
@ -58,6 +58,8 @@ typedef struct {
|
|||
|
||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
|
||||
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
|
||||
|
||||
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
|
||||
|
|
|
@ -563,7 +563,9 @@ int32_t* taosGetErrno();
|
|||
//scheduler&qworker
|
||||
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
|
||||
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
|
||||
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
|
||||
#define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
|
||||
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
|
||||
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2550)
|
||||
|
||||
//parser
|
||||
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
|
||||
|
|
|
@ -66,10 +66,11 @@ void taos_cleanup(void) {
|
|||
|
||||
hbMgrCleanUp();
|
||||
|
||||
rpcCleanup();
|
||||
catalogDestroy();
|
||||
schedulerDestroy();
|
||||
|
||||
rpcCleanup();
|
||||
|
||||
tscInfo("all local resources released");
|
||||
taosCleanupCfg();
|
||||
taosCloseLog();
|
||||
|
|
|
@ -56,6 +56,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
|||
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
|
||||
|
||||
// mmWorker.c
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
|
||||
|
|
|
@ -129,12 +129,7 @@ SArray *mmGetMsgHandles() {
|
|||
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
|
||||
if (pArray == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
// Requests handled by DNODE
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -146,7 +141,6 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
// Requests handled by MNODE
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -159,6 +153,8 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -172,66 +168,66 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
// Requests handled by VNODE
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -89,6 +89,8 @@ int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
mndPreprocessQueryMsg(pMgmt->pMnode, pMsg);
|
||||
|
||||
return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -51,6 +51,8 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -57,6 +57,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
qndPreprocessQueryMsg(pMgmt->pQnode, pMsg);
|
||||
|
||||
return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -365,15 +365,15 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -163,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
SRpcMsg rsp = {0};
|
||||
|
||||
// get original rpc msg
|
||||
assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
|
||||
assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG);
|
||||
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
|
||||
syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
|
||||
SRpcMsg originalRpcMsg;
|
||||
|
@ -250,6 +250,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
|
||||
|
||||
dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
break;
|
||||
|
|
|
@ -376,35 +376,35 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
taosMemoryFree(syncNodeStr);
|
||||
|
||||
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
@ -625,7 +625,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) {
|
|||
code = -1;
|
||||
} else {
|
||||
int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
|
||||
mTrace("mnode rpc is acquired, ref:%d", ref);
|
||||
// mTrace("mnode rpc is acquired, ref:%d", ref);
|
||||
}
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return code;
|
||||
|
@ -634,7 +634,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) {
|
|||
void mndReleaseRpcRef(SMnode *pMnode) {
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
|
||||
mTrace("mnode rpc is released, ref:%d", ref);
|
||||
// mTrace("mnode rpc is released, ref:%d", ref);
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
}
|
||||
|
||||
|
@ -675,7 +675,7 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) {
|
|||
code = -1;
|
||||
} else {
|
||||
int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1);
|
||||
mTrace("mnode sync is acquired, ref:%d", ref);
|
||||
// mTrace("mnode sync is acquired, ref:%d", ref);
|
||||
}
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return code;
|
||||
|
@ -684,6 +684,6 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) {
|
|||
void mndReleaseSyncRef(SMnode *pMnode) {
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1);
|
||||
mTrace("mnode sync is released, ref:%d", ref);
|
||||
// mTrace("mnode sync is released, ref:%d", ref);
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,14 @@
|
|||
#include "mndMnode.h"
|
||||
#include "qworker.h"
|
||||
|
||||
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg) {
|
||||
if (TDMT_VND_QUERY != pMsg->msgType) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
||||
}
|
||||
|
||||
int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = -1;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
|
|
@ -63,6 +63,14 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
|
||||
if (TDMT_VND_QUERY != pMsg->msgType) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg);
|
||||
}
|
||||
|
||||
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
|
||||
int32_t code = -1;
|
||||
SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
|
||||
|
|
|
@ -56,6 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg);
|
|||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg);
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
|
|
|
@ -190,7 +190,15 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg) {
|
||||
if (TDMT_VND_QUERY != pMsg->msgType) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
switch (pMsg->msgType) {
|
||||
|
@ -279,56 +287,56 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
|
||||
SRpcMsg *pRpcMsg = pMsg;
|
||||
|
||||
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
|
|
|
@ -27,6 +27,10 @@ typedef struct {
|
|||
int32_t bytes;
|
||||
} SGroupKeys, SStateKeys;
|
||||
|
||||
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList);
|
||||
uint64_t calcGroupId(char* pData, int32_t len);
|
||||
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex);
|
||||
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -333,6 +333,12 @@ typedef struct STableScanInfo {
|
|||
double sampleRatio; // data block sample ratio, 1 by default
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||
|
||||
SArray* pGroupCols;
|
||||
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
|
||||
char* keyBuf; // group by keys for hash
|
||||
int32_t groupKeyLen; // total group by column width
|
||||
SHashObj* pGroupSet; // quick locate the window object for each result
|
||||
|
||||
int32_t curTWinIdx;
|
||||
} STableScanInfo;
|
||||
|
||||
|
@ -727,7 +733,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||
|
|
|
@ -4387,9 +4387,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
if (pDataReader == NULL && terrno != 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
|
||||
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo);
|
||||
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||
|
|
|
@ -24,10 +24,10 @@
|
|||
#include "tcompare.h"
|
||||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
#include "executorInt.h"
|
||||
|
||||
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
|
||||
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
||||
static uint64_t calcGroupId(char* pData, int32_t len);
|
||||
|
||||
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
||||
|
@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosArrayDestroy(pInfo->pGroupColVals);
|
||||
}
|
||||
|
||||
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
|
||||
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
|
||||
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
||||
if ((*pGroupColVals) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -110,7 +110,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
|
|||
return true;
|
||||
}
|
||||
|
||||
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
|
||||
size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
|
||||
|
@ -139,7 +139,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||
ASSERT(pKey != NULL);
|
||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||
|
||||
|
@ -409,7 +409,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
||||
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
|
@ -622,8 +621,13 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
taosArrayDestroy(pInfo->pGroupCols);
|
||||
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
|
||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
|
||||
taosMemoryFree(key.pData);
|
||||
}
|
||||
taosArrayDestroy(pInfo->pGroupColVals);
|
||||
taosMemoryFree(pInfo->keyBuf);
|
||||
taosHashCleanup(pInfo->pGroupSet);
|
||||
taosMemoryFree(pInfo->columnOffset);
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,8 @@
|
|||
#include "ttypes.h"
|
||||
#include "vnode.h"
|
||||
|
||||
#include "executorInt.h"
|
||||
|
||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
|
@ -373,6 +375,17 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
longjmp(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0);
|
||||
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
|
||||
|
||||
uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
|
||||
if (groupId) {
|
||||
pBlock->info.groupId = *groupId;
|
||||
}else if(len != 0){
|
||||
pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
|
||||
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
|
||||
}
|
||||
|
||||
// current block is filter out according to filter condition, continue load the next block
|
||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||
continue;
|
||||
|
@ -497,21 +510,25 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFree(pTableScanInfo->pResBlock);
|
||||
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
|
||||
|
||||
taosArrayDestroy(pTableScanInfo->pGroupCols);
|
||||
for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){
|
||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
|
||||
taosMemoryFree(key.pData);
|
||||
}
|
||||
taosArrayDestroy(pTableScanInfo->pGroupColVals);
|
||||
taosMemoryFree(pTableScanInfo->keyBuf);
|
||||
taosHashCleanup(pTableScanInfo->pGroupSet);
|
||||
if (pTableScanInfo->pColMatchInfo != NULL) {
|
||||
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) {
|
||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
@ -522,7 +539,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
|
||||
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||
|
@ -552,6 +569,18 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
// for table group
|
||||
pInfo->pGroupCols = groupKyes;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||
if (pInfo->pGroupSet == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||
NULL, NULL, getTableScannerExecInfo);
|
||||
|
||||
|
@ -559,6 +588,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pOperator->cost.openCost = 0;
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -901,7 +936,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
|
||||
|
||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo);
|
||||
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ extern "C" {
|
|||
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||
#define QW_DEFAULT_HEARTBEAT_MSEC 3000
|
||||
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
|
||||
|
||||
enum {
|
||||
QW_PHASE_PRE_QUERY = 1,
|
||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
#include "qwInt.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
|
||||
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
|
|
|
@ -36,7 +36,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
|
|||
|
||||
break;
|
||||
case JOB_TASK_STATUS_NOT_START:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
||||
if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_EXECUTING
|
||||
&& newStatus != JOB_TASK_STATUS_FAILED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
|
|
@ -248,6 +248,41 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->refId = be64toh(msg->refId);
|
||||
msg->phyLen = ntohl(msg->phyLen);
|
||||
msg->sqlLen = ntohl(msg->sqlLen);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
|
||||
SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
|
||||
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||
QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg));
|
||||
QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
|
@ -265,12 +300,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->refId = be64toh(msg->refId);
|
||||
msg->phyLen = ntohl(msg->phyLen);
|
||||
msg->sqlLen = ntohl(msg->sqlLen);
|
||||
msg->sId = msg->sId;
|
||||
msg->queryId = msg->queryId;
|
||||
msg->taskId = msg->taskId;
|
||||
msg->refId = msg->refId;
|
||||
msg->phyLen = msg->phyLen;
|
||||
msg->sqlLen = msg->sqlLen;
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
|
|
|
@ -248,11 +248,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
|
||||
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
|
||||
|
||||
if (QW_PHASE_PRE_QUERY == phase) {
|
||||
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
} else {
|
||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
}
|
||||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
|
@ -285,7 +281,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
break;
|
||||
}
|
||||
|
||||
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
|
||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
|
||||
break;
|
||||
}
|
||||
case QW_PHASE_PRE_FETCH: {
|
||||
|
@ -437,7 +433,7 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
|
||||
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||
int32_t code = 0;
|
||||
bool queryRsped = false;
|
||||
SSubplan *plan = NULL;
|
||||
|
@ -448,6 +444,32 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
|||
|
||||
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
|
||||
|
||||
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
||||
ctx->ctrlConnInfo = qwMsg->connInfo;
|
||||
|
||||
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START));
|
||||
|
||||
_return:
|
||||
|
||||
if (ctx) {
|
||||
QW_UPDATE_RSP_CODE(ctx, code);
|
||||
qwReleaseTaskCtx(mgmt, ctx);
|
||||
}
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
|
||||
int32_t code = 0;
|
||||
bool queryRsped = false;
|
||||
SSubplan *plan = NULL;
|
||||
SQWPhaseInput input = {0};
|
||||
qTaskInfo_t pTaskInfo = NULL;
|
||||
DataSinkHandle sinkHandle = NULL;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
|
||||
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
|
||||
|
||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
@ -455,8 +477,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
|||
atomic_store_8(&ctx->taskType, taskType);
|
||||
atomic_store_8(&ctx->explain, explain);
|
||||
|
||||
ctx->ctrlConnInfo = qwMsg->connInfo;
|
||||
|
||||
/*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
|
||||
|
||||
code = qStringToSubplan(qwMsg->msg, &plan);
|
||||
|
@ -663,7 +683,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
// TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED
|
||||
|
||||
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
|
|
|
@ -32,6 +32,10 @@ extern "C" {
|
|||
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
|
||||
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
|
||||
|
||||
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
|
||||
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
|
||||
|
||||
#define SCH_TASK_MAX_EXEC_TIMES 5
|
||||
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
|
||||
|
||||
enum {
|
||||
|
@ -51,6 +55,7 @@ typedef struct SSchTrans {
|
|||
|
||||
typedef struct SSchHbTrans {
|
||||
SRWLatch lock;
|
||||
int64_t taskNum;
|
||||
SRpcCtx rpcCtx;
|
||||
SSchTrans trans;
|
||||
} SSchHbTrans;
|
||||
|
@ -114,7 +119,8 @@ typedef struct SSchTaskCallbackParam {
|
|||
uint64_t queryId;
|
||||
int64_t refId;
|
||||
uint64_t taskId;
|
||||
void *transport;
|
||||
int32_t execIdx;
|
||||
void *pTrans;
|
||||
} SSchTaskCallbackParam;
|
||||
|
||||
typedef struct SSchHbCallbackParam {
|
||||
|
@ -148,25 +154,36 @@ typedef struct SSchLevel {
|
|||
SArray *subTasks; // Element is SQueryTask
|
||||
} SSchLevel;
|
||||
|
||||
typedef struct SSchTaskProfile {
|
||||
int64_t startTs;
|
||||
int64_t execUseTime[SCH_TASK_MAX_EXEC_TIMES];
|
||||
int64_t waitTime;
|
||||
int64_t endTs;
|
||||
} SSchTaskProfile;
|
||||
|
||||
typedef struct SSchTask {
|
||||
uint64_t taskId; // task id
|
||||
SRWLatch lock; // task lock
|
||||
int32_t maxExecTimes; // task may exec times
|
||||
int32_t execIdx; // task current execute try index
|
||||
SSchLevel *level; // level
|
||||
SRWLatch planLock; // task update plan lock
|
||||
SSubplan *plan; // subplan
|
||||
char *msg; // operator tree
|
||||
int32_t msgLen; // msg length
|
||||
int8_t status; // task status
|
||||
int32_t lastMsgType; // last sent msg type
|
||||
int32_t tryTimes; // task already tried times
|
||||
int64_t timeoutUsec; // taks timeout useconds before reschedule
|
||||
SQueryNodeAddr succeedAddr; // task executed success node address
|
||||
int8_t candidateIdx; // current try condidation index
|
||||
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
|
||||
SArray *execNodes; // all tried node for current task, element is SSchNodeInfo
|
||||
SQueryProfileSummary summary; // task execution summary
|
||||
SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo
|
||||
SSchTaskProfile profile; // task execution profile
|
||||
int32_t childReady; // child task ready number
|
||||
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
|
||||
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
|
||||
void* handle; // task send handle
|
||||
bool registerdHb; // registered in hb
|
||||
} SSchTask;
|
||||
|
||||
typedef struct SSchJobAttr {
|
||||
|
@ -215,8 +232,39 @@ typedef struct SSchJob {
|
|||
|
||||
extern SSchedulerMgmt schMgmt;
|
||||
|
||||
#define SCH_LOG_TASK_START_TS(_task) \
|
||||
do { \
|
||||
int64_t us = taosGetTimestampUs(); \
|
||||
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
|
||||
(_task)->profile.execUseTime[idx] = us; \
|
||||
if (0 == (_task)->execIdx) { \
|
||||
(_task)->profile.startTs = us; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define SCH_LOG_TASK_WAIT_TS(_task) \
|
||||
do { \
|
||||
int64_t us = taosGetTimestampUs(); \
|
||||
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
|
||||
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
|
||||
} while (0)
|
||||
|
||||
|
||||
#define SCH_LOG_TASK_END_TS(_task) \
|
||||
do { \
|
||||
int64_t us = taosGetTimestampUs(); \
|
||||
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
|
||||
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
|
||||
(_task)->profile.endTs = us; \
|
||||
} while (0)
|
||||
|
||||
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
|
||||
|
||||
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
|
||||
|
||||
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
|
||||
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
|
||||
|
||||
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
|
||||
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
|
||||
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
|
||||
|
@ -284,7 +332,7 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
|
|||
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schFetchFromRemote(SSchJob *pJob);
|
||||
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
|
||||
int32_t schCloneSMsgSendInfo(void *src, void **dst);
|
||||
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
|
||||
void schFreeJobImpl(void *job);
|
||||
|
@ -301,10 +349,9 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
|
|||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
|
||||
void schProcessOnDataFetched(SSchJob *job);
|
||||
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode);
|
||||
void schFreeRpcCtxVal(const void *arg);
|
||||
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
|
||||
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle);
|
||||
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx);
|
||||
int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
SSchResInfo *pRes, bool sync);
|
||||
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
|
@ -318,7 +365,8 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p
|
|||
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
|
||||
int32_t schFetchRows(SSchJob *pJob);
|
||||
int32_t schAsyncFetchRows(SSchJob *pJob);
|
||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode);
|
||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx);
|
||||
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -28,11 +28,14 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release jobId:0x%"PR
|
|||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
|
||||
pTask->plan = pPlan;
|
||||
pTask->level = pLevel;
|
||||
pTask->execIdx = -1;
|
||||
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES;
|
||||
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
|
||||
pTask->taskId = schGenTaskId();
|
||||
pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
|
||||
pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
if (NULL == pTask->execNodes) {
|
||||
SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
|
||||
SCH_TASK_ELOG("taosHashInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -123,7 +126,34 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
void schFreeTask(SSchTask *pTask) {
|
||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (!pTask->registerdHb) {
|
||||
return;
|
||||
}
|
||||
|
||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SQueryNodeEpId epId = {0};
|
||||
|
||||
epId.nodeId = addr->nodeId;
|
||||
|
||||
SEp* pEp = SCH_GET_CUR_EP(addr);
|
||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
||||
epId.ep.port = pEp->port;
|
||||
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_sub_fetch_64(&hb->taskNum, 1);
|
||||
|
||||
pTask->registerdHb = false;
|
||||
}
|
||||
|
||||
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
|
||||
schDeregisterTaskHb(pJob, pTask);
|
||||
|
||||
if (pTask->candidateAddrs) {
|
||||
taosArrayDestroy(pTask->candidateAddrs);
|
||||
}
|
||||
|
@ -139,7 +169,7 @@ void schFreeTask(SSchTask *pTask) {
|
|||
}
|
||||
|
||||
if (pTask->execNodes) {
|
||||
taosArrayDestroy(pTask->execNodes);
|
||||
taosHashCleanup(pTask->execNodes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -329,44 +359,51 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
|
||||
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = handle};
|
||||
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) {
|
||||
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL};
|
||||
|
||||
if (NULL == taosArrayPush(pTask->execNodes, &nodeInfo)) {
|
||||
SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
|
||||
if (taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) {
|
||||
SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
|
||||
SCH_TASK_DLOG("task execNode added, execIdx:%d", execIdx);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle) {
|
||||
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
|
||||
if (NULL == pTask->execNodes) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t num = taosArrayGetSize(pTask->execNodes);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSchNodeInfo* pNode = taosArrayGet(pTask->execNodes, i);
|
||||
if (pNode->handle == handle) {
|
||||
taosArrayRemove(pTask->execNodes, i);
|
||||
break;
|
||||
}
|
||||
taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx));
|
||||
if (execIdx != pTask->execIdx) { // ignore it
|
||||
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode) {
|
||||
int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
|
||||
if (taosHashGetSize(pTask->execNodes) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
|
||||
nodeInfo->handle = handle;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx) {
|
||||
if (dropExecNode) {
|
||||
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx));
|
||||
}
|
||||
|
||||
SCH_SET_TASK_HANDLE(pTask, handle);
|
||||
|
||||
schUpdateTaskExecNodeHandle(pTask, handle, rspCode);
|
||||
|
||||
if (msgType == TDMT_SCH_LINK_BROKEN) {
|
||||
schDropTaskExecNode(pJob, pTask, handle);
|
||||
}
|
||||
schUpdateTaskExecNode(pTask, handle, execIdx);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -672,7 +709,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
|
|||
|
||||
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
|
||||
int8_t status = 0;
|
||||
++pTask->tryTimes;
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
*needRetry = false;
|
||||
|
@ -680,9 +716,19 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
|
||||
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
|
||||
pTask->maxExecTimes++;
|
||||
if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
|
||||
pTask->timeoutUsec *= 2;
|
||||
if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
|
||||
pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ((pTask->execIdx + 1) >= pTask->maxExecTimes) {
|
||||
*needRetry = false;
|
||||
SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
|
||||
SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -694,16 +740,16 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
|||
|
||||
// TODO CHECK epList/condidateList
|
||||
if (SCH_IS_DATA_SRC_TASK(pTask)) {
|
||||
if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
|
||||
if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
|
||||
*needRetry = false;
|
||||
SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes,
|
||||
SCH_TASK_DLOG("task no more retry since all ep tried, execIdx:%d, epNum:%d", pTask->execIdx,
|
||||
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
|
||||
|
||||
if ((pTask->candidateIdx + 1) >= candidateNum) {
|
||||
if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
|
||||
*needRetry = false;
|
||||
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
|
||||
pTask->candidateIdx, candidateNum);
|
||||
|
@ -712,7 +758,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
|||
}
|
||||
|
||||
*needRetry = true;
|
||||
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));
|
||||
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execIdx + 1, errCode, tstrerror(errCode));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -728,10 +774,15 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
|
||||
}
|
||||
|
||||
schDeregisterTaskHb(pJob, pTask);
|
||||
|
||||
if (SCH_IS_DATA_SRC_TASK(pTask)) {
|
||||
SCH_SWITCH_EPSET(&pTask->plan->execNode);
|
||||
} else {
|
||||
++pTask->candidateIdx;
|
||||
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
|
||||
if (++pTask->candidateIdx >= candidateNum) {
|
||||
pTask->candidateIdx = 0;
|
||||
}
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schLaunchTask(pJob, pTask));
|
||||
|
@ -906,6 +957,12 @@ void schProcessOnDataFetched(SSchJob *job) {
|
|||
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
|
||||
int8_t status = 0;
|
||||
|
||||
if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
|
||||
SCH_LOG_TASK_WAIT_TS(pTask);
|
||||
} else {
|
||||
SCH_LOG_TASK_END_TS(pTask);
|
||||
}
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
|
||||
SCH_RET(atomic_load_32(&pJob->errCode));
|
||||
|
@ -989,6 +1046,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
|
||||
SCH_LOG_TASK_END_TS(pTask);
|
||||
|
||||
SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
|
||||
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
|
||||
|
@ -1105,6 +1164,92 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (NULL == pTask->execNodes) {
|
||||
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
|
||||
|
||||
if (size <= 0) {
|
||||
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
return;
|
||||
}
|
||||
|
||||
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
|
||||
while (nodeInfo) {
|
||||
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
|
||||
|
||||
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
|
||||
|
||||
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("task has %d exec address", size);
|
||||
}
|
||||
|
||||
|
||||
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCH_LOCK_TASK(pTask);
|
||||
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) {
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
taosHashClear(pTask->execNodes);
|
||||
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
|
||||
}
|
||||
SCH_UNLOCK_TASK(pTask);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
|
||||
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
|
||||
SSchTask *pTask = NULL;
|
||||
|
||||
qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port);
|
||||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
|
||||
|
||||
SSchJob *pJob = schAcquireJob(taskStatus->refId);
|
||||
if (NULL == pJob) {
|
||||
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
|
||||
taskStatus->queryId, taskStatus->taskId);
|
||||
// TODO DROP TASK FROM SERVER!!!!
|
||||
continue;
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status));
|
||||
|
||||
pTask = NULL;
|
||||
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
|
||||
if (NULL == pTask) {
|
||||
// TODO DROP TASK FROM SERVER!!!!
|
||||
schReleaseJob(taskStatus->refId);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (taskStatus->status == JOB_TASK_STATUS_FAILED) {
|
||||
// RECORD AND HANDLE ERROR!!!!
|
||||
schReleaseJob(taskStatus->refId);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (taskStatus->status == JOB_TASK_STATUS_NOT_START && SCH_TASK_TIMEOUT(pTask)) {
|
||||
schRescheduleTask(pJob, pTask);
|
||||
}
|
||||
|
||||
schReleaseJob(taskStatus->refId);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
||||
if (rsp->tbFName[0]) {
|
||||
if (NULL == pJob->execRes.res) {
|
||||
|
@ -1156,25 +1301,16 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
|
||||
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 ||
|
||||
taosArrayGetSize(pTask->execNodes) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
|
||||
nodeInfo->handle = handle;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
|
||||
int8_t status = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||
|
||||
pTask->execIdx++;
|
||||
|
||||
SCH_LOG_TASK_START_TS(pTask);
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
|
||||
|
||||
|
@ -1257,29 +1393,6 @@ int32_t schLaunchJob(SSchJob *pJob) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (NULL == pTask->execNodes) {
|
||||
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t size = (int32_t)taosArrayGetSize(pTask->execNodes);
|
||||
|
||||
if (size <= 0) {
|
||||
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
return;
|
||||
}
|
||||
|
||||
SSchNodeInfo *nodeInfo = NULL;
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
nodeInfo = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, i);
|
||||
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
|
||||
|
||||
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("task has %d exec address", size);
|
||||
}
|
||||
|
||||
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
|
||||
if (!SCH_IS_NEED_DROP_JOB(pJob)) {
|
||||
|
@ -1332,7 +1445,7 @@ void schFreeJobImpl(void *job) {
|
|||
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
|
||||
for (int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
||||
schFreeTask(pTask);
|
||||
schFreeTask(pJob, pTask);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pLevel->subTasks);
|
||||
|
|
|
@ -92,8 +92,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
int8_t status = 0;
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
|
||||
rspCode);
|
||||
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
|
||||
taosMemoryFreeClear(msg);
|
||||
SCH_RET(atomic_load_32(&pJob->errCode));
|
||||
}
|
||||
|
@ -364,14 +363,26 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
|
|||
|
||||
SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask));
|
||||
|
||||
SCH_LOCK_TASK(pTask);
|
||||
|
||||
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
|
||||
|
||||
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, rspCode));
|
||||
if (pParam->execIdx != pTask->execIdx) {
|
||||
SCH_TASK_DLOG("execIdx %d mis-match current execIdx %d", pParam->execIdx, pTask->execIdx);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx));
|
||||
|
||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||
|
||||
_return:
|
||||
|
||||
if (pTask) {
|
||||
SCH_UNLOCK_TASK(pTask);
|
||||
}
|
||||
|
||||
if (pJob) {
|
||||
schReleaseJob(pParam->refId);
|
||||
}
|
||||
|
@ -426,7 +437,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
|
|||
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
|
||||
SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
|
||||
|
||||
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
|
||||
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
|
||||
} else {
|
||||
SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
|
||||
}
|
||||
|
@ -454,7 +465,8 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
|
|||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->taskId = SCH_TASK_ID(pTask);
|
||||
param->transport = pJob->pTrans;
|
||||
param->pTrans = pJob->pTrans;
|
||||
param->execIdx = pTask->execIdx;
|
||||
|
||||
msgSendInfo->param = param;
|
||||
msgSendInfo->fp = fp;
|
||||
|
@ -661,7 +673,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
|
|||
}
|
||||
|
||||
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
|
||||
SSchedulerHbReq req = {0};
|
||||
int32_t code = 0;
|
||||
SRpcCtx rpcCtx = {0};
|
||||
|
@ -717,7 +729,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
|
|||
__async_send_cb_fn_t fp = NULL;
|
||||
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
|
||||
|
||||
param->transport = trans.pTrans;
|
||||
param->pTrans = trans.pTrans;
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->msgInfo.pData = msg;
|
||||
|
@ -763,15 +775,26 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
|
|||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
||||
epId.ep.port = pEp->port;
|
||||
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
|
||||
SSchHbTrans *hb = NULL;
|
||||
while (true) {
|
||||
hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
bool exist = false;
|
||||
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
|
||||
if (!exist) {
|
||||
SCH_ERR_RET(schBuildAndSendHbMsg(&epId));
|
||||
SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&hb->taskNum, 1);
|
||||
|
||||
pTask->registerdHb = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -810,33 +833,12 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
SSchTrans trans = {0};
|
||||
trans.pTrans = pParam->transport;
|
||||
trans.pTrans = pParam->pTrans;
|
||||
trans.pHandle = pMsg->handle;
|
||||
|
||||
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
|
||||
|
||||
int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
|
||||
qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
|
||||
rsp.epId.ep.port);
|
||||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
|
||||
|
||||
SSchJob *pJob = schAcquireJob(taskStatus->refId);
|
||||
if (NULL == pJob) {
|
||||
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
|
||||
taskStatus->queryId, taskStatus->taskId);
|
||||
// TODO DROP TASK FROM SERVER!!!!
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO
|
||||
|
||||
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
|
||||
jobTaskStatusStr(taskStatus->status));
|
||||
|
||||
schReleaseJob(taskStatus->refId);
|
||||
}
|
||||
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
|
||||
|
||||
_return:
|
||||
|
||||
|
@ -856,7 +858,8 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
|
|||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->taskId = SCH_TASK_ID(pTask);
|
||||
param->transport = pJob->pTrans;
|
||||
param->pTrans = pJob->pTrans;
|
||||
param->taskId = pTask->taskId;
|
||||
|
||||
*pParam = param;
|
||||
|
||||
|
@ -1158,7 +1161,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
(rpcCtx.args ? &rpcCtx : NULL)));
|
||||
|
||||
if (msgType == TDMT_VND_QUERY) {
|
||||
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle));
|
||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
|
||||
assert(pRollBackEntry != NULL);
|
||||
|
||||
// if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
|
||||
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
|
||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||
|
@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pAppendEntry->index;
|
||||
|
@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pAppendEntry->index;
|
||||
|
@ -346,7 +346,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
}
|
||||
|
||||
// config change
|
||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
|
||||
|
||||
SSyncCfg newSyncCfg;
|
||||
|
|
|
@ -124,7 +124,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// config change
|
||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
|
||||
|
||||
SSyncCfg newSyncCfg;
|
||||
|
|
|
@ -256,7 +256,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg);
|
||||
|
||||
// use switch case instead of if else
|
||||
if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||
if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||
if (io->FpOnSyncPing != NULL) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncPingDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
if (io->FpOnSyncPingReply != NULL) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncPingReplyDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
if (io->FpOnSyncClientRequest != NULL) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncClientRequestDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
if (io->FpOnSyncRequestVote != NULL) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncRequestVoteDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
if (io->FpOnSyncRequestVoteReply != NULL) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
if (io->FpOnSyncAppendEntries != NULL) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
if (io->FpOnSyncAppendEntriesReply != NULL) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
if (io->FpOnSyncTimeout != NULL) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
@ -365,7 +365,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
if (pMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) {
|
||||
if (pMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
|
||||
sTrace("==syncIOProcessReply==");
|
||||
} else {
|
||||
syncRpcMsgLog2((char *)"==syncIOProcessReply==", pMsg);
|
||||
|
|
|
@ -174,7 +174,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
|||
sInfo("==syncReconfig== newconfig:%s", configChange);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
|
||||
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
|
||||
rpcMsg.info.noResp = 1;
|
||||
rpcMsg.contLen = strlen(configChange) + 1;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
|
@ -1399,7 +1399,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
|
@ -1421,7 +1421,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
|
|
|
@ -22,50 +22,50 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
|||
cJSON* pRoot;
|
||||
|
||||
// in compiler optimization, switch case = if else constants
|
||||
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncTimeout2Json(pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncPing2Json(pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncPingReply2Json(pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncClientRequest2Json(pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_REPLY) {
|
||||
pRoot = syncRpcUnknownMsg2Json();
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncRequestVote2Json(pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncRequestVoteReply2Json(pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncAppendEntries2Json(pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
pRoot = syncAppendEntriesReply2Json(pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) {
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
|
||||
pRoot = cJSON_CreateObject();
|
||||
char* s;
|
||||
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
|
||||
|
@ -98,7 +98,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
|||
|
||||
cJSON* syncRpcUnknownMsg2Json() {
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", TDMT_VND_SYNC_UNKNOWN);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", TDMT_SYNC_UNKNOWN);
|
||||
cJSON_AddStringToObject(pRoot, "data", "unknown message");
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
|
@ -146,7 +146,7 @@ SyncTimeout* syncTimeoutBuild() {
|
|||
SyncTimeout* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_VND_SYNC_TIMEOUT;
|
||||
pMsg->msgType = TDMT_SYNC_TIMEOUT;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
@ -275,7 +275,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) {
|
|||
SyncPing* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_VND_SYNC_PING;
|
||||
pMsg->msgType = TDMT_SYNC_PING;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
@ -535,7 +535,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
|
|||
SyncPingReply* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_VND_SYNC_PING_REPLY;
|
||||
pMsg->msgType = TDMT_SYNC_PING_REPLY;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
@ -795,7 +795,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
|
|||
SyncClientRequest* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||
pMsg->seqNum = 0;
|
||||
pMsg->isWeak = false;
|
||||
pMsg->dataLen = dataLen;
|
||||
|
@ -937,7 +937,7 @@ SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
|||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE;
|
||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
@ -1086,7 +1086,7 @@ SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) {
|
|||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE_REPLY;
|
||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
@ -1232,7 +1232,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) {
|
|||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES;
|
||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
@ -1398,7 +1398,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
|
|||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES_REPLY;
|
||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
@ -1546,7 +1546,7 @@ SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) {
|
|||
SyncApplyMsg* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->msgType = TDMT_VND_SYNC_APPLY_MSG;
|
||||
pMsg->msgType = TDMT_SYNC_APPLY_MSG;
|
||||
pMsg->dataLen = dataLen;
|
||||
return pMsg;
|
||||
}
|
||||
|
|
|
@ -59,14 +59,14 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
|
|||
memset(&rpcMsg, 0, sizeof(SRpcMsg));
|
||||
rpcMsg.contLen = head.contLen;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
rpcMsg.msgType = TDMT_VND_SYNC_NOOP;
|
||||
rpcMsg.msgType = TDMT_SYNC_NOOP;
|
||||
memcpy(rpcMsg.pCont, &head, sizeof(head));
|
||||
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
||||
pEntry->originalRpcType = TDMT_VND_SYNC_NOOP;
|
||||
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||
pEntry->originalRpcType = TDMT_SYNC_NOOP;
|
||||
pEntry->seqNum = 0;
|
||||
pEntry->isWeak = 0;
|
||||
pEntry->term = term;
|
||||
|
|
|
@ -104,7 +104,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
||||
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||
pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
|
||||
pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
|
||||
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
|
||||
|
|
|
@ -215,28 +215,28 @@ void syncUtilMsgNtoH(void* msg) {
|
|||
}
|
||||
|
||||
bool syncUtilIsData(tmsg_t msgType) {
|
||||
if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserRollback(tmsg_t msgType) {
|
||||
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -444,6 +444,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup
|
|||
//scheduler
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order")
|
||||
|
||||
// parser
|
||||
|
|
Loading…
Reference in New Issue