Merge pull request #5571 from taosdata/feature/TD-3036
[TD-3036]<feature>: syncdb <db_name> replica;
This commit is contained in:
commit
ccbb062cec
|
@ -43,6 +43,7 @@ int32_t dnodeInitServer() {
|
|||
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
|
||||
|
|
|
@ -30,6 +30,7 @@ static taos_queue tsVMgmtQueue = NULL;
|
|||
static void * dnodeProcessMgmtQueue(void *param);
|
||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
|
||||
|
@ -39,6 +40,7 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
|||
int32_t dnodeInitVMgmt() {
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
|
||||
|
@ -179,6 +181,13 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SSyncVnodeMsg *pSyncVnode = rpcMsg->pCont;
|
||||
pSyncVnode->vgId = htonl(pSyncVnode->vgId);
|
||||
|
||||
return vnodeSync(pSyncVnode->vgId);
|
||||
}
|
||||
|
||||
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SDropVnodeMsg *pDrop = rpcMsg->pCont;
|
||||
pDrop->vgId = htonl(pDrop->vgId);
|
||||
|
|
|
@ -202,10 +202,11 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
|||
char clusterId[TSDB_CLUSTER_ID_LEN];
|
||||
dnodeGetClusterId(clusterId);
|
||||
if (clusterId[0] != '\0') {
|
||||
dError("exit zombie dropped dnode");
|
||||
exit(EXIT_FAILURE);
|
||||
dError("exit zombie dropped dnode");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
|
|||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
|
||||
|
@ -389,7 +390,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
} SDropVnodeMsg;
|
||||
} SDropVnodeMsg, SSyncVnodeMsg;
|
||||
|
||||
typedef struct SColIndex {
|
||||
int16_t colId; // column id
|
||||
|
|
|
@ -60,6 +60,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
|
|||
int32_t vnodeDrop(int32_t vgId);
|
||||
int32_t vnodeOpen(int32_t vgId);
|
||||
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeSync(int32_t vgId);
|
||||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
// vnodeMgmt
|
||||
|
@ -89,4 +90,4 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -49,6 +49,7 @@ void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable);
|
|||
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
|
||||
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup);
|
||||
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
|
||||
|
||||
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
|
||||
SRpcEpSet mnodeGetEpSetFromIp(char *ep);
|
||||
|
|
|
@ -1186,8 +1186,44 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
|
|||
return mnodeDropDb(pMsg);
|
||||
}
|
||||
|
||||
static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
|
||||
void *pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
while (1) {
|
||||
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
|
||||
if (pVgroup == NULL) break;
|
||||
if (pVgroup->pDb == pDb) {
|
||||
mnodeSendSyncVgroupMsg(pVgroup);
|
||||
}
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
}
|
||||
|
||||
mLInfo("db:%s, is synced by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
|
||||
return 0;
|
||||
SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont;
|
||||
mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists);
|
||||
|
||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pSyncDb->db);
|
||||
if (pMsg->pDb == NULL) {
|
||||
if (pSyncDb->ignoreNotExists) {
|
||||
mDebug("db:%s, db is not exist, treat as success", pSyncDb->db);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("db:%s, failed to sync, invalid db", pSyncDb->db);
|
||||
return TSDB_CODE_MND_INVALID_DB;
|
||||
}
|
||||
}
|
||||
|
||||
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("db:%s, status:%d, in dropping", pSyncDb->db, pMsg->pDb->status);
|
||||
return TSDB_CODE_MND_DB_IN_DROPPING;
|
||||
}
|
||||
|
||||
return mnodeSyncDb(pMsg->pDb, pMsg);
|
||||
}
|
||||
|
||||
void mnodeDropAllDbs(SAcctObj *pAcct) {
|
||||
|
|
|
@ -60,6 +60,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
|
|||
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
|
||||
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
|
||||
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg);
|
||||
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg);
|
||||
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ;
|
||||
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||
|
@ -236,6 +237,7 @@ int32_t mnodeInitVgroups() {
|
|||
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup);
|
||||
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
|
||||
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp);
|
||||
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp);
|
||||
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp);
|
||||
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg);
|
||||
|
||||
|
@ -967,6 +969,38 @@ void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) {
|
||||
SSyncVnodeMsg *pSyncVnode = rpcMallocCont(sizeof(SSyncVnodeMsg));
|
||||
if (pSyncVnode == NULL) return NULL;
|
||||
|
||||
pSyncVnode->vgId = htonl(vgId);
|
||||
return pSyncVnode;
|
||||
}
|
||||
|
||||
static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
|
||||
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
|
||||
SRpcMsg rpcMsg = {
|
||||
.ahandle = NULL,
|
||||
.pCont = pSyncVnode,
|
||||
.contLen = pSyncVnode ? sizeof(SSyncVnodeMsg) : 0,
|
||||
.code = 0,
|
||||
.msgType = TSDB_MSG_TYPE_MD_SYNC_VNODE
|
||||
};
|
||||
|
||||
dnodeSendMsgToDnode(epSet, &rpcMsg);
|
||||
}
|
||||
|
||||
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
|
||||
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
|
||||
pVgroup->dbName);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i,
|
||||
pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
mnodeSendSyncVnodeMsg(pVgroup, &epSet);
|
||||
}
|
||||
}
|
||||
|
||||
static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) {
|
||||
SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup);
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -994,6 +1028,10 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
mDebug("alter vnode rsp received");
|
||||
}
|
||||
|
||||
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) {
|
||||
mDebug("sync vnode rsp received");
|
||||
}
|
||||
|
||||
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
||||
if (rpcMsg->ahandle == NULL) return;
|
||||
|
||||
|
|
|
@ -410,7 +410,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force)
|
|||
syncReleaseNode(pNode);
|
||||
}
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
void syncRecover(int64_t rid) {
|
||||
SSyncPeer *pPeer;
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
|
|||
int32_t vnodeDrop(int32_t vgId);
|
||||
int32_t vnodeOpen(int32_t vgId);
|
||||
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeSync(int32_t vgId);
|
||||
int32_t vnodeClose(int32_t vgId);
|
||||
void vnodeCleanUp(SVnodeObj *pVnode);
|
||||
void vnodeDestroy(SVnodeObj *pVnode);
|
||||
|
@ -33,4 +34,4 @@ void vnodeDestroy(SVnodeObj *pVnode);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -91,6 +91,23 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeSync(int32_t vgId) {
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) {
|
||||
vDebug("vgId:%d, failed to sync, vnode not find", vgId);
|
||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
}
|
||||
|
||||
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||
vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
syncRecover(pVnode->sync);
|
||||
}
|
||||
|
||||
vnodeRelease(pVnode);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeDrop(int32_t vgId) {
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) {
|
||||
|
|
Loading…
Reference in New Issue