diff --git a/docs/zh/12-taos-sql/24-show.md b/docs/zh/12-taos-sql/24-show.md index db9e3fd136..4bd1e52284 100644 --- a/docs/zh/12-taos-sql/24-show.md +++ b/docs/zh/12-taos-sql/24-show.md @@ -211,10 +211,10 @@ SHOW USERS; 显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。 -## SHOW VARIABLES +## SHOW CLUSTER VARIABLES(3.0.1.6 之前为 SHOW VARIABLES) ```sql -SHOW VARIABLES; +SHOW CLUSTER VARIABLES; SHOW DNODE dnode_id VARIABLES; ``` diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 7df6124152..99bbfde3e1 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -126,6 +126,9 @@ extern char tsSmlChildTableName[]; extern char tsSmlTagName[]; extern bool tsSmlDataFormat; +// wal +extern int64_t tsWalFsyncDataSizeLimit; + // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 67c2a9081c..76b13579c1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -57,10 +57,11 @@ extern int32_t tMsgDict[]; #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_INFO(TYPE) \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ - (TYPE) < TDMT_VND_TMQ_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_VND_STREAM_MAX_MSG || \ - (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \ + (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \ + (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ : 0 + #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3d9d55b101..70145b434d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -139,6 +139,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) @@ -147,7 +148,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) @@ -184,6 +186,21 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED1, "vnode-unused1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED2, "vnode-unused2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED3, "vnode-unused3", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED4, "vnode-unused4", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED5, "vnode-unused5", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED6, "vnode-unused6", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED7, "vnode-unused7", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED8, "vnode-unused8", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED9, "vnode-unused9", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED10, "vnode-unused10", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED11, "vnode-unused11", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED12, "vnode-unused12", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED13, "vnode-unused13", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED14, "vnode-unused14", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED15, "vnode-unused15", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) @@ -215,30 +232,17 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) @@ -270,6 +274,22 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + + TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) + + #if defined(TD_MSG_NUMBER_) TDMT_MAX #endif diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index fc926cb1b4..dbe8a347da 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -132,27 +132,27 @@ typedef struct SSnapshotMeta { typedef struct SSyncFSM { void* data; - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta); + void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); - void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta); - void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); + void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); + void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); - void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm); + void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm); + void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); - int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); + int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); + int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); - int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); - int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); + int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); + int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader); + int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); - int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); - int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); - int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); + int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); + int32_t (*FpSnapshotStopWrite)(const struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); + int32_t (*FpSnapshotDoWrite)(const struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); } SSyncFSM; @@ -193,9 +193,13 @@ typedef struct SSyncInfo { SWal* pWal; SSyncFSM* pFsm; SMsgCb* msgcb; - int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); - int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); - int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t pingMs; + int32_t electMs; + int32_t heartbeatMs; + + int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; int32_t syncInit(); @@ -228,6 +232,8 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); +const char* syncUtilState2String(ESyncState state); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 9cb8a9d564..9586d1febb 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -28,22 +28,11 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); -void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); -// set timer ms -void setPingTimerMS(int64_t rid, int32_t pingTimerMS); -void setElectTimerMS(int64_t rid, int32_t electTimerMS); -void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS); - // for compatibility, the same as syncPropose int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); -// utils -const char* syncUtilState2String(ESyncState state); - // ------------------ for debug ------------------- void syncRpcMsgPrint(SRpcMsg* pMsg); void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 08dba5d50d..ed85fd3517 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -43,7 +43,6 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) -#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE) typedef enum { TAOS_WAL_WRITE = 1, @@ -159,6 +158,7 @@ void walCleanUp(); // handle open and ctl SWal *walOpen(const char *path, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg); +int32_t walPersist(SWal *); void walClose(SWal *); // write interfaces diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 660730f0dc..7f4a826c5e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR // udf bool tsStartUdfd = true; +// wal +int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); + // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; @@ -423,6 +426,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0) + return -1; + if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; @@ -722,6 +728,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; + tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); diff --git a/source/dnode/mgmt/node_util/src/dmUtil.c b/source/dnode/mgmt/node_util/src/dmUtil.c index 80bb8debd2..648a9ab9ce 100644 --- a/source/dnode/mgmt/node_util/src/dmUtil.c +++ b/source/dnode/mgmt/node_util/src/dmUtil.c @@ -55,7 +55,7 @@ void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool need } void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) { - taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); + taosGetCpuUsage(&pInfo->cpu_system, &pInfo->cpu_engine); taosGetCpuCores(&pInfo->cpu_cores); taosGetProcMemory(&pInfo->mem_engine); taosGetSysMemory(&pInfo->mem_system); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index ad2a3ec447..a0f3c98f83 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -20,7 +20,6 @@ #include "sdb.h" #include "sync.h" -#include "syncTools.h" #include "tcache.h" #include "tdatablock.h" #include "tglobal.h" @@ -90,7 +89,6 @@ typedef struct { int32_t errCode; int32_t transId; SRWLatch lock; - int8_t leaderTransferFinish; int8_t selfIndex; int8_t numOfReplicas; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index aee1a7a75c..df999316eb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -66,7 +66,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 891f2bbcd8..76f440f536 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -105,7 +105,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } @@ -428,18 +428,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { - atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0); syncLeaderTransfer(pMnode->syncMgmt.sync); - -#if 0 - mInfo("vgId:1, mnode start leader transfer"); - // wait for leader transfer finish - while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { - taosMsleep(10); - mInfo("vgId:1, mnode waiting for leader transfer"); - } - mInfo("vgId:1, mnode finish leader transfer"); -#endif } } @@ -501,7 +490,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync); bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync); - if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || + if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6853ff5ccc..3fbe92b264 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -72,25 +72,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; // delete msg handle SRpcMsg rpcMsg = {0}; - syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info = pMsg->info; int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); - pMgmt->errCode = cbMeta.code; + pMgmt->errCode = pMeta->code; mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 " role:%s raw:%p", - transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state), + transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), pRaw); if (pMgmt->errCode == 0) { sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); + sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); } taosWLockLatch(&pMgmt->lock); @@ -120,7 +120,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { +int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { mInfo("start to read snapshot from sdb in atomic way"); SMnode *pMnode = pFsm->data; return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, @@ -128,13 +128,13 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pR return 0; } -int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); return 0; } -void mndRestoreFinish(struct SSyncFSM *pFsm) { +void mndRestoreFinish(const SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { @@ -146,32 +146,30 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { } } -void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} - -int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { +int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { mInfo("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); } -int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { +int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { mInfo("stop to read snapshot from sdb"); SMnode *pMnode = pFsm->data; return sdbStopRead(pMnode->pSdb, pReader); } -int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { +int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { SMnode *pMnode = pFsm->data; return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len); } -int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { +int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { mInfo("start to apply snapshot to sdb"); SMnode *pMnode = pFsm->data; return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); } -int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { +int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); SMnode *pMnode = pFsm->data; @@ -179,18 +177,12 @@ int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, pSnapshot->lastConfigIndex); } -int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { +int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { SMnode *pMnode = pFsm->data; return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len); } -void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1); - mInfo("vgId:1, mnode leader transfer finish"); -} - -static void mndBecomeFollower(struct SSyncFSM *pFsm) { +static void mndBecomeFollower(const SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; mInfo("vgId:1, become follower"); @@ -205,7 +197,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) { taosWUnLockLatch(&pMnode->syncMgmt.lock); } -static void mndBecomeLeader(struct SSyncFSM *pFsm) { +static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; } @@ -217,8 +209,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; - pFsm->FpLeaderTransferCb = mndLeaderTransfer; - pFsm->FpReConfigCb = mndReConfig; + pFsm->FpLeaderTransferCb = NULL; + pFsm->FpReConfigCb = NULL; pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeFollowerCb = mndBecomeFollower; pFsm->FpGetSnapshot = mndSyncGetSnapshot; @@ -242,10 +234,13 @@ int32_t mndInitSync(SMnode *pMnode) { .batchSize = 1, .vgId = 1, .pWal = pMnode->pWal, - .msgcb = NULL, - .FpSendMsg = mndSyncSendMsg, - .FpEqMsg = mndSyncEqMsg, - .FpEqCtrlMsg = mndSyncEqCtrlMsg, + .msgcb = &pMnode->msgCb, + .syncSendMSg = mndSyncSendMsg, + .syncEqMsg = mndSyncEqMsg, + .syncEqCtrlMsg = mndSyncEqCtrlMsg, + .pingMs = 5000, + .electMs = 3000, + .heartbeatMs = 500, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); @@ -269,11 +264,6 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - // decrease election timer - setPingTimerMS(pMgmt->sync, 5000); - setElectTimerMS(pMgmt->sync, 3000); - setHeartbeatTimerMS(pMgmt->sync, 500); - mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; } @@ -289,32 +279,26 @@ void mndCleanupSync(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; - if (req.contLen <= 0) { - terrno = TSDB_CODE_APP_ERROR; - return -1; - } + pMgmt->errCode = 0; + SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; req.pCont = rpcMallocCont(req.contLen); if (req.pCont == NULL) return -1; memcpy(req.pCont, pRaw, req.contLen); - pMgmt->errCode = 0; taosWLockLatch(&pMgmt->lock); if (pMgmt->transId != 0) { - mError("trans:%d, can't be proposed since trans:%d alrady waiting for confirm", transId, pMgmt->transId); + mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosWUnLockLatch(&pMgmt->lock); terrno = TSDB_CODE_APP_NOT_READY; return -1; - } else { - pMgmt->transId = transId; - mInfo("trans:%d, will be proposed", pMgmt->transId); - taosWUnLockLatch(&pMgmt->lock); } - const bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &req, isWeak); + mInfo("trans:%d, will be proposed", transId); + pMgmt->transId = transId; + taosWUnLockLatch(&pMgmt->lock); + int32_t code = syncPropose(pMgmt->sync, &req, false); if (code == 0) { mInfo("trans:%d, is proposing and wait sem", pMgmt->transId); tsem_wait(&pMgmt->syncSem); @@ -327,8 +311,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); code = 0; } else { - taosWLockLatch(&pMgmt->lock); mInfo("trans:%d, failed to proposed since %s", transId, terrstr()); + taosWLockLatch(&pMgmt->lock); pMgmt->transId = 0; taosWUnLockLatch(&pMgmt->lock); if (terrno == TSDB_CODE_SYN_NOT_LEADER) { @@ -344,13 +328,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { return code; } - if (pMgmt->errCode != 0) terrno = pMgmt->errCode; + terrno = pMgmt->errCode; return pMgmt->errCode; } void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncStart(pMgmt->sync); mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync); } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 988ecc5dd3..d5ad500fdb 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -17,7 +17,6 @@ #define _TD_VND_H_ #include "sync.h" -#include "syncTools.h" #include "ttrace.h" #include "vnodeInt.h" diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47a40ea495..094db9ebd0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1268,7 +1268,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) { + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) { tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); continue; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2f6ec0c39f..48e69f8f4d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -199,6 +199,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea goto END; } + tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, + TMSG_INFO((*ppCkHead)->head.msgType)); + if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); @@ -216,17 +219,20 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea SWalCont* pHead = &((*ppCkHead)->head); if (IS_META_MSG(pHead->msgType)) { code = walFetchBody(pHandle->pWalReader, ppCkHead); - if (code < 0) { ASSERT(0); *fetchOffset = offset; code = -1; goto END; } + if (isValValidForTable(pHandle, pHead)) { *fetchOffset = offset; code = 0; goto END; + } else { + offset++; + continue; } } } @@ -586,8 +592,39 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } } + } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + if (isAdd) { + SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) { + uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i); + + int32_t code = metaGetTableEntryByUid(&mr, *id); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + continue; + } + + tDecoderClear(&mr.coder); + + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) { + tqDebug("table uid %" PRId64 " does not add to tq handle", *id); + continue; + } + tqDebug("table uid %" PRId64 " add to tq handle", *id); + taosArrayPush(qa, id); + } + metaReaderClear(&mr); + if (taosArrayGetSize(qa) > 0) { + tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa); + } + taosArrayDestroy(qa); + } else { + // TODO handle delete table from stb + } } else { - // tq update id + ASSERT(0); } } while (1) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a964a46406..76236e5078 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -684,7 +684,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (*state->pDataFReader != NULL) { tsdbDataFReaderClose(state->pDataFReader); - resetLastBlockLoadInfo(state->pLoadInfo); + // resetLastBlockLoadInfo(state->pLoadInfo); } code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); @@ -764,7 +764,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (--state->iBlock < 0) { tsdbDataFReaderClose(state->pDataFReader); *state->pDataFReader = NULL; - resetLastBlockLoadInfo(state->pLoadInfo); + // resetLastBlockLoadInfo(state->pLoadInfo); if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e593bbd602..7040d2d7c8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); + // persist wal before starting + if (walPersist(pVnode->pWal) < 0) { + vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); + return -1; + } + pVnode->state.commitTerm = pVnode->state.applyTerm; // save info diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 478e20d1ab..f9755bcd12 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -237,7 +237,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = syncProcessMsg(pVnode->sync, pMsg); if (code != 0) { vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg, - TMSG_INFO(pMsg->msgType), terrstr()); + TMSG_INFO(pMsg->msgType), terrstr()); } return code; @@ -290,78 +290,61 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { +static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { vnodeGetSnapshot(pFsm->data, pSnapshot); return 0; } -static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} +static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + SVnode *pVnode = pFsm->data; -static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - if (cbMeta.isWeak == 0) { - SVnode *pVnode = pFsm->data; + if (pMeta->code == 0) { + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + rpcMsg.info = pMsg->info; + rpcMsg.info.conn.applyIndex = pMeta->index; + rpcMsg.info.conn.applyTerm = pMeta->term; - if (cbMeta.code == 0) { - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); - rpcMsg.info.conn.applyIndex = cbMeta.index; - rpcMsg.info.conn.applyTerm = cbMeta.term; - - vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak, - cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); + syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, + pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); - } else { - SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), - TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); + } else { + SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info}; + vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), + TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code)); + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); } } } -static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - if (cbMeta.isWeak == 1) { - SVnode *pVnode = pFsm->data; - vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); - - if (cbMeta.code == 0) { - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); - rpcMsg.info.conn.applyIndex = cbMeta.index; - rpcMsg.info.conn.applyTerm = cbMeta.term; - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); - } else { - SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync), - TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } +static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + if (pMeta->isWeak == 0) { + vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } } -static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + if (pMeta->isWeak == 1) { + vnodeSyncApplyMsg(pFsm, pMsg, pMeta); + } +} + +static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); + syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, + syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); } #define USE_TSDB_SNAPSHOT -static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { +static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; SSnapshotParam *pSnapshotParam = pParam; @@ -373,7 +356,7 @@ static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void #endif } -static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { +static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; int32_t code = vnodeSnapReaderClose(pReader); @@ -384,7 +367,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { #endif } -static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { +static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); @@ -403,7 +386,7 @@ static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void ** #endif } -static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { +static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; SSnapshotParam *pSnapshotParam = pParam; @@ -427,7 +410,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void #endif } -static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { +static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, @@ -442,7 +425,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool #endif } -static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { +static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); @@ -454,9 +437,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void * #endif } -static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {} - -static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { +static void vnodeRestoreFinish(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; do { @@ -476,7 +457,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); } -static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { +static void vnodeBecomeFollower(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become follower", pVnode->config.vgId); @@ -490,7 +471,7 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { taosThreadMutexUnlock(&pVnode->lock); } -static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { +static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become leader", pVnode->config.vgId); @@ -512,10 +493,10 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; - pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; + pFsm->FpLeaderTransferCb = NULL; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; - pFsm->FpReConfigCb = vnodeSyncReconfig; + pFsm->FpReConfigCb = NULL; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead; @@ -533,10 +514,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { .vgId = pVnode->config.vgId, .syncCfg = pVnode->config.syncCfg, .pWal = pVnode->pWal, - .msgcb = NULL, - .FpSendMsg = vnodeSyncSendMsg, - .FpEqMsg = vnodeSyncEqMsg, - .FpEqCtrlMsg = vnodeSyncEqCtrlMsg, + .msgcb = &pVnode->msgCb, + .syncSendMSg = vnodeSyncSendMsg, + .syncEqMsg = vnodeSyncEqMsg, + .syncEqCtrlMsg = vnodeSyncEqCtrlMsg, + .pingMs = 5000, + .electMs = 4000, + .heartbeatMs = 700, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); @@ -555,15 +539,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { return -1; } - setPingTimerMS(pVnode->sync, 5000); - setElectTimerMS(pVnode->sync, 4000); - setHeartbeatTimerMS(pVnode->sync, 700); return 0; } void vnodeSyncStart(SVnode *pVnode) { vDebug("vgId:%d, start sync", pVnode->config.vgId); - syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncStart(pVnode->sync); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d0aed8988b..c20acd0cee 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1883,7 +1883,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif -#if 1 if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; @@ -1919,7 +1918,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return NULL; } -#endif size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor @@ -2301,7 +2299,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; - pOperator->name = "RawStreamScanOperator"; + pOperator->name = "RawScanOperator"; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5e8041e54a..63aa6d81c9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -107,9 +107,9 @@ typedef struct SSyncNode { // sync io SWal* pWal; const SMsgCb* msgcb; - int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); - int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); - int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); // init internal SNodeInfo myNodeInfo; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 26770a17d7..3089e9b39b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -56,6 +56,12 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { return -1; } + pSyncNode->pingBaseLine = pSyncInfo->pingMs; + pSyncNode->pingTimerMS = pSyncInfo->pingMs; + pSyncNode->electBaseLine = pSyncInfo->electMs; + pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs; + pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs; + pSyncNode->msgcb = pSyncInfo->msgcb; return pSyncNode->rid; } @@ -737,30 +743,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { syncNodeRelease(pSyncNode); } -int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return TAOS_SYNC_STATE_ERROR; - } - ASSERT(rid == pSyncNode->rid); - - SRespStub stub; - int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub); - if (ret == 1) { - memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); - } - - syncNodeRelease(pSyncNode); - return ret; -} - -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return TAOS_SYNC_STATE_ERROR; - } - ASSERT(rid == pSyncNode->rid); - +static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) { SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); if (ret == 1) { @@ -768,20 +751,6 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) } sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); - syncNodeRelease(pSyncNode); - return ret; -} - -void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid); - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->msgcb = msgcb; - - syncNodeRelease(pSyncNode); } char* sync2SimpleStr(int64_t rid) { @@ -797,41 +766,6 @@ char* sync2SimpleStr(int64_t rid) { return s; } -void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->pingBaseLine = pingTimerMS; - pSyncNode->pingTimerMS = pingTimerMS; - - syncNodeRelease(pSyncNode); -} - -void setElectTimerMS(int64_t rid, int32_t electTimerMS) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->electBaseLine = electTimerMS; - - syncNodeRelease(pSyncNode); -} - -void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->hbBaseLine = hbTimerMS; - pSyncNode->heartbeatTimerMS = hbTimerMS; - - syncNodeRelease(pSyncNode); -} - int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -928,7 +862,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } } else { - if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { + if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { ret = 0; } else { ret = -1; @@ -1059,9 +993,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->msgcb = pSyncInfo->msgcb; - pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; - pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg; + pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg; + pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg; + pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg; // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); @@ -1577,12 +1511,12 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); - if (pSyncNode->FpSendMsg != NULL) { + if (pSyncNode->syncSendMSg != NULL) { // htonl syncUtilMsgHtoN(pMsg->pCont); pMsg->info.noResp = 1; - pSyncNode->FpSendMsg(&epSet, pMsg); + pSyncNode->syncSendMSg(&epSet, pMsg); } else { sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId); return -1; @@ -1594,12 +1528,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilnodeInfo2EpSet(nodeInfo, &epSet); - if (pSyncNode->FpSendMsg != NULL) { + if (pSyncNode->syncSendMSg != NULL) { // htonl syncUtilMsgHtoN(pMsg->pCont); pMsg->info.noResp = 1; - pSyncNode->FpSendMsg(&epSet, pMsg); + pSyncNode->syncSendMSg(&epSet, pMsg); } else { sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId); } @@ -1623,13 +1557,13 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); - cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg); + cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "queue", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); - cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg); + cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf); // init internal cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); @@ -2642,8 +2576,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqMsg != NULL) { + int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2651,7 +2585,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { return; } } else { - sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); + sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); @@ -2676,8 +2610,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { + int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2693,7 +2627,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } while (0); } else { - sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); + sTrace("syncNodeEqElectTimer syncEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); @@ -2725,8 +2659,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqMsg != NULL) { + int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2734,7 +2668,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); + sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId); } syncTimeoutDestroy(pSyncMsg); @@ -2781,8 +2715,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { // eq msg #if 0 - if (pSyncNode->FpEqCtrlMsg != NULL) { - int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqCtrlMsg != NULL) { + int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2790,7 +2724,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); + sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId); } #endif @@ -2830,10 +2764,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); - if (ths->FpEqMsg != NULL) { - ths->FpEqMsg(ths->msgcb, &rpcMsg); + if (ths->syncEqMsg != NULL) { + ths->syncEqMsg(ths->msgcb, &rpcMsg); } else { - sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); + sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL"); } syncEntryDestory(pEntry); @@ -2944,8 +2878,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { SRpcMsg rpcMsgLocalCmd; syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); - if (ths->FpEqMsg != NULL && ths->msgcb != NULL) { - int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); if (code != 0) { sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); @@ -3127,17 +3061,18 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p } if (ths->pFsm->FpLeaderTransferCb != NULL) { - SFsmCbMeta cbMeta = {0}; - cbMeta.code = 0; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.flag = 0; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.state = ths->state; - cbMeta.term = pEntry->term; - ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta); + SFsmCbMeta cbMeta = { + cbMeta.code = 0, + cbMeta.currentTerm = ths->pRaftStore->currentTerm, + cbMeta.flag = 0, + cbMeta.index = pEntry->index, + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), + cbMeta.isWeak = pEntry->isWeak, + cbMeta.seqNum = pEntry->seqNum, + cbMeta.state = ths->state, + cbMeta.term = pEntry->term, + }; + ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta); } syncLeaderTransferDestroy(pSyncLeaderTransfer); @@ -3315,18 +3250,20 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // execute fsm in apply thread, or execute outside syncPropose if (internalExecute) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.flag = flag; + SFsmCbMeta cbMeta = { + .index = pEntry->index, + .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), + .isWeak = pEntry->isWeak, + .code = 0, + .state = ths->state, + .seqNum = pEntry->seqNum, + .term = pEntry->term, + .currentTerm = ths->pRaftStore->currentTerm, + .flag = flag, + }; - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info); + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta); } } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 88af5746d4..35c831b52f 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -145,16 +145,17 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { taosArrayPush(delIndexArray, pSeqNum); cnt++; - SFsmCbMeta cbMeta = {0}; - cbMeta.index = SYNC_INDEX_INVALID; - cbMeta.lastConfigIndex = SYNC_INDEX_INVALID; - cbMeta.isWeak = false; - cbMeta.code = TSDB_CODE_SYN_TIMEOUT; - cbMeta.state = pSyncNode->state; - cbMeta.seqNum = *pSeqNum; - cbMeta.term = SYNC_TERM_INVALID; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - cbMeta.flag = 0; + SFsmCbMeta cbMeta = { + cbMeta.index = SYNC_INDEX_INVALID, + cbMeta.lastConfigIndex = SYNC_INDEX_INVALID, + cbMeta.isWeak = false, + cbMeta.code = TSDB_CODE_SYN_TIMEOUT, + cbMeta.state = pSyncNode->state, + cbMeta.seqNum = *pSeqNum, + cbMeta.term = SYNC_TERM_INVALID, + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm, + cbMeta.flag = 0, + }; pStub->rpcMsg.pCont = NULL; pStub->rpcMsg.contLen = 0; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index f152201901..1750dce5ec 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -177,18 +177,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) { } const char* syncUtilState2String(ESyncState state) { - /* - if (state == TAOS_SYNC_STATE_FOLLOWER) { - return "TAOS_SYNC_STATE_FOLLOWER"; - } else if (state == TAOS_SYNC_STATE_CANDIDATE) { - return "TAOS_SYNC_STATE_CANDIDATE"; - } else if (state == TAOS_SYNC_STATE_LEADER) { - return "TAOS_SYNC_STATE_LEADER"; - } else { - return "TAOS_SYNC_STATE_UNKNOWN"; - } - */ - if (state == TAOS_SYNC_STATE_FOLLOWER) { return "follower"; } else if (state == TAOS_SYNC_STATE_CANDIDATE) { diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 8f16be27e7..96e0b6c483 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -195,8 +195,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index d1244546c9..bf01f76607 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -120,8 +120,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index d09879a699..c290368c7f 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -45,8 +45,8 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = NULL; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index fdb5cf7ac8..35dc7e4398 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -32,8 +32,8 @@ SSyncNode *pSyncNode; SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 191e245b1e..d43789c91e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -25,8 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -97,7 +97,7 @@ int main(int argc, char** argv) { SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); - pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 4a457136c2..055f869130 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -103,7 +103,7 @@ int main(int argc, char** argv) { SEpSet epSet; syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); rpcMsg.info.noResp = 1; - pSyncNode->FpSendMsg(&epSet, &rpcMsg); + pSyncNode->syncSendMSg(&epSet, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index d654ad06fe..4333127405 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index f44cbb04d5..7d8ed73ac7 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index fd6342aa84..c074103f38 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index 295003dff3..2683f48487 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index adb3deb22d..7552fc7ae3 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -100,8 +100,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 50771ac476..2aff0aad93 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -87,8 +87,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -204,7 +204,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); + gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index bdb4d7d2d8..a36f3af450 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -217,8 +217,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index e2e8748697..bbf14c604e 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -28,8 +28,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 881a5331b1..adebfe1be2 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -28,8 +28,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index fee98ddd52..0547f39bee 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -65,8 +65,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -179,7 +179,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); + gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 1aea0e8148..6dc9922981 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -34,6 +34,7 @@ typedef struct { int64_t createTs; int64_t closeTs; int64_t fileSize; + int64_t syncedOffset; } SWalFileInfo; typedef struct WalIdxEntry { @@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) { return pInfo->fileSize; } +static inline int64_t walGetLastFileCachedSize(SWal* pWal) { + if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0; + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return (pInfo->fileSize - pInfo->syncedOffset); +} + static inline int64_t walGetLastFileFirstVer(SWal* pWal) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ad27a50f68..3baa906b19 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -16,6 +16,7 @@ #include "cJSON.h" #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "tutil.h" #include "walInt.h" @@ -65,32 +66,43 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { // ensure size as non-negative pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); + int64_t stepSize = WAL_SCAN_BUF_SIZE; uint64_t magic = WAL_MAGIC; int64_t walCkHeadSz = sizeof(SWalCkHead); int64_t end = fileSize; - int64_t offset = 0; int64_t capacity = 0; int64_t readSize = 0; char* buf = NULL; - int64_t found = -1; bool firstTrial = pFileInfo->fileSize < fileSize; + int64_t offset = TMIN(pFileInfo->fileSize, fileSize); + int64_t offsetForward = offset - stepSize + walCkHeadSz - 1; + int64_t offsetBackward = offset; + int64_t retVer = -1; + int64_t lastEntryBeginOffset = 0; + int64_t lastEntryEndOffset = 0; + + // check recover size + if (2 * tsWalFsyncDataSizeLimit + offset < end) { + wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 + ", end:%" PRId64 ", file:%s", + pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); + } // search for the valid last WAL entry, e.g. block by block while (1) { - offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); + offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1) + : TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1); + end = TMIN(offset + stepSize, fileSize); + if (firstTrial) { + offsetForward = offset; + } else { + offsetBackward = offset; + } + ASSERT(offset <= end); readSize = end - offset; capacity = readSize + sizeof(magic); - int64_t limit = WAL_RECOV_SIZE_LIMIT; - if (limit < readSize) { - wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 - ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, limit, offset, end, fnameStr); - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - goto _err; - } - void* ptr = taosMemoryRealloc(buf, capacity); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; @@ -127,6 +139,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } logContent = (SWalCkHead*)(buf + pos); if (walValidHeadCksum(logContent) != 0) { + terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, offset + pos, fnameStr); haystack = buf + pos + 1; @@ -179,46 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } // found one - found = pos; + retVer = logContent->head.version; + lastEntryBeginOffset = offset + pos; + lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen; + + // try next haystack = buf + pos + 1; } - if (found >= 0 || offset == 0) break; - - // go backwards, e.g. by at most one WAL scan buf size - end = TMIN(offset + walCkHeadSz - 1, fileSize); - firstTrial = false; + if (end == fileSize) firstTrial = false; + if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue; + if (retVer >= 0 || offset == 0) break; } - // determine end of last entry - SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL; - int64_t retVer = -1; - int64_t lastEntryBeginOffset = 0; - int64_t lastEntryEndOffset = 0; - - if (lastEntry == NULL) { + if (retVer < 0) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - } else { - retVer = lastEntry->head.version; - lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf); - lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; } // truncate file if (lastEntryEndOffset != fileSize) { wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr, lastEntryEndOffset, fileSize); + if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (taosFsyncFile(pFile) < 0) { wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } } + pFileInfo->fileSize = lastEntryEndOffset; taosCloseFile(&pFile); @@ -621,6 +629,7 @@ int walRollFileInfo(SWal* pWal) { pNewInfo->createTs = ts; pNewInfo->closeTs = -1; pNewInfo->fileSize = 0; + pNewInfo->syncedOffset = 0; taosArrayPush(pArray, pNewInfo); taosMemoryFree(pNewInfo); return 0; @@ -771,6 +780,12 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } +void walUpdateSyncedOffset(SWal* pWal) { + SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal); + if (pFileInfo == NULL) return; + pFileInfo->syncedOffset = pFileInfo->fileSize; +} + int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; @@ -790,6 +805,9 @@ int walSaveMeta(SWal* pWal) { return -1; } + // update synced offset + (void)walUpdateSyncedOffset(pWal); + // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 0df1a6b387..1a70a3038f 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } +int32_t walPersist(SWal *pWal) { + taosThreadMutexLock(&pWal->mutex); + int32_t ret = walSaveMeta(pWal); + taosThreadMutexUnlock(&pWal->mutex); + return ret; +} + void walClose(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 8f493ddd85..1350ca0c37 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -198,7 +198,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return -1; } - wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId, + wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, pReader->curInvalid, ver); pReader->curVersion = ver; @@ -347,22 +347,47 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int64_t code; + int64_t contLen; + bool seeked = false; + + wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + ", applied ver:%" PRId64, + pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, + pRead->pWal->vers.appliedVer); // TODO: valid ver - if (ver > pRead->pWal->vers.commitVer) { + if (ver > pRead->pWal->vers.appliedVer) { return -1; } if (pRead->curInvalid || pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); - if (code < 0) return -1; + if (code < 0) { + pRead->curVersion = ver; + pRead->curInvalid = 1; + return -1; + } + seeked = true; } - ASSERT(taosValidFile(pRead->pLogFile) == true); - - code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); - if (code != sizeof(SWalCkHead)) { - return -1; + while (1) { + contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); + if (contLen == sizeof(SWalCkHead)) { + break; + } else if (contLen == 0 && !seeked) { + walReadSeekVerImpl(pRead, ver); + seeked = true; + continue; + } else { + if (contLen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + ASSERT(0); + pRead->curInvalid = 1; + return -1; + } } code = walValidHeadCksum(pHead); @@ -373,13 +398,20 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } + pRead->curInvalid = 0; return 0; } int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; - // ASSERT(pRead->curVersion == pHead->head.version); + wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + ", applied ver:%" PRId64, + pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, + pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); + + ASSERT(pRead->curVersion == pHead->head.version); + ASSERT(pRead->curInvalid == 0); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { @@ -397,6 +429,11 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { SWalCont *pReadHead = &((*ppHead)->head); int64_t ver = pReadHead->version; + wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + ", applied ver:%" PRId64, + pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, + pRead->pWal->vers.appliedVer); + if (pRead->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { @@ -409,19 +446,32 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { + if (pReadHead->bodyLen < 0) { + ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", + pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); + } else { + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", + pRead->pWal->cfg.vgId, pReadHead->version, ver); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + pRead->curInvalid = 1; ASSERT(0); return -1; } if (pReadHead->version != ver) { + ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pRead->pHead->head.version, ver); + pReadHead->version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(*ppHead) != 0) { + ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index e016e72471..c723618a4b 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -16,6 +16,7 @@ #include "os.h" #include "taoserror.h" #include "tchecksum.h" +#include "tglobal.h" #include "walInt.h" int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { @@ -252,23 +253,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { } } + if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) { + if (walSaveMeta(pWal) < 0) { + return -1; + } + } + return 0; } int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { + taosThreadMutexLock(&pWal->mutex); + pWal->vers.verInSnapshotting = ver; wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling if (pWal->cfg.retentionPeriod == 0) { - taosThreadMutexLock(&pWal->mutex); if (walGetLastFileSize(pWal) != 0) { - walRollImpl(pWal); + if (walRollImpl(pWal) < 0) { + wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr()); + goto _err; + } } - taosThreadMutexUnlock(&pWal->mutex); } - + taosThreadMutexUnlock(&pWal->mutex); return 0; + +_err: + taosThreadMutexUnlock(&pWal->mutex); + return -1; } int32_t walEndSnapshot(SWal *pWal) { diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 0a6dad4819..e5ca9faacb 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -227,9 +227,7 @@ void taosGetSystemInfo() { #ifdef WINDOWS taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - - double tmp1, tmp2, tmp3, tmp4; - taosGetCpuUsage(&tmp1, &tmp2); + taosGetCpuUsage(NULL, NULL); #elif defined(_TD_DARWIN_64) long physical_pages = sysconf(_SC_PHYS_PAGES); long page_size = sysconf(_SC_PAGESIZE); @@ -240,9 +238,7 @@ void taosGetSystemInfo() { taosGetProcIOnfos(); taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - - double tmp1, tmp2, tmp3, tmp4; - taosGetCpuUsage(&tmp1, &tmp2); + taosGetCpuUsage(NULL, NULL); #endif } @@ -447,8 +443,8 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { static int64_t curSysTotal = 0; static int64_t curProcTotal = 0; - *cpu_system = 0; - *cpu_engine = 0; + if (cpu_system != NULL) *cpu_system = 0; + if (cpu_engine != NULL) *cpu_engine = 0; SysCpuInfo sysCpu = {0}; ProcCpuInfo procCpu = {0}; @@ -458,8 +454,12 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) { - *cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; - *cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; + if (cpu_system != NULL) { + *cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; + } + if (cpu_engine != NULL) { + *cpu_engine = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; + } } lastSysUsed = curSysUsed;