This commit is contained in:
yihaoDeng 2022-11-01 21:47:58 +08:00
commit 12be1a8b22
46 changed files with 476 additions and 437 deletions

View File

@ -211,10 +211,10 @@ SHOW USERS;
显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。 显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。
## SHOW VARIABLES ## SHOW CLUSTER VARIABLES(3.0.1.6 之前为 SHOW VARIABLES)
```sql ```sql
SHOW VARIABLES; SHOW CLUSTER VARIABLES;
SHOW DNODE dnode_id VARIABLES; SHOW DNODE dnode_id VARIABLES;
``` ```

View File

@ -126,6 +126,9 @@ extern char tsSmlChildTableName[];
extern char tsSmlTagName[]; extern char tsSmlTagName[];
extern bool tsSmlDataFormat; extern bool tsSmlDataFormat;
// wal
extern int64_t tsWalFsyncDataSizeLimit;
// internal // internal
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;

View File

@ -57,10 +57,11 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \ #define TMSG_INFO(TYPE) \
((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
(TYPE) < TDMT_VND_TMQ_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_VND_STREAM_MAX_MSG || \ (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \
(TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \ (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0 : 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef uint16_t tmsg_t; typedef uint16_t tmsg_t;

View File

@ -139,6 +139,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
@ -147,7 +148,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
@ -184,6 +186,21 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED1, "vnode-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED2, "vnode-unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED3, "vnode-unused3", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED4, "vnode-unused4", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED5, "vnode-unused5", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED6, "vnode-unused6", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED7, "vnode-unused7", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED8, "vnode-unused8", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED9, "vnode-unused9", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED10, "vnode-unused10", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED11, "vnode-unused11", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED12, "vnode-unused12", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED13, "vnode-unused13", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED14, "vnode-unused14", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED15, "vnode-unused15", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
@ -215,30 +232,17 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
@ -270,6 +274,22 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
#if defined(TD_MSG_NUMBER_) #if defined(TD_MSG_NUMBER_)
TDMT_MAX TDMT_MAX
#endif #endif

View File

@ -132,27 +132,27 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm); void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); int32_t (*FpSnapshotStopWrite)(const struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); int32_t (*FpSnapshotDoWrite)(const struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
} SSyncFSM; } SSyncFSM;
@ -193,9 +193,13 @@ typedef struct SSyncInfo {
SWal* pWal; SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
SMsgCb* msgcb; SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t pingMs;
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t electMs;
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t heartbeatMs;
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
int32_t syncInit(); int32_t syncInit();
@ -228,6 +232,8 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
const char* syncUtilState2String(ESyncState state);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -28,22 +28,11 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
char* sync2SimpleStr(int64_t rid); char* sync2SimpleStr(int64_t rid);
// set timer ms
void setPingTimerMS(int64_t rid, int32_t pingTimerMS);
void setElectTimerMS(int64_t rid, int32_t electTimerMS);
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS);
// for compatibility, the same as syncPropose // for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// utils
const char* syncUtilState2String(ESyncState state);
// ------------------ for debug ------------------- // ------------------ for debug -------------------
void syncRpcMsgPrint(SRpcMsg* pMsg); void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);

View File

@ -43,7 +43,6 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
typedef enum { typedef enum {
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
@ -159,6 +158,7 @@ void walCleanUp();
// handle open and ctl // handle open and ctl
SWal *walOpen(const char *path, SWalCfg *pCfg); SWal *walOpen(const char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg);
int32_t walPersist(SWal *);
void walClose(SWal *); void walClose(SWal *);
// write interfaces // write interfaces

View File

@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR
// udf // udf
bool tsStartUdfd = true; bool tsStartUdfd = true;
// wal
int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
@ -423,6 +426,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0)
return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
@ -722,6 +728,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));

View File

@ -55,7 +55,7 @@ void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool need
} }
void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) { void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) {
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); taosGetCpuUsage(&pInfo->cpu_system, &pInfo->cpu_engine);
taosGetCpuCores(&pInfo->cpu_cores); taosGetCpuCores(&pInfo->cpu_cores);
taosGetProcMemory(&pInfo->mem_engine); taosGetProcMemory(&pInfo->mem_engine);
taosGetSysMemory(&pInfo->mem_system); taosGetSysMemory(&pInfo->mem_system);

View File

@ -20,7 +20,6 @@
#include "sdb.h" #include "sdb.h"
#include "sync.h" #include "sync.h"
#include "syncTools.h"
#include "tcache.h" #include "tcache.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
@ -90,7 +89,6 @@ typedef struct {
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
SRWLatch lock; SRWLatch lock;
int8_t leaderTransferFinish;
int8_t selfIndex; int8_t selfIndex;
int8_t numOfReplicas; int8_t numOfReplicas;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];

View File

@ -66,7 +66,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);

View File

@ -105,7 +105,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
} }
@ -428,18 +428,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndPreClose(SMnode *pMnode) { void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
#if 0
mInfo("vgId:1, mnode start leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
mInfo("vgId:1, mnode waiting for leader transfer");
}
mInfo("vgId:1, mnode finish leader transfer");
#endif
} }
} }
@ -501,7 +490,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync); const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync);
bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync); bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) { pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,

View File

@ -72,25 +72,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
// delete msg handle // delete msg handle
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); rpcMsg.info = pMsg->info;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code; pMgmt->errCode = pMeta->code;
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p", " role:%s raw:%p",
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw); pRaw);
if (pMgmt->errCode == 0) { if (pMgmt->errCode == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
} }
taosWLockLatch(&pMgmt->lock); taosWLockLatch(&pMgmt->lock);
@ -120,7 +120,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
} }
} }
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mInfo("start to read snapshot from sdb in atomic way"); mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
@ -128,13 +128,13 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pR
return 0; return 0;
} }
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0; return 0;
} }
void mndRestoreFinish(struct SSyncFSM *pFsm) { void mndRestoreFinish(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) { if (!pMnode->deploy) {
@ -146,32 +146,30 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
} }
} }
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
mInfo("start to read snapshot from sdb"); mInfo("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
} }
int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
mInfo("stop to read snapshot from sdb"); mInfo("stop to read snapshot from sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStopRead(pMnode->pSdb, pReader); return sdbStopRead(pMnode->pSdb, pReader);
} }
int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len); return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
} }
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
mInfo("start to apply snapshot to sdb"); mInfo("start to apply snapshot to sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
} }
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
@ -179,18 +177,12 @@ int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply,
pSnapshot->lastConfigIndex); pSnapshot->lastConfigIndex);
} }
int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len); return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
} }
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void mndBecomeFollower(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1);
mInfo("vgId:1, mnode leader transfer finish");
}
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
mInfo("vgId:1, become follower"); mInfo("vgId:1, become follower");
@ -205,7 +197,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
taosWUnLockLatch(&pMnode->syncMgmt.lock); taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
static void mndBecomeLeader(struct SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) {
mInfo("vgId:1, become leader"); mInfo("vgId:1, become leader");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
} }
@ -217,8 +209,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpPreCommitCb = NULL; pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = mndLeaderTransfer; pFsm->FpLeaderTransferCb = NULL;
pFsm->FpReConfigCb = mndReConfig; pFsm->FpReConfigCb = NULL;
pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeLeaderCb = mndBecomeLeader;
pFsm->FpBecomeFollowerCb = mndBecomeFollower; pFsm->FpBecomeFollowerCb = mndBecomeFollower;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
@ -242,10 +234,13 @@ int32_t mndInitSync(SMnode *pMnode) {
.batchSize = 1, .batchSize = 1,
.vgId = 1, .vgId = 1,
.pWal = pMnode->pWal, .pWal = pMnode->pWal,
.msgcb = NULL, .msgcb = &pMnode->msgCb,
.FpSendMsg = mndSyncSendMsg, .syncSendMSg = mndSyncSendMsg,
.FpEqMsg = mndSyncEqMsg, .syncEqMsg = mndSyncEqMsg,
.FpEqCtrlMsg = mndSyncEqCtrlMsg, .syncEqCtrlMsg = mndSyncEqCtrlMsg,
.pingMs = 5000,
.electMs = 3000,
.heartbeatMs = 500,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
@ -269,11 +264,6 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1; return -1;
} }
// decrease election timer
setPingTimerMS(pMgmt->sync, 5000);
setElectTimerMS(pMgmt->sync, 3000);
setHeartbeatTimerMS(pMgmt->sync, 500);
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0; return 0;
} }
@ -289,32 +279,26 @@ void mndCleanupSync(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; pMgmt->errCode = 0;
if (req.contLen <= 0) {
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
req.pCont = rpcMallocCont(req.contLen); req.pCont = rpcMallocCont(req.contLen);
if (req.pCont == NULL) return -1; if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0;
taosWLockLatch(&pMgmt->lock); taosWLockLatch(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d alrady waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock); taosWUnLockLatch(&pMgmt->lock);
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
return -1; return -1;
} else {
pMgmt->transId = transId;
mInfo("trans:%d, will be proposed", pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
} }
const bool isWeak = false; mInfo("trans:%d, will be proposed", transId);
int32_t code = syncPropose(pMgmt->sync, &req, isWeak); pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false);
if (code == 0) { if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", pMgmt->transId); mInfo("trans:%d, is proposing and wait sem", pMgmt->transId);
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
@ -327,8 +311,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0; code = 0;
} else { } else {
taosWLockLatch(&pMgmt->lock);
mInfo("trans:%d, failed to proposed since %s", transId, terrstr()); mInfo("trans:%d, failed to proposed since %s", transId, terrstr());
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock); taosWUnLockLatch(&pMgmt->lock);
if (terrno == TSDB_CODE_SYN_NOT_LEADER) { if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
@ -344,13 +328,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
return code; return code;
} }
if (pMgmt->errCode != 0) terrno = pMgmt->errCode; terrno = pMgmt->errCode;
return pMgmt->errCode; return pMgmt->errCode;
} }
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync); syncStart(pMgmt->sync);
mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync); mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync);
} }

View File

@ -17,7 +17,6 @@
#define _TD_VND_H_ #define _TD_VND_H_
#include "sync.h" #include "sync.h"
#include "syncTools.h"
#include "ttrace.h" #include "ttrace.h"
#include "vnodeInt.h" #include "vnodeInt.h"

View File

@ -1268,7 +1268,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) { if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
continue; continue;
} }

View File

@ -199,6 +199,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
goto END; goto END;
} }
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
TMSG_INFO((*ppCkHead)->head.msgType));
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);
@ -216,17 +219,20 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
SWalCont* pHead = &((*ppCkHead)->head); SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) { if (IS_META_MSG(pHead->msgType)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
*fetchOffset = offset; *fetchOffset = offset;
code = -1; code = -1;
goto END; goto END;
} }
if (isValValidForTable(pHandle, pHead)) { if (isValValidForTable(pHandle, pHead)) {
*fetchOffset = offset; *fetchOffset = offset;
code = 0; code = 0;
goto END; goto END;
} else {
offset++;
continue;
} }
} }
} }
@ -586,8 +592,39 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
} }
} }
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
continue;
}
tDecoderClear(&mr.coder);
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) {
tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
continue;
}
tqDebug("table uid %" PRId64 " add to tq handle", *id);
taosArrayPush(qa, id);
}
metaReaderClear(&mr);
if (taosArrayGetSize(qa) > 0) {
tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa);
}
taosArrayDestroy(qa);
} else { } else {
// tq update id // TODO handle delete table from stb
}
} else {
ASSERT(0);
} }
} }
while (1) { while (1) {

View File

@ -684,7 +684,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (*state->pDataFReader != NULL) { if (*state->pDataFReader != NULL) {
tsdbDataFReaderClose(state->pDataFReader); tsdbDataFReaderClose(state->pDataFReader);
resetLastBlockLoadInfo(state->pLoadInfo); // resetLastBlockLoadInfo(state->pLoadInfo);
} }
code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
@ -764,7 +764,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iBlock < 0) { if (--state->iBlock < 0) {
tsdbDataFReaderClose(state->pDataFReader); tsdbDataFReaderClose(state->pDataFReader);
*state->pDataFReader = NULL; *state->pDataFReader = NULL;
resetLastBlockLoadInfo(state->pLoadInfo); // resetLastBlockLoadInfo(state->pLoadInfo);
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);

View File

@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied); pVnode->state.applied);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
return -1;
}
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info // save info

View File

@ -290,78 +290,61 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot); vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0; return 0;
} }
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.isWeak == 0) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
if (cbMeta.code == 0) { if (pMeta->code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); rpcMsg.info = pMsg->info;
rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyIndex = pMeta->index;
rpcMsg.info.conn.applyTerm = cbMeta.term; rpcMsg.info.conn.applyTerm = pMeta->term;
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s", ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak, syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else { } else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
} }
}
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 0) {
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
} }
} }
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (cbMeta.isWeak == 1) { if (pMeta->isWeak == 1) {
SVnode *pVnode = pFsm->data; vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
} }
} }
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state,
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType));
} }
#define USE_TSDB_SNAPSHOT #define USE_TSDB_SNAPSHOT
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
@ -373,7 +356,7 @@ static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void
#endif #endif
} }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderClose(pReader); int32_t code = vnodeSnapReaderClose(pReader);
@ -384,7 +367,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#endif #endif
} }
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
@ -403,7 +386,7 @@ static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **
#endif #endif
} }
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
@ -427,7 +410,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
#endif #endif
} }
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
@ -442,7 +425,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#endif #endif
} }
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
@ -454,9 +437,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
#endif #endif
} }
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {} static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
do { do {
@ -476,7 +457,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
} }
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become follower", pVnode->config.vgId); vDebug("vgId:%d, become follower", pVnode->config.vgId);
@ -490,7 +471,7 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
} }
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become leader", pVnode->config.vgId); vDebug("vgId:%d, become leader", pVnode->config.vgId);
@ -512,10 +493,10 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; pFsm->FpLeaderTransferCb = NULL;
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = NULL;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead; pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
@ -533,10 +514,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
.vgId = pVnode->config.vgId, .vgId = pVnode->config.vgId,
.syncCfg = pVnode->config.syncCfg, .syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal, .pWal = pVnode->pWal,
.msgcb = NULL, .msgcb = &pVnode->msgCb,
.FpSendMsg = vnodeSyncSendMsg, .syncSendMSg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg, .syncEqMsg = vnodeSyncEqMsg,
.FpEqCtrlMsg = vnodeSyncEqCtrlMsg, .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
.pingMs = 5000,
.electMs = 4000,
.heartbeatMs = 700,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
@ -555,15 +539,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return -1; return -1;
} }
setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 4000);
setHeartbeatTimerMS(pVnode->sync, 700);
return 0; return 0;
} }
void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
vDebug("vgId:%d, start sync", pVnode->config.vgId); vDebug("vgId:%d, start sync", pVnode->config.vgId);
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync); syncStart(pVnode->sync);
} }

View File

@ -1883,7 +1883,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
#endif #endif
#if 1
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
@ -1919,7 +1918,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
#endif
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor // TODO: refactor
@ -2301,7 +2299,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator"; pOperator->name = "RawScanOperator";
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;

View File

@ -107,9 +107,9 @@ typedef struct SSyncNode {
// sync io // sync io
SWal* pWal; SWal* pWal;
const SMsgCb* msgcb; const SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
// init internal // init internal
SNodeInfo myNodeInfo; SNodeInfo myNodeInfo;

View File

@ -56,6 +56,12 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
return -1; return -1;
} }
pSyncNode->pingBaseLine = pSyncInfo->pingMs;
pSyncNode->pingTimerMS = pSyncInfo->pingMs;
pSyncNode->electBaseLine = pSyncInfo->electMs;
pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs;
pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
pSyncNode->msgcb = pSyncInfo->msgcb;
return pSyncNode->rid; return pSyncNode->rid;
} }
@ -737,30 +743,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SRespStub stub;
int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) {
memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg));
}
syncNodeRelease(pSyncNode);
return ret;
}
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SRespStub stub; SRespStub stub;
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) { if (ret == 1) {
@ -768,20 +751,6 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo)
} }
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
syncNodeRelease(pSyncNode);
return ret;
}
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid);
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->msgcb = msgcb;
syncNodeRelease(pSyncNode);
} }
char* sync2SimpleStr(int64_t rid) { char* sync2SimpleStr(int64_t rid) {
@ -797,41 +766,6 @@ char* sync2SimpleStr(int64_t rid) {
return s; return s;
} }
void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->pingBaseLine = pingTimerMS;
pSyncNode->pingTimerMS = pingTimerMS;
syncNodeRelease(pSyncNode);
}
void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->electBaseLine = electTimerMS;
syncNodeRelease(pSyncNode);
}
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->hbBaseLine = hbTimerMS;
pSyncNode->heartbeatTimerMS = hbTimerMS;
syncNodeRelease(pSyncNode);
}
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -928,7 +862,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} }
} else { } else {
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
ret = 0; ret = 0;
} else { } else {
ret = -1; ret = -1;
@ -1059,9 +993,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg; pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
// init raft config // init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
@ -1577,12 +1511,12 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->syncSendMSg != NULL) {
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->syncSendMSg(&epSet, pMsg);
} else { } else {
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId); sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
return -1; return -1;
@ -1594,12 +1528,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet); syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->syncSendMSg != NULL) {
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->syncSendMSg(&epSet, pMsg);
} else { } else {
sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId); sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
} }
@ -1623,13 +1557,13 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg);
cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "queue", u64buf); cJSON_AddStringToObject(pRoot, "queue", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg);
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf);
// init internal // init internal
cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
@ -2642,8 +2576,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
@ -2651,7 +2585,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL");
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
@ -2676,8 +2610,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
pSyncNode->vgId, pSyncNode); pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
@ -2693,7 +2627,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
} while (0); } while (0);
} else { } else {
sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); sTrace("syncNodeEqElectTimer syncEqMsg is NULL");
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
@ -2725,8 +2659,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
@ -2734,7 +2668,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
@ -2781,8 +2715,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
// eq msg // eq msg
#if 0 #if 0
if (pSyncNode->FpEqCtrlMsg != NULL) { if (pSyncNode->syncEqCtrlMsg != NULL) {
int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
@ -2790,7 +2724,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
} }
#endif #endif
@ -2830,10 +2764,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (ths->FpEqMsg != NULL) { if (ths->syncEqMsg != NULL) {
ths->FpEqMsg(ths->msgcb, &rpcMsg); ths->syncEqMsg(ths->msgcb, &rpcMsg);
} else { } else {
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL");
} }
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
@ -2944,8 +2878,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsgLocalCmd; SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->FpEqMsg != NULL && ths->msgcb != NULL) { if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd); int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont); rpcFreeCont(rpcMsgLocalCmd.pCont);
@ -3127,17 +3061,18 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
} }
if (ths->pFsm->FpLeaderTransferCb != NULL) { if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {
cbMeta.code = 0; cbMeta.code = 0,
cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.currentTerm = ths->pRaftStore->currentTerm,
cbMeta.flag = 0; cbMeta.flag = 0,
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index,
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
cbMeta.isWeak = pEntry->isWeak; cbMeta.isWeak = pEntry->isWeak,
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum,
cbMeta.state = ths->state; cbMeta.state = ths->state,
cbMeta.term = pEntry->term; cbMeta.term = pEntry->term,
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta); };
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
} }
syncLeaderTransferDestroy(pSyncLeaderTransfer); syncLeaderTransferDestroy(pSyncLeaderTransfer);
@ -3315,18 +3250,20 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// execute fsm in apply thread, or execute outside syncPropose // execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) { if (internalExecute) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {
cbMeta.index = pEntry->index; .index = pEntry->index,
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
cbMeta.isWeak = pEntry->isWeak; .isWeak = pEntry->isWeak,
cbMeta.code = 0; .code = 0,
cbMeta.state = ths->state; .state = ths->state,
cbMeta.seqNum = pEntry->seqNum; .seqNum = pEntry->seqNum,
cbMeta.term = pEntry->term; .term = pEntry->term,
cbMeta.currentTerm = ths->pRaftStore->currentTerm; .currentTerm = ths->pRaftStore->currentTerm,
cbMeta.flag = flag; .flag = flag,
};
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
} }
} }

View File

@ -145,16 +145,17 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
taosArrayPush(delIndexArray, pSeqNum); taosArrayPush(delIndexArray, pSeqNum);
cnt++; cnt++;
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {
cbMeta.index = SYNC_INDEX_INVALID; cbMeta.index = SYNC_INDEX_INVALID,
cbMeta.lastConfigIndex = SYNC_INDEX_INVALID; cbMeta.lastConfigIndex = SYNC_INDEX_INVALID,
cbMeta.isWeak = false; cbMeta.isWeak = false,
cbMeta.code = TSDB_CODE_SYN_TIMEOUT; cbMeta.code = TSDB_CODE_SYN_TIMEOUT,
cbMeta.state = pSyncNode->state; cbMeta.state = pSyncNode->state,
cbMeta.seqNum = *pSeqNum; cbMeta.seqNum = *pSeqNum,
cbMeta.term = SYNC_TERM_INVALID; cbMeta.term = SYNC_TERM_INVALID,
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm,
cbMeta.flag = 0; cbMeta.flag = 0,
};
pStub->rpcMsg.pCont = NULL; pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0; pStub->rpcMsg.contLen = 0;

View File

@ -177,18 +177,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
} }
const char* syncUtilState2String(ESyncState state) { const char* syncUtilState2String(ESyncState state) {
/*
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "TAOS_SYNC_STATE_FOLLOWER";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "TAOS_SYNC_STATE_CANDIDATE";
} else if (state == TAOS_SYNC_STATE_LEADER) {
return "TAOS_SYNC_STATE_LEADER";
} else {
return "TAOS_SYNC_STATE_UNKNOWN";
}
*/
if (state == TAOS_SYNC_STATE_FOLLOWER) { if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "follower"; return "follower";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) { } else if (state == TAOS_SYNC_STATE_CANDIDATE) {

View File

@ -195,8 +195,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;

View File

@ -120,8 +120,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;

View File

@ -45,8 +45,8 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = NULL; syncInfo.pFsm = NULL;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;

View File

@ -32,8 +32,8 @@ SSyncNode *pSyncNode;
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");

View File

@ -25,8 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
@ -97,7 +97,7 @@ int main(int argc, char** argv) {
SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest");
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }

View File

@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
@ -103,7 +103,7 @@ int main(int argc, char** argv) {
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
rpcMsg.info.noResp = 1; rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg); pSyncNode->syncSendMSg(&epSet, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }

View File

@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test");

View File

@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");

View File

@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");

View File

@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");

View File

@ -100,8 +100,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;

View File

@ -87,8 +87,8 @@ void initFsm() {
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
@ -204,7 +204,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }

View File

@ -217,8 +217,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;

View File

@ -28,8 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");

View File

@ -28,8 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");

View File

@ -65,8 +65,8 @@ void initFsm() {
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
@ -179,7 +179,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }

View File

@ -34,6 +34,7 @@ typedef struct {
int64_t createTs; int64_t createTs;
int64_t closeTs; int64_t closeTs;
int64_t fileSize; int64_t fileSize;
int64_t syncedOffset;
} SWalFileInfo; } SWalFileInfo;
typedef struct WalIdxEntry { typedef struct WalIdxEntry {
@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) {
return pInfo->fileSize; return pInfo->fileSize;
} }
static inline int64_t walGetLastFileCachedSize(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return (pInfo->fileSize - pInfo->syncedOffset);
}
static inline int64_t walGetLastFileFirstVer(SWal* pWal) { static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);

View File

@ -16,6 +16,7 @@
#include "cJSON.h" #include "cJSON.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "tutil.h" #include "tutil.h"
#include "walInt.h" #include "walInt.h"
@ -65,32 +66,43 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// ensure size as non-negative // ensure size as non-negative
pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
int64_t stepSize = WAL_SCAN_BUF_SIZE;
uint64_t magic = WAL_MAGIC; uint64_t magic = WAL_MAGIC;
int64_t walCkHeadSz = sizeof(SWalCkHead); int64_t walCkHeadSz = sizeof(SWalCkHead);
int64_t end = fileSize; int64_t end = fileSize;
int64_t offset = 0;
int64_t capacity = 0; int64_t capacity = 0;
int64_t readSize = 0; int64_t readSize = 0;
char* buf = NULL; char* buf = NULL;
int64_t found = -1;
bool firstTrial = pFileInfo->fileSize < fileSize; bool firstTrial = pFileInfo->fileSize < fileSize;
int64_t offset = TMIN(pFileInfo->fileSize, fileSize);
int64_t offsetForward = offset - stepSize + walCkHeadSz - 1;
int64_t offsetBackward = offset;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
// check recover size
if (2 * tsWalFsyncDataSizeLimit + offset < end) {
wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
", end:%" PRId64 ", file:%s",
pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr);
}
// search for the valid last WAL entry, e.g. block by block // search for the valid last WAL entry, e.g. block by block
while (1) { while (1) {
offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1)
: TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1);
end = TMIN(offset + stepSize, fileSize);
if (firstTrial) {
offsetForward = offset;
} else {
offsetBackward = offset;
}
ASSERT(offset <= end); ASSERT(offset <= end);
readSize = end - offset; readSize = end - offset;
capacity = readSize + sizeof(magic); capacity = readSize + sizeof(magic);
int64_t limit = WAL_RECOV_SIZE_LIMIT;
if (limit < readSize) {
wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
", end:%" PRId64 ", file:%s",
pWal->cfg.vgId, limit, offset, end, fnameStr);
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
goto _err;
}
void* ptr = taosMemoryRealloc(buf, capacity); void* ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
@ -127,6 +139,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
} }
logContent = (SWalCkHead*)(buf + pos); logContent = (SWalCkHead*)(buf + pos);
if (walValidHeadCksum(logContent) != 0) { if (walValidHeadCksum(logContent) != 0) {
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + pos, fnameStr); offset + pos, fnameStr);
haystack = buf + pos + 1; haystack = buf + pos + 1;
@ -179,46 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
} }
// found one // found one
found = pos; retVer = logContent->head.version;
lastEntryBeginOffset = offset + pos;
lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen;
// try next
haystack = buf + pos + 1; haystack = buf + pos + 1;
} }
if (found >= 0 || offset == 0) break; if (end == fileSize) firstTrial = false;
if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue;
// go backwards, e.g. by at most one WAL scan buf size if (retVer >= 0 || offset == 0) break;
end = TMIN(offset + walCkHeadSz - 1, fileSize);
firstTrial = false;
} }
// determine end of last entry if (retVer < 0) {
SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
if (lastEntry == NULL) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
} else {
retVer = lastEntry->head.version;
lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf);
lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
} }
// truncate file // truncate file
if (lastEntryEndOffset != fileSize) { if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr, wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
lastEntryEndOffset, fileSize); lastEntryEndOffset, fileSize);
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (taosFsyncFile(pFile) < 0) { if (taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
} }
pFileInfo->fileSize = lastEntryEndOffset; pFileInfo->fileSize = lastEntryEndOffset;
taosCloseFile(&pFile); taosCloseFile(&pFile);
@ -621,6 +629,7 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->createTs = ts; pNewInfo->createTs = ts;
pNewInfo->closeTs = -1; pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0; pNewInfo->fileSize = 0;
pNewInfo->syncedOffset = 0;
taosArrayPush(pArray, pNewInfo); taosArrayPush(pArray, pNewInfo);
taosMemoryFree(pNewInfo); taosMemoryFree(pNewInfo);
return 0; return 0;
@ -771,6 +780,12 @@ static int walFindCurMetaVer(SWal* pWal) {
return metaVer; return metaVer;
} }
void walUpdateSyncedOffset(SWal* pWal) {
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
if (pFileInfo == NULL) return;
pFileInfo->syncedOffset = pFileInfo->fileSize;
}
int walSaveMeta(SWal* pWal) { int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal); int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
@ -790,6 +805,9 @@ int walSaveMeta(SWal* pWal) {
return -1; return -1;
} }
// update synced offset
(void)walUpdateSyncedOffset(pWal);
// flush to a tmpfile // flush to a tmpfile
n = walBuildTmpMetaName(pWal, tmpFnameStr); n = walBuildTmpMetaName(pWal, tmpFnameStr);
ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");

View File

@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
return 0; return 0;
} }
int32_t walPersist(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
int32_t ret = walSaveMeta(pWal);
taosThreadMutexUnlock(&pWal->mutex);
return ret;
}
void walClose(SWal *pWal) { void walClose(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
(void)walSaveMeta(pWal); (void)walSaveMeta(pWal);

View File

@ -198,7 +198,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return -1; return -1;
} }
wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId, wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId,
pReader->curVersion, pReader->curInvalid, ver); pReader->curVersion, pReader->curInvalid, ver);
pReader->curVersion = ver; pReader->curVersion = ver;
@ -347,23 +347,48 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code; int64_t code;
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer);
// TODO: valid ver // TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) { if (ver > pRead->pWal->vers.appliedVer) {
return -1; return -1;
} }
if (pRead->curInvalid || pRead->curVersion != ver) { if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReadSeekVer(pRead, ver);
if (code < 0) return -1; if (code < 0) {
pRead->curVersion = ver;
pRead->curInvalid = 1;
return -1;
}
seeked = true;
} }
ASSERT(taosValidFile(pRead->pLogFile) == true); while (1) {
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) {
if (code != sizeof(SWalCkHead)) { break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver);
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
ASSERT(0);
pRead->curInvalid = 1;
return -1; return -1;
} }
}
code = walValidHeadCksum(pHead); code = walValidHeadCksum(pHead);
@ -373,13 +398,20 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1; return -1;
} }
pRead->curInvalid = 0;
return 0; return 0;
} }
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code; int64_t code;
// ASSERT(pRead->curVersion == pHead->head.version); wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
ASSERT(pRead->curVersion == pHead->head.version);
ASSERT(pRead->curInvalid == 0);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
@ -397,6 +429,11 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head); SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer);
if (pRead->capacity < pReadHead->bodyLen) { if (pRead->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
@ -409,19 +446,32 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
} }
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver); pReadHead->version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (walValidBodyCksum(*ppHead) != 0) { if (walValidBodyCksum(*ppHead) != 0) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver); ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;

View File

@ -16,6 +16,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tglobal.h"
#include "walInt.h" #include "walInt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
@ -252,23 +253,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
} }
} }
if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) {
if (walSaveMeta(pWal) < 0) {
return -1;
}
}
return 0; return 0;
} }
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex);
if (walGetLastFileSize(pWal) != 0) { if (walGetLastFileSize(pWal) != 0) {
walRollImpl(pWal); if (walRollImpl(pWal) < 0) {
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
goto _err;
}
}
} }
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
}
return 0; return 0;
_err:
taosThreadMutexUnlock(&pWal->mutex);
return -1;
} }
int32_t walEndSnapshot(SWal *pWal) { int32_t walEndSnapshot(SWal *pWal) {

View File

@ -227,9 +227,7 @@ void taosGetSystemInfo() {
#ifdef WINDOWS #ifdef WINDOWS
taosGetCpuCores(&tsNumOfCores); taosGetCpuCores(&tsNumOfCores);
taosGetTotalMemory(&tsTotalMemoryKB); taosGetTotalMemory(&tsTotalMemoryKB);
taosGetCpuUsage(NULL, NULL);
double tmp1, tmp2, tmp3, tmp4;
taosGetCpuUsage(&tmp1, &tmp2);
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
long physical_pages = sysconf(_SC_PHYS_PAGES); long physical_pages = sysconf(_SC_PHYS_PAGES);
long page_size = sysconf(_SC_PAGESIZE); long page_size = sysconf(_SC_PAGESIZE);
@ -240,9 +238,7 @@ void taosGetSystemInfo() {
taosGetProcIOnfos(); taosGetProcIOnfos();
taosGetCpuCores(&tsNumOfCores); taosGetCpuCores(&tsNumOfCores);
taosGetTotalMemory(&tsTotalMemoryKB); taosGetTotalMemory(&tsTotalMemoryKB);
taosGetCpuUsage(NULL, NULL);
double tmp1, tmp2, tmp3, tmp4;
taosGetCpuUsage(&tmp1, &tmp2);
#endif #endif
} }
@ -447,8 +443,8 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
static int64_t curSysTotal = 0; static int64_t curSysTotal = 0;
static int64_t curProcTotal = 0; static int64_t curProcTotal = 0;
*cpu_system = 0; if (cpu_system != NULL) *cpu_system = 0;
*cpu_engine = 0; if (cpu_engine != NULL) *cpu_engine = 0;
SysCpuInfo sysCpu = {0}; SysCpuInfo sysCpu = {0};
ProcCpuInfo procCpu = {0}; ProcCpuInfo procCpu = {0};
@ -458,8 +454,12 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) { if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; if (cpu_system != NULL) {
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; *cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
}
if (cpu_engine != NULL) {
*cpu_engine = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
}
} }
lastSysUsed = curSysUsed; lastSysUsed = curSysUsed;