refactor(sync): add term in rpcMsg
This commit is contained in:
parent
fcb2197200
commit
b73914fb0d
|
@ -27,7 +27,7 @@ extern "C" {
|
|||
|
||||
#define TAOS_CONN_SERVER 0
|
||||
#define TAOS_CONN_CLIENT 1
|
||||
#define IsReq(pMsg) (pMsg->msgType & 1U)
|
||||
#define IsReq(pMsg) (pMsg->msgType & 1U)
|
||||
|
||||
extern int32_t tsRpcHeadSize;
|
||||
|
||||
|
@ -35,12 +35,13 @@ typedef struct {
|
|||
uint32_t clientIp;
|
||||
uint16_t clientPort;
|
||||
int64_t applyIndex;
|
||||
uint64_t applyTerm;
|
||||
char user[TSDB_USER_LEN];
|
||||
} SRpcConnInfo;
|
||||
|
||||
typedef struct SRpcHandleInfo {
|
||||
// rpc info
|
||||
void * handle; // rpc handle returned to app
|
||||
void *handle; // rpc handle returned to app
|
||||
int64_t refId; // refid, used by server
|
||||
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||
int32_t persistHandle; // persist handle or not
|
||||
|
@ -53,7 +54,7 @@ typedef struct SRpcHandleInfo {
|
|||
void *node; // node mgmt handle
|
||||
|
||||
// resp info
|
||||
void * rsp;
|
||||
void *rsp;
|
||||
int32_t rspLen;
|
||||
|
||||
// conn info
|
||||
|
@ -62,7 +63,7 @@ typedef struct SRpcHandleInfo {
|
|||
|
||||
typedef struct SRpcMsg {
|
||||
tmsg_t msgType;
|
||||
void * pCont;
|
||||
void *pCont;
|
||||
int32_t contLen;
|
||||
int32_t code;
|
||||
SRpcHandleInfo info;
|
||||
|
@ -74,7 +75,7 @@ typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
|
|||
typedef struct SRpcInit {
|
||||
char localFqdn[TSDB_FQDN_LEN];
|
||||
uint16_t localPort; // local port
|
||||
char * label; // for debug purpose
|
||||
char *label; // for debug purpose
|
||||
int32_t numOfThreads; // number of threads to handle connections
|
||||
int32_t sessions; // number of sessions allowed
|
||||
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
||||
|
@ -99,12 +100,12 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t msgType;
|
||||
void * val;
|
||||
void *val;
|
||||
int32_t (*clone)(void *src, void **dst);
|
||||
} SRpcBrokenlinkVal;
|
||||
|
||||
typedef struct {
|
||||
SHashObj * args;
|
||||
SHashObj *args;
|
||||
SRpcBrokenlinkVal brokenVal;
|
||||
void (*freeFunc)(const void *arg);
|
||||
} SRpcCtx;
|
||||
|
|
|
@ -411,6 +411,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
rpcMsg.info.conn.applyTerm = cbMeta.term;
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -818,6 +818,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex);
|
||||
if (code == 0) {
|
||||
pMsg->info.conn.applyIndex = retIndex;
|
||||
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||
ret = 1;
|
||||
|
|
Loading…
Reference in New Issue