From bc8e6b7fd7a2a7d02cda9d0d471e786bf1c6ecc9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 8 Jun 2022 21:17:17 +0800 Subject: [PATCH] refactor: adjust vnode sync --- include/libs/sync/syncTools.h | 2 +- include/libs/transport/trpc.h | 5 +- source/dnode/vnode/src/vnd/vnodeSync.c | 74 ++++++++++---------------- source/libs/sync/src/syncMain.c | 4 +- 4 files changed, 34 insertions(+), 51 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 4b160c9e61..bd396edf55 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -33,7 +33,7 @@ SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); +int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 839194da94..1535f9d290 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -33,7 +33,10 @@ extern int32_t tsRpcHeadSize; typedef struct { uint32_t clientIp; uint16_t clientPort; - char user[TSDB_USER_LEN]; + union { + char user[TSDB_USER_LEN]; + int64_t applyIndex; + }; } SRpcConnInfo; typedef struct SRpcHandleInfo { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index efff466c18..203eecd8ab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -35,7 +35,6 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) { vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count); tsem_wait(&pVnode->syncSem); - vTrace("vgId:%d, ===> block finish, count:%d", pVnode->config.vgId, count); } static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { @@ -93,6 +92,11 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { if (code == 0) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + // todo refactor + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { SEpSet newEpSet = {0}; syncGetEpSet(pVnode->sync, &newEpSet); @@ -132,36 +136,22 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - vTrace("vgId:%d, msg:%p get from vnode-apply queue", vgId, pMsg); + vTrace("vgId:%d, msg:%p get from vnode-apply queue, handle:%p", vgId, pMsg, pMsg->info.handle); - // init response rpc msg - SRpcMsg rsp = {0}; - - // get original rpc msg - assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG); - SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); - syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); - SRpcMsg originalRpcMsg; - syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg); - - // apply data into tsdb - if (vnodeProcessWriteReq(pVnode, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { - rsp.code = terrno; - vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + if (rsp.code == 0) { + if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) { + rsp.code = terrno; + vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); + } } - syncApplyMsgDestroy(pSyncApplyMsg); - rpcFreeCont(originalRpcMsg.pCont); - - vnodePostBlockMsg(pVnode, originalRpcMsg.msgType); - - // if leader, send response - if (pMsg->info.handle != NULL) { - rsp.info = pMsg->info; + vnodePostBlockMsg(pVnode, pMsg->msgType); + if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } - vTrace("vgId:%d, msg:%p is freed, code:0x%x handle:%p", vgId, pMsg, rsp.code, pMsg->info.handle); + vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -195,43 +185,33 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigC vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode)); // todo rpc response here + // build rpc msg + // put into apply queue } static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SVnode *pVnode = pFsm->data; + SSnapshot snapshot = {0}; SyncIndex beginIndex = SYNC_INDEX_INVALID; + char logBuf[256] = {0}; + if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot = {0}; - pFsm->FpGetSnapshot(pFsm, &snapshot); + (*pFsm->FpGetSnapshot)(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } if (cbMeta.index > beginIndex) { - char logBuf[256] = {0}; snprintf( logBuf, sizeof(logBuf), "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - SVnode *pVnode = pFsm->data; - SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta); - SRpcMsg applyMsg; - syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg); - syncApplyMsgDestroy(pSyncApplyMsg); - - // recover handle for response - SRpcMsg saveRpcMsg; - int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg); - if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) { - applyMsg.info = saveRpcMsg.info; - } else { - applyMsg.info.handle = NULL; - applyMsg.info.ahandle = NULL; - } - - // put to applyQ - tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg); - + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d522b829dd..795d3e3c27 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -281,7 +281,7 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { +int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; @@ -291,7 +291,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); if (ret == 1) { - memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); + *pInfo = stub.rpcMsg.info; } taosReleaseRef(tsNodeRefId, pSyncNode->rid);