diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 4b160c9e61..bd396edf55 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -33,7 +33,7 @@ SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); +int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 839194da94..1535f9d290 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -33,7 +33,10 @@ extern int32_t tsRpcHeadSize; typedef struct { uint32_t clientIp; uint16_t clientPort; - char user[TSDB_USER_LEN]; + union { + char user[TSDB_USER_LEN]; + int64_t applyIndex; + }; } SRpcConnInfo; typedef struct SRpcHandleInfo { diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index adc18fdced..0f7daf0e1d 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -52,10 +52,9 @@ typedef struct { typedef struct { int32_t vgId; - int32_t refCount; int32_t vgVersion; + int32_t refCount; int8_t dropped; - int8_t accessState; char *path; SVnode *pImpl; STaosQueue *pWriteQ; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4c5a32536f..0ae6c2b336 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -323,7 +323,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - // Requests handled by VNODE if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 23927255bb..9e4e7713f2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -49,10 +49,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } pVnode->vgId = pCfg->vgId; - pVnode->refCount = 0; pVnode->vgVersion = pCfg->vgVersion; + pVnode->refCount = 0; pVnode->dropped = 0; - pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->path = tstrdup(pCfg->path); pVnode->pImpl = pImpl; @@ -96,7 +95,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dDebug("vgId:%d, vnode is closed", pVnode->vgId); if (pVnode->dropped) { - dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped); + dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); vnodeDestroy(path, pMgmt->pTfs); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 427b3d5c94..b6913f93f2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -53,9 +53,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { } if (IsReq(pMsg)) { - if (code != 0 && terrno != 0) { + if (code != 0) { + if (terrno != 0) code = terrno; dError("msg:%p failed to process since %s", pMsg, terrstr()); - code = terrno; } vmSendRsp(pMsg, code); } @@ -97,110 +97,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - int32_t code = 0; - SRpcMsg *pMsg = NULL; - SVnodeObj *pVnode = pInfo->ahandle; - int64_t sync = vnodeGetSyncHandle(pVnode->pImpl); - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg **)); - - for (int32_t m = 0; m < numOfMsgs; m++) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - dTrace("vgId:%d, msg:%p get from vnode-write queue", pVnode->vgId, pMsg); - - if (taosArrayPush(pArray, &pMsg) == NULL) { - dError("vgId:%d, failed to push msg:%p to vnode-write array", pVnode->vgId, pMsg); - vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); - } - } - - for (int32_t m = 0; m < taosArrayGetSize(pArray); m++) { - pMsg = *(SRpcMsg **)taosArrayGet(pArray, m); - code = vnodePreprocessReq(pVnode->pImpl, pMsg); - - if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - dTrace("vgId:%d, msg:%p in progress and no rsp", pVnode->vgId, pMsg); - continue; - } - - if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) { - code = syncPropose(sync, pMsg, false); - } - - if (code == TAOS_SYNC_PROPOSE_SUCCESS) { - dTrace("vgId:%d, msg:%p is proposed and no rsp", pVnode->vgId, pMsg); - continue; - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { - SEpSet newEpSet = {0}; - syncGetEpSet(sync, &newEpSet); - SEp *pEp = &newEpSet.eps[newEpSet.inUse]; - if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { - newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; - } - - dTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->vgId, pMsg, - newEpSet.numOfEps, newEpSet.inUse); - for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { - dTrace("vgId:%d, msg:%p ep:%s:%u", pVnode->vgId, pMsg, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); - } - - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - tmsgSendRedirectRsp(&rsp, &newEpSet); - } else { - dError("vgId:%d, msg:%p failed to propose write since %s, code:0x%x", pVnode->vgId, pMsg, tstrerror(code), code); - vmSendRsp(pMsg, code); - } - } - - for (int32_t i = 0; i < numOfMsgs; i++) { - pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - dTrace("vgId:%d, msg:%p is freed", pVnode->vgId, pMsg); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } - - taosArrayDestroy(pArray); -} - -static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; - - for (int32_t i = 0; i < numOfMsgs; ++i) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - dTrace("vgId:%d, msg:%p get from vnode-apply queue", pVnode->vgId, pMsg); - - // init response rpc msg - SRpcMsg rsp = {0}; - - // get original rpc msg - assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG); - SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); - syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); - SRpcMsg originalRpcMsg; - syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg); - - // apply data into tsdb - if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { - rsp.code = terrno; - dError("vgId:%d, msg:%p failed to apply since %s", pVnode->vgId, pMsg, terrstr()); - } - - syncApplyMsgDestroy(pSyncApplyMsg); - rpcFreeCont(originalRpcMsg.pCont); - - // if leader, send response - if (pMsg->info.handle != NULL) { - rsp.info = pMsg->info; - tmsgSendRsp(&rsp); - } - - dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, rsp.code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } -} - static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; SRpcMsg *pMsg = NULL; @@ -322,7 +218,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { if (pMsg == NULL) return -1; SMsgHead *pHead = pRpc->pCont; - dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); @@ -362,9 +258,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { } int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { - pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeApplyMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); @@ -381,8 +277,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 8c7a78a5af..01e57d5eaf 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -71,6 +71,9 @@ int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); +void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); +void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); + // meta typedef struct SMeta SMeta; // todo: remove typedef struct SMetaReader SMetaReader; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 5f4f7e70da..b0599b82ef 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -84,7 +84,6 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); -int32_t vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index de2fc03f69..5532f202fc 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -220,8 +220,10 @@ struct SVnode { SWal* pWal; STQ* pTq; SSink* pSink; - int64_t sync; tsem_t canCommit; + int64_t sync; + int32_t syncCount; + sem_t syncSem; SQHandle* pQuery; }; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index a85b830616..dfc258b42b 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -81,7 +81,9 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.applied = info.state.committed; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; + pVnode->syncCount = 0; + tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); // open buffer pool @@ -175,6 +177,7 @@ void vnodeClose(SVnode *pVnode) { vnodeCloseBufPool(pVnode); // destroy handle tsem_destroy(&(pVnode->canCommit)); + tsem_destroy(&pVnode->syncSem); taosMemoryFree(pVnode); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a9c9ef393..1524d8fe6f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -91,9 +91,6 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { } } break; - case TDMT_VND_ALTER_REPLICA: { - code = vnodeSyncAlter(pVnode, pMsg); - } break; default: break; } @@ -107,7 +104,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t len; int32_t ret; - vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); pVnode->state.applied = version; @@ -173,7 +170,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; } - vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); + vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8f98e9cec5..203eecd8ab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -16,13 +16,239 @@ #define _DEFAULT_SOURCE #include "vnd.h" -static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg); -static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg); -static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode); -static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); +static inline bool vnodeIsMsgBlock(tmsg_t type) { + return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA); +} + +static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } + +static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) { + if (!vnodeIsMsgBlock(type)) return; + + int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1); + vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); +} + +static inline void vnodeWaitBlockMsg(SVnode *pVnode) { + int32_t count = atomic_load_32(&pVnode->syncCount); + if (count <= 0) return; + + vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count); + tsem_wait(&pVnode->syncSem); +} + +static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { + if (!vnodeIsMsgBlock(type)) return; + + int32_t count = atomic_load_32(&pVnode->syncCount); + if (count <= 0) return; + + count = atomic_sub_fetch_32(&pVnode->syncCount, 1); + vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); + if (count <= 0) { + tsem_post(&pVnode->syncSem); + } +} + +static int32_t vnodeProcessSyncReconfigReq(SVnode *pVnode, SRpcMsg *pMsg) { + SAlterVnodeReq req = {0}; + if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return TSDB_CODE_INVALID_MSG; + } + + vInfo("vgId:%d, start to alter vnode replica to %d", TD_VID(pVnode), req.replica); + SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex}; + for (int32_t r = 0; r < req.replica; ++r) { + SNodeInfo *pNode = &cfg.nodeInfo[r]; + tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = req.replicas[r].port; + vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort); + } + + return syncReconfig(pVnode->sync, &cfg); +} + +void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnode *pVnode = pInfo->ahandle; + int32_t vgId = pVnode->config.vgId; + int32_t code = 0; + SRpcMsg *pMsg = NULL; + + for (int32_t m = 0; m < numOfMsgs; m++) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + vTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); + + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + code = vnodeProcessSyncReconfigReq(pVnode, pMsg); + } else { + code = vnodePreprocessReq(pVnode, pMsg); + if (code != 0) { + vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr()); + } else { + code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); + } + } + + if (code == 0) { + vnodeAccumBlockMsg(pVnode, pMsg->msgType); + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + // todo refactor + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } + } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + SEpSet newEpSet = {0}; + syncGetEpSet(pVnode->sync, &newEpSet); + SEp *pEp = &newEpSet.eps[newEpSet.inUse]; + if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { + newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; + } + + vTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, + newEpSet.inUse); + for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { + vTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); + } + + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + tmsgSendRedirectRsp(&rsp, &newEpSet); + } else { + if (terrno != 0) code = terrno; + vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } + + vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + + vnodeWaitBlockMsg(pVnode); +} + +void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnode *pVnode = pInfo->ahandle; + int32_t vgId = pVnode->config.vgId; + int32_t code = 0; + SRpcMsg *pMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + vTrace("vgId:%d, msg:%p get from vnode-apply queue, handle:%p", vgId, pMsg, pMsg->info.handle); + + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + if (rsp.code == 0) { + if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) { + rsp.code = terrno; + vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); + } + } + + vnodePostBlockMsg(pVnode, pMsg->msgType); + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } + + vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + +static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} + +static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t code = tmsgSendReq(pEpSet, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} + +static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { + vnodeGetSnapshot(pFsm->data, pSnapshot); + return 0; +} + +static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + SVnode *pVnode = pFsm->data; + vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode)); + + // todo rpc response here + // build rpc msg + // put into apply queue +} + +static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SVnode *pVnode = pFsm->data; + SSnapshot snapshot = {0}; + SyncIndex beginIndex = SYNC_INDEX_INVALID; + char logBuf[256] = {0}; + + if (pFsm->FpGetSnapshot != NULL) { + (*pFsm->FpGetSnapshot)(pFsm, &snapshot); + beginIndex = snapshot.lastApplyIndex; + } + + if (cbMeta.index > beginIndex) { + snprintf( + logBuf, sizeof(logBuf), + "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); + syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); + + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); + } else { + char logBuf[256] = {0}; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, " + "beginIndex :%ld\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + beginIndex); + syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); + } +} + +static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + char logBuf[256] = {0}; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, + cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); +} + +static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + char logBuf[256] = {0}; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); +} + +static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { + SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); + pFsm->data = pVnode; + pFsm->FpCommitCb = vnodeSyncCommitMsg; + pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; + pFsm->FpRollBackCb = vnodeSyncRollBackMsg; + pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; + pFsm->FpRestoreFinishCb = NULL; + pFsm->FpReConfigCb = vnodeSyncReconfig; + return pFsm; +} int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { SSyncInfo syncInfo = { @@ -50,33 +276,6 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { return 0; } -int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) { - SAlterVnodeReq req = {0}; - if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return TSDB_CODE_INVALID_MSG; - } - - vInfo("vgId:%d, start to alter vnode replica to %d", TD_VID(pVnode), req.replica); - SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex}; - for (int32_t r = 0; r < req.replica; ++r) { - SNodeInfo *pNode = &cfg.nodeInfo[r]; - tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn)); - pNode->nodePort = req.replicas[r].port; - vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort); - } - - int32_t code = syncReconfig(pVnode->sync, &cfg); - if (code == TAOS_SYNC_PROPOSE_SUCCESS) { - // todo refactor - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - tmsgSendRsp(&rsp); - return TSDB_CODE_ACTION_IN_PROGRESS; - } - - return code; -} - void vnodeSyncStart(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); if (pVnode->config.standby) { @@ -87,107 +286,3 @@ void vnodeSyncStart(SVnode *pVnode) { } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } - -int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); - if (code != 0) { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - return code; -} - -int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = tmsgSendReq(pEpSet, pMsg); - if (code != 0) { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - return code; -} - -int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { - vnodeGetSnapshot(pFsm->data, pSnapshot); - return 0; -} - -void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - SVnode *pVnode = pFsm->data; - vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode)); - - // todo rpc response here -} - -void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot = {0}; - pFsm->FpGetSnapshot(pFsm, &snapshot); - beginIndex = snapshot.lastApplyIndex; - } - - if (cbMeta.index > beginIndex) { - char logBuf[256] = {0}; - snprintf( - logBuf, sizeof(logBuf), - "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - - SVnode *pVnode = pFsm->data; - SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta); - SRpcMsg applyMsg; - syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg); - syncApplyMsgDestroy(pSyncApplyMsg); - - // recover handle for response - SRpcMsg saveRpcMsg; - int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg); - if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) { - applyMsg.info = saveRpcMsg.info; - } else { - applyMsg.info.handle = NULL; - applyMsg.info.ahandle = NULL; - } - - // put to applyQ - tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg); - - } else { - char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, " - "beginIndex :%ld\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - beginIndex); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - } -} - -void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { - SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); - pFsm->data = pVnode; - pFsm->FpCommitCb = vnodeSyncCommitMsg; - pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; - pFsm->FpRollBackCb = vnodeSyncRollBackMsg; - pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; - pFsm->FpRestoreFinishCb = NULL; - pFsm->FpReConfigCb = vnodeSyncReconfig; - - return pFsm; -} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d522b829dd..795d3e3c27 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -281,7 +281,7 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { +int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; @@ -291,7 +291,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); if (ret == 1) { - memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); + *pInfo = stub.rpcMsg.info; } taosReleaseRef(tsNodeRefId, pSyncNode->rid);