refactor: add sync sem for vnode
This commit is contained in:
		
							parent
							
								
									1e2fdd57d6
								
							
						
					
					
						commit
						ae9e11bb0f
					
				| 
						 | 
				
			
			@ -52,10 +52,9 @@ typedef struct {
 | 
			
		|||
 | 
			
		||||
typedef struct {
 | 
			
		||||
  int32_t     vgId;
 | 
			
		||||
  int32_t     refCount;
 | 
			
		||||
  int32_t     vgVersion;
 | 
			
		||||
  int32_t     refCount;
 | 
			
		||||
  int8_t      dropped;
 | 
			
		||||
  int8_t      accessState;
 | 
			
		||||
  char       *path;
 | 
			
		||||
  SVnode     *pImpl;
 | 
			
		||||
  STaosQueue *pWriteQ;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -323,7 +323,6 @@ SArray *vmGetMsgHandles() {
 | 
			
		|||
  if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER;
 | 
			
		||||
  if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER;
 | 
			
		||||
 | 
			
		||||
  // Requests handled by VNODE
 | 
			
		||||
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
 | 
			
		||||
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
 | 
			
		||||
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -49,10 +49,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  pVnode->vgId = pCfg->vgId;
 | 
			
		||||
  pVnode->refCount = 0;
 | 
			
		||||
  pVnode->vgVersion = pCfg->vgVersion;
 | 
			
		||||
  pVnode->refCount = 0;
 | 
			
		||||
  pVnode->dropped = 0;
 | 
			
		||||
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
 | 
			
		||||
  pVnode->path = tstrdup(pCfg->path);
 | 
			
		||||
  pVnode->pImpl = pImpl;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -96,7 +95,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
 | 
			
		|||
  dDebug("vgId:%d, vnode is closed", pVnode->vgId);
 | 
			
		||||
 | 
			
		||||
  if (pVnode->dropped) {
 | 
			
		||||
    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
 | 
			
		||||
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
 | 
			
		||||
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
 | 
			
		||||
    vnodeDestroy(path, pMgmt->pTfs);
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -53,9 +53,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  if (IsReq(pMsg)) {
 | 
			
		||||
    if (code != 0 && terrno != 0) {
 | 
			
		||||
    if (code != 0) {
 | 
			
		||||
      if (terrno != 0) code = terrno;
 | 
			
		||||
      dError("msg:%p failed to process since %s", pMsg, terrstr());
 | 
			
		||||
      code = terrno;
 | 
			
		||||
    }
 | 
			
		||||
    vmSendRsp(pMsg, code);
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -97,110 +97,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
 | 
			
		|||
  taosFreeQitem(pMsg);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
 | 
			
		||||
  int32_t    code = 0;
 | 
			
		||||
  SRpcMsg   *pMsg = NULL;
 | 
			
		||||
  SVnodeObj *pVnode = pInfo->ahandle;
 | 
			
		||||
  int64_t    sync = vnodeGetSyncHandle(pVnode->pImpl);
 | 
			
		||||
  SArray    *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg **));
 | 
			
		||||
 | 
			
		||||
  for (int32_t m = 0; m < numOfMsgs; m++) {
 | 
			
		||||
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
 | 
			
		||||
    dTrace("vgId:%d, msg:%p get from vnode-write queue", pVnode->vgId, pMsg);
 | 
			
		||||
 | 
			
		||||
    if (taosArrayPush(pArray, &pMsg) == NULL) {
 | 
			
		||||
      dError("vgId:%d, failed to push msg:%p to vnode-write array", pVnode->vgId, pMsg);
 | 
			
		||||
      vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  for (int32_t m = 0; m < taosArrayGetSize(pArray); m++) {
 | 
			
		||||
    pMsg = *(SRpcMsg **)taosArrayGet(pArray, m);
 | 
			
		||||
    code = vnodePreprocessReq(pVnode->pImpl, pMsg);
 | 
			
		||||
 | 
			
		||||
    if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
 | 
			
		||||
      dTrace("vgId:%d, msg:%p in progress and no rsp", pVnode->vgId, pMsg);
 | 
			
		||||
      continue;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) {
 | 
			
		||||
      code = syncPropose(sync, pMsg, false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
 | 
			
		||||
      dTrace("vgId:%d, msg:%p is proposed and no rsp", pVnode->vgId, pMsg);
 | 
			
		||||
      continue;
 | 
			
		||||
    } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
 | 
			
		||||
      SEpSet newEpSet = {0};
 | 
			
		||||
      syncGetEpSet(sync, &newEpSet);
 | 
			
		||||
      SEp *pEp = &newEpSet.eps[newEpSet.inUse];
 | 
			
		||||
      if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
 | 
			
		||||
        newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      dTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->vgId, pMsg,
 | 
			
		||||
             newEpSet.numOfEps, newEpSet.inUse);
 | 
			
		||||
      for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
 | 
			
		||||
        dTrace("vgId:%d, msg:%p ep:%s:%u", pVnode->vgId, pMsg, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
 | 
			
		||||
      tmsgSendRedirectRsp(&rsp, &newEpSet);
 | 
			
		||||
    } else {
 | 
			
		||||
      dError("vgId:%d, msg:%p failed to propose write since %s, code:0x%x", pVnode->vgId, pMsg, tstrerror(code), code);
 | 
			
		||||
      vmSendRsp(pMsg, code);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  for (int32_t i = 0; i < numOfMsgs; i++) {
 | 
			
		||||
    pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
 | 
			
		||||
    dTrace("vgId:%d, msg:%p is freed", pVnode->vgId, pMsg);
 | 
			
		||||
    rpcFreeCont(pMsg->pCont);
 | 
			
		||||
    taosFreeQitem(pMsg);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  taosArrayDestroy(pArray);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
 | 
			
		||||
  SVnodeObj *pVnode = pInfo->ahandle;
 | 
			
		||||
  SRpcMsg   *pMsg = NULL;
 | 
			
		||||
 | 
			
		||||
  for (int32_t i = 0; i < numOfMsgs; ++i) {
 | 
			
		||||
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
 | 
			
		||||
    dTrace("vgId:%d, msg:%p get from vnode-apply queue", pVnode->vgId, pMsg);
 | 
			
		||||
 | 
			
		||||
    // init response rpc msg
 | 
			
		||||
    SRpcMsg rsp = {0};
 | 
			
		||||
 | 
			
		||||
    // get original rpc msg
 | 
			
		||||
    assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG);
 | 
			
		||||
    SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
 | 
			
		||||
    syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
 | 
			
		||||
    SRpcMsg originalRpcMsg;
 | 
			
		||||
    syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
 | 
			
		||||
 | 
			
		||||
    // apply data into tsdb
 | 
			
		||||
    if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
 | 
			
		||||
      rsp.code = terrno;
 | 
			
		||||
      dError("vgId:%d, msg:%p failed to apply since %s", pVnode->vgId, pMsg, terrstr());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    syncApplyMsgDestroy(pSyncApplyMsg);
 | 
			
		||||
    rpcFreeCont(originalRpcMsg.pCont);
 | 
			
		||||
 | 
			
		||||
    // if leader, send response
 | 
			
		||||
    if (pMsg->info.handle != NULL) {
 | 
			
		||||
      rsp.info = pMsg->info;
 | 
			
		||||
      tmsgSendRsp(&rsp);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, rsp.code);
 | 
			
		||||
    rpcFreeCont(pMsg->pCont);
 | 
			
		||||
    taosFreeQitem(pMsg);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
 | 
			
		||||
  SVnodeObj *pVnode = pInfo->ahandle;
 | 
			
		||||
  SRpcMsg   *pMsg = NULL;
 | 
			
		||||
| 
						 | 
				
			
			@ -322,7 +218,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
 | 
			
		|||
  if (pMsg == NULL) return -1;
 | 
			
		||||
 | 
			
		||||
  SMsgHead *pHead = pRpc->pCont;
 | 
			
		||||
  dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pMsg->msgType));
 | 
			
		||||
  dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType));
 | 
			
		||||
 | 
			
		||||
  pHead->contLen = htonl(pHead->contLen);
 | 
			
		||||
  pHead->vgId = htonl(pHead->vgId);
 | 
			
		||||
| 
						 | 
				
			
			@ -362,9 +258,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
 | 
			
		||||
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
 | 
			
		||||
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
 | 
			
		||||
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
 | 
			
		||||
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
 | 
			
		||||
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeApplyMsg);
 | 
			
		||||
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
 | 
			
		||||
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
 | 
			
		||||
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
 | 
			
		||||
| 
						 | 
				
			
			@ -381,8 +277,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
 | 
			
		|||
 | 
			
		||||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
 | 
			
		||||
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
 | 
			
		||||
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
 | 
			
		||||
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
 | 
			
		||||
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
 | 
			
		||||
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
 | 
			
		||||
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
 | 
			
		||||
  tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -71,6 +71,9 @@ int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
 | 
			
		|||
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
 | 
			
		||||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
 | 
			
		||||
 | 
			
		||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
 | 
			
		||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
 | 
			
		||||
 | 
			
		||||
// meta
 | 
			
		||||
typedef struct SMeta       SMeta;  // todo: remove
 | 
			
		||||
typedef struct SMetaReader SMetaReader;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -84,7 +84,6 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
 | 
			
		|||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
 | 
			
		||||
void    vnodeSyncStart(SVnode* pVnode);
 | 
			
		||||
void    vnodeSyncClose(SVnode* pVnode);
 | 
			
		||||
int32_t vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
 | 
			
		||||
 | 
			
		||||
#ifdef __cplusplus
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -232,8 +232,10 @@ struct SVnode {
 | 
			
		|||
  SWal*      pWal;
 | 
			
		||||
  STQ*       pTq;
 | 
			
		||||
  SSink*     pSink;
 | 
			
		||||
  int64_t    sync;
 | 
			
		||||
  tsem_t     canCommit;
 | 
			
		||||
  int64_t    sync;
 | 
			
		||||
  int32_t    syncCount;
 | 
			
		||||
  sem_t      syncSem;
 | 
			
		||||
  SQHandle*  pQuery;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -81,7 +81,9 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
 | 
			
		|||
  pVnode->state.applied = info.state.committed;
 | 
			
		||||
  pVnode->pTfs = pTfs;
 | 
			
		||||
  pVnode->msgCb = msgCb;
 | 
			
		||||
  pVnode->syncCount = 0;
 | 
			
		||||
 | 
			
		||||
  tsem_init(&pVnode->syncSem, 0, 0);
 | 
			
		||||
  tsem_init(&(pVnode->canCommit), 0, 1);
 | 
			
		||||
 | 
			
		||||
  // open buffer pool
 | 
			
		||||
| 
						 | 
				
			
			@ -175,6 +177,7 @@ void vnodeClose(SVnode *pVnode) {
 | 
			
		|||
    vnodeCloseBufPool(pVnode);
 | 
			
		||||
    // destroy handle
 | 
			
		||||
    tsem_destroy(&(pVnode->canCommit));
 | 
			
		||||
    tsem_destroy(&pVnode->syncSem);
 | 
			
		||||
    taosMemoryFree(pVnode);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -91,9 +91,6 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
 | 
			
		|||
      }
 | 
			
		||||
 | 
			
		||||
    } break;
 | 
			
		||||
    case TDMT_VND_ALTER_REPLICA: {
 | 
			
		||||
      code = vnodeSyncAlter(pVnode, pMsg);
 | 
			
		||||
    } break;
 | 
			
		||||
    default:
 | 
			
		||||
      break;
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -107,7 +104,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
 | 
			
		|||
  int32_t len;
 | 
			
		||||
  int32_t ret;
 | 
			
		||||
 | 
			
		||||
  vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
 | 
			
		||||
  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
 | 
			
		||||
         version);
 | 
			
		||||
 | 
			
		||||
  pVnode->state.applied = version;
 | 
			
		||||
| 
						 | 
				
			
			@ -173,7 +170,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
 | 
			
		|||
      break;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
 | 
			
		||||
  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
 | 
			
		||||
 | 
			
		||||
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
 | 
			
		||||
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,41 +16,42 @@
 | 
			
		|||
#define _DEFAULT_SOURCE
 | 
			
		||||
#include "vnd.h"
 | 
			
		||||
 | 
			
		||||
static int32_t   vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
 | 
			
		||||
static int32_t   vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
 | 
			
		||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
 | 
			
		||||
static void      vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
 | 
			
		||||
static void      vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
 | 
			
		||||
static void      vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
 | 
			
		||||
static int32_t   vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
 | 
			
		||||
 | 
			
		||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
 | 
			
		||||
  SSyncInfo syncInfo = {
 | 
			
		||||
      .vgId = pVnode->config.vgId,
 | 
			
		||||
      .isStandBy = pVnode->config.standby,
 | 
			
		||||
      .syncCfg = pVnode->config.syncCfg,
 | 
			
		||||
      .pWal = pVnode->pWal,
 | 
			
		||||
      .msgcb = NULL,
 | 
			
		||||
      .FpSendMsg = vnodeSyncSendMsg,
 | 
			
		||||
      .FpEqMsg = vnodeSyncEqMsg,
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
 | 
			
		||||
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
 | 
			
		||||
 | 
			
		||||
  pVnode->sync = syncOpen(&syncInfo);
 | 
			
		||||
  if (pVnode->sync <= 0) {
 | 
			
		||||
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
 | 
			
		||||
    return -1;
 | 
			
		||||
static inline bool vnodeIsMsgBlock(tmsg_t type) {
 | 
			
		||||
  return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
  setPingTimerMS(pVnode->sync, 3000);
 | 
			
		||||
  setElectTimerMS(pVnode->sync, 500);
 | 
			
		||||
  setHeartbeatTimerMS(pVnode->sync, 100);
 | 
			
		||||
  return 0;
 | 
			
		||||
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
 | 
			
		||||
 | 
			
		||||
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
 | 
			
		||||
  if (!vnodeIsMsgBlock(type)) return;
 | 
			
		||||
 | 
			
		||||
  int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1);
 | 
			
		||||
  vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
 | 
			
		||||
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
 | 
			
		||||
  int32_t count = atomic_load_32(&pVnode->syncCount);
 | 
			
		||||
  if (count <= 0) return;
 | 
			
		||||
 | 
			
		||||
  vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
 | 
			
		||||
  tsem_wait(&pVnode->syncSem);
 | 
			
		||||
  vTrace("vgId:%d, ===> block finish, count:%d", pVnode->config.vgId, count);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
 | 
			
		||||
  if (!vnodeIsMsgBlock(type)) return;
 | 
			
		||||
 | 
			
		||||
  int32_t count = atomic_load_32(&pVnode->syncCount);
 | 
			
		||||
  if (count <= 0) return;
 | 
			
		||||
 | 
			
		||||
  count = atomic_sub_fetch_32(&pVnode->syncCount, 1);
 | 
			
		||||
  vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
 | 
			
		||||
  if (count <= 0) {
 | 
			
		||||
    tsem_post(&pVnode->syncSem);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int32_t vnodeProcessSyncReconfigReq(SVnode *pVnode, SRpcMsg *pMsg) {
 | 
			
		||||
  SAlterVnodeReq req = {0};
 | 
			
		||||
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
 | 
			
		||||
    terrno = TSDB_CODE_INVALID_MSG;
 | 
			
		||||
| 
						 | 
				
			
			@ -66,29 +67,107 @@ int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
 | 
			
		|||
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  int32_t code = syncReconfig(pVnode->sync, &cfg);
 | 
			
		||||
  if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
 | 
			
		||||
    // todo refactor
 | 
			
		||||
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
 | 
			
		||||
    tmsgSendRsp(&rsp);
 | 
			
		||||
    return TSDB_CODE_ACTION_IN_PROGRESS;
 | 
			
		||||
  return syncReconfig(pVnode->sync, &cfg);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
  return code;
 | 
			
		||||
}
 | 
			
		||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
 | 
			
		||||
  SVnode  *pVnode = pInfo->ahandle;
 | 
			
		||||
  int32_t  vgId = pVnode->config.vgId;
 | 
			
		||||
  int32_t  code = 0;
 | 
			
		||||
  SRpcMsg *pMsg = NULL;
 | 
			
		||||
 | 
			
		||||
void vnodeSyncStart(SVnode *pVnode) {
 | 
			
		||||
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
 | 
			
		||||
  if (pVnode->config.standby) {
 | 
			
		||||
    syncStartStandBy(pVnode->sync);
 | 
			
		||||
  for (int32_t m = 0; m < numOfMsgs; m++) {
 | 
			
		||||
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
 | 
			
		||||
    vTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
 | 
			
		||||
 | 
			
		||||
    if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
 | 
			
		||||
      code = vnodeProcessSyncReconfigReq(pVnode, pMsg);
 | 
			
		||||
    } else {
 | 
			
		||||
    syncStart(pVnode->sync);
 | 
			
		||||
      code = vnodePreprocessReq(pVnode, pMsg);
 | 
			
		||||
      if (code != 0) {
 | 
			
		||||
        vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
 | 
			
		||||
      } else {
 | 
			
		||||
        code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
 | 
			
		||||
    if (code == 0) {
 | 
			
		||||
      vnodeAccumBlockMsg(pVnode, pMsg->msgType);
 | 
			
		||||
    } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
 | 
			
		||||
      SEpSet newEpSet = {0};
 | 
			
		||||
      syncGetEpSet(pVnode->sync, &newEpSet);
 | 
			
		||||
      SEp *pEp = &newEpSet.eps[newEpSet.inUse];
 | 
			
		||||
      if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
 | 
			
		||||
        newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
 | 
			
		||||
      vTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
 | 
			
		||||
             newEpSet.inUse);
 | 
			
		||||
      for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
 | 
			
		||||
        vTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
 | 
			
		||||
      tmsgSendRedirectRsp(&rsp, &newEpSet);
 | 
			
		||||
    } else {
 | 
			
		||||
      if (terrno != 0) code = terrno;
 | 
			
		||||
      vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
 | 
			
		||||
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
 | 
			
		||||
      tmsgSendRsp(&rsp);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
 | 
			
		||||
    rpcFreeCont(pMsg->pCont);
 | 
			
		||||
    taosFreeQitem(pMsg);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  vnodeWaitBlockMsg(pVnode);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
 | 
			
		||||
  SVnode  *pVnode = pInfo->ahandle;
 | 
			
		||||
  int32_t  vgId = pVnode->config.vgId;
 | 
			
		||||
  int32_t  code = 0;
 | 
			
		||||
  SRpcMsg *pMsg = NULL;
 | 
			
		||||
 | 
			
		||||
  for (int32_t i = 0; i < numOfMsgs; ++i) {
 | 
			
		||||
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
 | 
			
		||||
    vTrace("vgId:%d, msg:%p get from vnode-apply queue", vgId, pMsg);
 | 
			
		||||
 | 
			
		||||
    // init response rpc msg
 | 
			
		||||
    SRpcMsg rsp = {0};
 | 
			
		||||
 | 
			
		||||
    // get original rpc msg
 | 
			
		||||
    assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG);
 | 
			
		||||
    SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
 | 
			
		||||
    syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
 | 
			
		||||
    SRpcMsg originalRpcMsg;
 | 
			
		||||
    syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
 | 
			
		||||
 | 
			
		||||
    // apply data into tsdb
 | 
			
		||||
    if (vnodeProcessWriteReq(pVnode, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
 | 
			
		||||
      rsp.code = terrno;
 | 
			
		||||
      vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    syncApplyMsgDestroy(pSyncApplyMsg);
 | 
			
		||||
    rpcFreeCont(originalRpcMsg.pCont);
 | 
			
		||||
 | 
			
		||||
    vnodePostBlockMsg(pVnode, originalRpcMsg.msgType);
 | 
			
		||||
 | 
			
		||||
    // if leader, send response
 | 
			
		||||
    if (pMsg->info.handle != NULL) {
 | 
			
		||||
      rsp.info = pMsg->info;
 | 
			
		||||
      tmsgSendRsp(&rsp);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    vTrace("vgId:%d, msg:%p is freed, code:0x%x handle:%p", vgId, pMsg, rsp.code, pMsg->info.handle);
 | 
			
		||||
    rpcFreeCont(pMsg->pCont);
 | 
			
		||||
    taosFreeQitem(pMsg);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
 | 
			
		||||
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
 | 
			
		||||
  if (code != 0) {
 | 
			
		||||
    rpcFreeCont(pMsg->pCont);
 | 
			
		||||
| 
						 | 
				
			
			@ -97,7 +176,7 @@ int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
 | 
			
		|||
  return code;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
 | 
			
		||||
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
 | 
			
		||||
  int32_t code = tmsgSendReq(pEpSet, pMsg);
 | 
			
		||||
  if (code != 0) {
 | 
			
		||||
    rpcFreeCont(pMsg->pCont);
 | 
			
		||||
| 
						 | 
				
			
			@ -106,19 +185,19 @@ int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
 | 
			
		|||
  return code;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
 | 
			
		||||
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
 | 
			
		||||
  vnodeGetSnapshot(pFsm->data, pSnapshot);
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
 | 
			
		||||
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
 | 
			
		||||
  SVnode *pVnode = pFsm->data;
 | 
			
		||||
  vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode));
 | 
			
		||||
 | 
			
		||||
  // todo rpc response here
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
 | 
			
		||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
 | 
			
		||||
  SyncIndex beginIndex = SYNC_INDEX_INVALID;
 | 
			
		||||
  if (pFsm->FpGetSnapshot != NULL) {
 | 
			
		||||
    SSnapshot snapshot = {0};
 | 
			
		||||
| 
						 | 
				
			
			@ -164,7 +243,7 @@ void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
 | 
			
		|||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
 | 
			
		||||
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
 | 
			
		||||
  char logBuf[256] = {0};
 | 
			
		||||
  snprintf(logBuf, sizeof(logBuf),
 | 
			
		||||
           "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
 | 
			
		||||
| 
						 | 
				
			
			@ -172,14 +251,14 @@ void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMet
 | 
			
		|||
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
 | 
			
		||||
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
 | 
			
		||||
  char logBuf[256] = {0};
 | 
			
		||||
  snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
 | 
			
		||||
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
 | 
			
		||||
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
 | 
			
		||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
 | 
			
		||||
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
 | 
			
		||||
  pFsm->data = pVnode;
 | 
			
		||||
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
 | 
			
		||||
| 
						 | 
				
			
			@ -188,6 +267,42 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
 | 
			
		|||
  pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
 | 
			
		||||
  pFsm->FpRestoreFinishCb = NULL;
 | 
			
		||||
  pFsm->FpReConfigCb = vnodeSyncReconfig;
 | 
			
		||||
 | 
			
		||||
  return pFsm;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
 | 
			
		||||
  SSyncInfo syncInfo = {
 | 
			
		||||
      .vgId = pVnode->config.vgId,
 | 
			
		||||
      .isStandBy = pVnode->config.standby,
 | 
			
		||||
      .syncCfg = pVnode->config.syncCfg,
 | 
			
		||||
      .pWal = pVnode->pWal,
 | 
			
		||||
      .msgcb = NULL,
 | 
			
		||||
      .FpSendMsg = vnodeSyncSendMsg,
 | 
			
		||||
      .FpEqMsg = vnodeSyncEqMsg,
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
 | 
			
		||||
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
 | 
			
		||||
 | 
			
		||||
  pVnode->sync = syncOpen(&syncInfo);
 | 
			
		||||
  if (pVnode->sync <= 0) {
 | 
			
		||||
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
 | 
			
		||||
    return -1;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  setPingTimerMS(pVnode->sync, 3000);
 | 
			
		||||
  setElectTimerMS(pVnode->sync, 500);
 | 
			
		||||
  setHeartbeatTimerMS(pVnode->sync, 100);
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeSyncStart(SVnode *pVnode) {
 | 
			
		||||
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
 | 
			
		||||
  if (pVnode->config.standby) {
 | 
			
		||||
    syncStartStandBy(pVnode->sync);
 | 
			
		||||
  } else {
 | 
			
		||||
    syncStart(pVnode->sync);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue