refactor: mnode sync
This commit is contained in:
parent
01521a2541
commit
a3f1273640
|
@ -32,9 +32,7 @@ typedef struct {
|
|||
int32_t dnodeId;
|
||||
bool standby;
|
||||
bool deploy;
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
SReplica replica;
|
||||
SMsgCb msgCb;
|
||||
} SMnodeOpt;
|
||||
|
||||
|
|
|
@ -86,12 +86,14 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) {
|
|||
}
|
||||
|
||||
code = 0;
|
||||
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
|
||||
|
||||
_OVER:
|
||||
if (content != NULL) taosMemoryFree(content);
|
||||
if (root != NULL) cJSON_Delete(root);
|
||||
if (pFile != NULL) taosCloseFile(&pFile);
|
||||
if (code == 0) {
|
||||
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
|
||||
}
|
||||
|
||||
terrno = code;
|
||||
return code;
|
||||
|
|
|
@ -126,7 +126,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
|||
|
||||
SArray *mmGetMsgHandles() {
|
||||
int32_t code = -1;
|
||||
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
|
||||
SArray *pArray = taosArrayInit(128, sizeof(SMgmtHandle));
|
||||
if (pArray == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -43,13 +43,9 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
|
|||
pOption->deploy = true;
|
||||
pOption->msgCb = pMgmt->msgCb;
|
||||
pOption->dnodeId = pMgmt->pData->dnodeId;
|
||||
pOption->replica = 1;
|
||||
pOption->selfIndex = 0;
|
||||
|
||||
SReplica *pReplica = &pOption->replicas[0];
|
||||
pReplica->id = 1;
|
||||
pReplica->port = tsServerPort;
|
||||
tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||
pOption->replica.id = 1;
|
||||
pOption->replica.port = tsServerPort;
|
||||
tstrncpy(pOption->replica.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) {
|
||||
|
@ -57,12 +53,9 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SM
|
|||
pOption->deploy = false;
|
||||
pOption->msgCb = pMgmt->msgCb;
|
||||
pOption->dnodeId = pMgmt->pData->dnodeId;
|
||||
|
||||
if (pReplica->id > 0) {
|
||||
pOption->standby = true;
|
||||
pOption->replica = 1;
|
||||
pOption->selfIndex = 0;
|
||||
memcpy(&pOption->replicas[0], pReplica, sizeof(SReplica));
|
||||
pOption->replica = *pReplica;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -76,11 +76,12 @@ typedef struct {
|
|||
} STelemMgmt;
|
||||
|
||||
typedef struct {
|
||||
sem_t syncSem;
|
||||
int64_t sync;
|
||||
bool standby;
|
||||
int32_t errCode;
|
||||
int32_t transId;
|
||||
sem_t syncSem;
|
||||
int64_t sync;
|
||||
bool standby;
|
||||
SReplica replica;
|
||||
int32_t errCode;
|
||||
int32_t transId;
|
||||
} SSyncMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
@ -98,9 +99,6 @@ typedef struct SMnode {
|
|||
bool stopped;
|
||||
bool restored;
|
||||
bool deploy;
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
char *path;
|
||||
int64_t checkTime;
|
||||
SSdb *pSdb;
|
||||
|
|
|
@ -95,8 +95,8 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
|
|||
dnodeObj.id = 1;
|
||||
dnodeObj.createdTime = taosGetTimestampMs();
|
||||
dnodeObj.updateTime = dnodeObj.createdTime;
|
||||
dnodeObj.port = pMnode->replicas[0].port;
|
||||
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
|
||||
dnodeObj.port = tsServerPort;
|
||||
memcpy(&dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
|
||||
|
|
|
@ -289,11 +289,9 @@ static int32_t mndExecSteps(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
|
||||
pMnode->replica = pOption->replica;
|
||||
pMnode->selfIndex = pOption->selfIndex;
|
||||
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
|
||||
pMnode->msgCb = pOption->msgCb;
|
||||
pMnode->selfDnodeId = pOption->dnodeId;
|
||||
pMnode->syncMgmt.replica = pOption->replica;
|
||||
pMnode->syncMgmt.standby = pOption->standby;
|
||||
}
|
||||
|
||||
|
|
|
@ -188,15 +188,15 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
syncInfo.isStandBy = pMgmt->standby;
|
||||
syncInfo.snapshotEnable = true;
|
||||
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->replicaNum = pMnode->replica;
|
||||
pCfg->myIndex = pMnode->selfIndex;
|
||||
mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby);
|
||||
for (int32_t i = 0; i < pMnode->replica; ++i) {
|
||||
SNodeInfo *pNode = &pCfg->nodeInfo[i];
|
||||
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||
pNode->nodePort = pMnode->replicas[i].port;
|
||||
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
|
||||
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
|
||||
if (pMgmt->standby || pMgmt->replica.id > 0) {
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->replicaNum = 1;
|
||||
pCfg->myIndex = 0;
|
||||
SNodeInfo *pNode = &pCfg->nodeInfo[0];
|
||||
tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
|
||||
pNode->nodePort = pMgmt->replica.port;
|
||||
mInfo("fqdn:%s port:%u", pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
|
|
Loading…
Reference in New Issue