diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fe9ffda69c..a4bbd48434 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -67,7 +67,6 @@ enum { enum { #endif - // Requests handled by DNODE TD_NEW_MSG_SEG(TDMT_DND_MSG) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL) @@ -82,8 +81,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "server-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) - - // Requests handled by MNODE + TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL) @@ -103,6 +101,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL) @@ -115,49 +114,48 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "retrieve-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "drop-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "drop-sma", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) - TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "systable-retrieve", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) - TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) - TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mq-ask-ep", SMqAskEpReq, SMqAskEpRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mq-consumer-lost", SMqConsumerLostMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mq-consumer-recover", SMqConsumerRecoverMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mq-do-rebalance", SMqDoRebalanceMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) + TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) + TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) - // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL) @@ -182,67 +180,40 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "vnode-sync-client-request", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "vnode-sync-client-request-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "vnode-sync-request-vote", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "vnode-sync-request-vote-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "vnode-sync-append-entries", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "vnode-sync-append-entries-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "vnode-sync-noop", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL) - - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL) - - TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp) - - // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) - // Requests handled by SNODE TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - //TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - //TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - //TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL) - // Requests handled by SCHEDULER TD_NEW_MSG_SEG(TDMT_SCH_MSG) - TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "scheduler-link-broken", NULL, NULL) - - // Monitor info exchange between processes + TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL) @@ -252,6 +223,22 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) + + TD_NEW_MSG_SEG(TDMT_SYNC_MSG) + TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8d0d503d8f..bbdafbdfe1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -223,15 +223,15 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index f08ac6b9d0..0819f79cf9 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -365,15 +365,15 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index bedf473379..653ec75ff6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -163,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg rsp = {0}; // get original rpc msg - assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG); + assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG); SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); SRpcMsg originalRpcMsg; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3a3fd7ebdb..d989ba6ec3 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -376,35 +376,35 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); - if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_PING) { + } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); @@ -625,7 +625,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is acquired, ref:%d", ref); + // mTrace("mnode rpc is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -634,7 +634,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { void mndReleaseRpcRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is released, ref:%d", ref); + // mTrace("mnode rpc is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } @@ -675,7 +675,7 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is acquired, ref:%d", ref); + // mTrace("mnode sync is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -684,6 +684,6 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { void mndReleaseSyncRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is released, ref:%d", ref); + // mTrace("mnode sync is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4b0016a746..212718eef1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -279,56 +279,56 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SRpcMsg *pRpcMsg = pMsg; - if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 3afe7b15e2..89dcd8a476 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + // if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) { if (syncUtilUserRollback(pRollBackEntry->msgType)) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -346,7 +346,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; SSyncCfg newSyncCfg; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 4a1a40a2d7..c6376495a4 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -124,7 +124,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; SSyncCfg newSyncCfg; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index e30a39e634..aa8484de99 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -256,7 +256,7 @@ static void *syncIOConsumerFunc(void *param) { syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg); // use switch case instead of if else - if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + if (pRpcMsg->msgType == TDMT_SYNC_PING) { if (io->FpOnSyncPing != NULL) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { if (io->FpOnSyncPingReply != NULL) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { if (io->FpOnSyncClientRequest != NULL) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) { syncClientRequestDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) { syncRequestVoteDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) { syncRequestVoteReplyDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { if (io->FpOnSyncAppendEntries != NULL) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) { syncAppendEntriesDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { if (io->FpOnSyncAppendEntriesReply != NULL) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) { syncAppendEntriesReplyDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + } else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -365,7 +365,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { } static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - if (pMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) { + if (pMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) { sTrace("==syncIOProcessReply=="); } else { syncRpcMsgLog2((char *)"==syncIOProcessReply==", pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 66806dbd0c..d522b829dd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -174,7 +174,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { sInfo("==syncReconfig== newconfig:%s", configChange); SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; + rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE; rpcMsg.info.noResp = 1; rpcMsg.contLen = strlen(configChange) + 1; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); @@ -1399,7 +1399,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -1421,7 +1421,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index fae069f2e6..6871e6b3ed 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -22,50 +22,50 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; // in compiler optimization, switch case = if else constants - if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncTimeout2Json(pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPing2Json(pSyncMsg); syncPingDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPingReply2Json(pSyncMsg); syncPingReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncClientRequest2Json(pSyncMsg); syncClientRequestDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVote2Json(pSyncMsg); syncRequestVoteDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVoteReply2Json(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntries2Json(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntriesReply2Json(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) { pRoot = cJSON_CreateObject(); char* s; s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen); @@ -98,7 +98,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* syncRpcUnknownMsg2Json() { cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "msgType", TDMT_VND_SYNC_UNKNOWN); + cJSON_AddNumberToObject(pRoot, "msgType", TDMT_SYNC_UNKNOWN); cJSON_AddStringToObject(pRoot, "data", "unknown message"); cJSON* pJson = cJSON_CreateObject(); @@ -146,7 +146,7 @@ SyncTimeout* syncTimeoutBuild() { SyncTimeout* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_TIMEOUT; + pMsg->msgType = TDMT_SYNC_TIMEOUT; return pMsg; } @@ -275,7 +275,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) { SyncPing* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_PING; + pMsg->msgType = TDMT_SYNC_PING; pMsg->dataLen = dataLen; return pMsg; } @@ -535,7 +535,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { SyncPingReply* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_PING_REPLY; + pMsg->msgType = TDMT_SYNC_PING_REPLY; pMsg->dataLen = dataLen; return pMsg; } @@ -795,7 +795,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { SyncClientRequest* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->seqNum = 0; pMsg->isWeak = false; pMsg->dataLen = dataLen; @@ -937,7 +937,7 @@ SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE; + pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; return pMsg; } @@ -1086,7 +1086,7 @@ SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE_REPLY; + pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; return pMsg; } @@ -1232,7 +1232,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; pMsg->dataLen = dataLen; return pMsg; } @@ -1398,7 +1398,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES_REPLY; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; return pMsg; } @@ -1546,7 +1546,7 @@ SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_APPLY_MSG; + pMsg->msgType = TDMT_SYNC_APPLY_MSG; pMsg->dataLen = dataLen; return pMsg; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 8755f71654..05a2dbaa3f 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -59,14 +59,14 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) memset(&rpcMsg, 0, sizeof(SRpcMsg)); rpcMsg.contLen = head.contLen; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - rpcMsg.msgType = TDMT_VND_SYNC_NOOP; + rpcMsg.msgType = TDMT_SYNC_NOOP; memcpy(rpcMsg.pCont, &head, sizeof(head)); SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen); assert(pEntry != NULL); - pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; - pEntry->originalRpcType = TDMT_VND_SYNC_NOOP; + pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; + pEntry->originalRpcType = TDMT_SYNC_NOOP; pEntry->seqNum = 0; pEntry->isWeak = 0; pEntry->term = term; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a6397f8cba..b353ed85db 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -104,7 +104,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); assert(pEntry != NULL); - pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; + pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; pEntry->originalRpcType = pWalHandle->pHead->head.msgType; pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index d754acd9f8..48567b75c2 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -215,28 +215,28 @@ void syncUtilMsgNtoH(void* msg) { } bool syncUtilIsData(tmsg_t msgType) { - if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) { return false; } return true; } bool syncUtilUserPreCommit(tmsg_t msgType) { - if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { return true; } return false; } bool syncUtilUserCommit(tmsg_t msgType) { - if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { return true; } return false; } bool syncUtilUserRollback(tmsg_t msgType) { - if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { return true; } return false;