diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index f2c8c916c8..69260d720c 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -29,6 +29,7 @@ extern "C" { typedef struct SMnode SMnode; typedef struct { + bool standby; bool deploy; int8_t replica; int8_t selfIndex; diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 94d41a7416..a9e2f5a71a 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -44,12 +44,9 @@ extern "C" { } #define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t) - #define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t) - #define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) - -#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) +#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) #define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ { \ @@ -66,11 +63,8 @@ extern "C" { } #define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t) - #define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t) - #define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) - #define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) #define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ @@ -356,6 +350,14 @@ typedef struct SSdb { SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; +typedef struct SSdbIter { + TdFilePtr file; + int64_t readlen; +} SSdbIter; + +SSdbIter *sdbIterInit(SSdb *pSdb); +SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2bf678fa48..5ffcbb7a09 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -82,14 +82,29 @@ typedef struct SFsmCbMeta { SyncTerm currentTerm; } SFsmCbMeta; +typedef struct SReConfigCbMeta { + int32_t code; + SyncIndex index; + SyncTerm term; + SyncTerm currentTerm; +} SReConfigCbMeta; + typedef struct SSyncFSM { void* data; + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpRestoreFinish)(struct SSyncFSM* pFsm); + + void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len); + int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len); + + void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); + + // int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + } SSyncFSM; // abstract definition of log store in raft diff --git a/include/util/tdef.h b/include/util/tdef.h index 808fcf0152..59b72d8785 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -428,11 +428,11 @@ enum { }; #define DEFAULT_HANDLE 0 -#define MNODE_HANDLE -1 -#define QNODE_HANDLE -2 -#define SNODE_HANDLE -3 -#define VNODE_HANDLE -4 -#define BNODE_HANDLE -5 +#define MNODE_HANDLE 1 +#define QNODE_HANDLE -1 +#define SNODE_HANDLE -2 +#define VNODE_HANDLE -3 +#define BNODE_HANDLE -4 #define TSDB_CONFIG_OPTION_LEN 16 #define TSDB_CONIIG_VALUE_LEN 48 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 2aa1087770..478d6abd52 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -53,43 +53,45 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { *pDeployed = deployed->valueint; cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); - if (!mnodes || mnodes->type != cJSON_Array) { - dError("failed to read %s since nodes not found", file); - goto _OVER; - } - - pMgmt->replica = cJSON_GetArraySize(mnodes); - if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); - goto _OVER; - } - - for (int32_t i = 0; i < pMgmt->replica; ++i) { - cJSON *node = cJSON_GetArrayItem(mnodes, i); - if (node == NULL) break; - - SReplica *pReplica = &pMgmt->replicas[i]; - - cJSON *id = cJSON_GetObjectItem(node, "id"); - if (!id || id->type != cJSON_Number) { - dError("failed to read %s since id not found", file); + if (mnodes != NULL) { + if (!mnodes || mnodes->type != cJSON_Array) { + dError("failed to read %s since nodes not found", file); goto _OVER; } - pReplica->id = id->valueint; - cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); - if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", file); + pMgmt->replica = cJSON_GetArraySize(mnodes); + if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { + dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); goto _OVER; } - tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); - cJSON *port = cJSON_GetObjectItem(node, "port"); - if (!port || port->type != cJSON_Number) { - dError("failed to read %s since port not found", file); - goto _OVER; + for (int32_t i = 0; i < pMgmt->replica; ++i) { + cJSON *node = cJSON_GetArrayItem(mnodes, i); + if (node == NULL) break; + + SReplica *pReplica = &pMgmt->replicas[i]; + + cJSON *id = cJSON_GetObjectItem(node, "id"); + if (!id || id->type != cJSON_Number) { + dError("failed to read %s since id not found", file); + goto _OVER; + } + pReplica->id = id->valueint; + + cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); + if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { + dError("failed to read %s since fqdn not found", file); + goto _OVER; + } + tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); + + cJSON *port = cJSON_GetObjectItem(node, "port"); + if (!port || port->type != cJSON_Number) { + dError("failed to read %s since port not found", file); + goto _OVER; + } + pReplica->port = port->valueint; } - pReplica->port = port->valueint; } code = 0; @@ -122,21 +124,23 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica); - for (int32_t i = 0; i < replica; ++i) { - SReplica *pReplica = &pMgmt->replicas[i]; - if (pMsg != NULL) { - pReplica = &pMsg->replicas[i]; - } - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - if (i < replica - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }],\n"); + if (replica > 0) { + len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); + for (int32_t i = 0; i < replica; ++i) { + SReplica *pReplica = &pMgmt->replicas[i]; + if (pMsg != NULL) { + pReplica = &pMsg->replicas[i]; + } + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); + if (i < replica - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }],\n"); + } } } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 43113d05af..875be9768c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -39,32 +39,44 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { } static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) { + pOption->standby = false; + pOption->deploy = true; pOption->msgCb = pMgmt->msgCb; pOption->replica = 1; pOption->selfIndex = 0; + SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; pReplica->port = tsServerPort; tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN); - pOption->deploy = true; - - pMgmt->selfIndex = pOption->selfIndex; - pMgmt->replica = pOption->replica; - memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; - pOption->selfIndex = pMgmt->selfIndex; - pOption->replica = pMgmt->replica; - memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pOption->deploy = false; + pOption->standby = false; + + if (pMgmt->replica > 0) { + pOption->standby = true; + pOption->replica = 1; + pOption->selfIndex = 0; + SReplica *pReplica = &pOption->replicas[0]; + for (int32_t i = 0; i < pMgmt->replica; ++i) { + if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue; + pReplica->id = pMgmt->replicas[i].id; + pReplica->port = pMgmt->replicas[i].port; + memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN); + } + } } -static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { +static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { pOption->msgCb = pMgmt->msgCb; + pOption->standby = false; + pOption->deploy = false; pOption->replica = pCreate->replica; pOption->selfIndex = -1; + for (int32_t i = 0; i < pCreate->replica; ++i) { SReplica *pReplica = &pOption->replicas[i]; pReplica->id = pCreate->replicas[i].id; @@ -79,17 +91,13 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre dError("failed to build mnode options since %s", terrstr()); return -1; } - pOption->deploy = true; - pMgmt->selfIndex = pOption->selfIndex; - pMgmt->replica = pOption->replica; - memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); return 0; } int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { SMnodeOpt option = {0}; - if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) { + if (mmBuildOptionForAlter(pMgmt, &option, pMsg) != 0) { return -1; } @@ -97,12 +105,6 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { return -1; } - bool deployed = true; - if (mmWriteFile(pMgmt, pMsg, deployed) != 0) { - dError("failed to write mnode file since %s", terrstr()); - return -1; - } - return 0; } @@ -177,7 +179,8 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("mnode-worker", "initialized"); - if (!deployed) { + if (!deployed || pMgmt->replica > 0) { + pMgmt->replica = 0; deployed = true; if (mmWriteFile(pMgmt, NULL, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 59d0c491a1..71cc2d2693 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from mnode-sync queue", pMsg); pMsg->info.node = pMgmt->pMnode; + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); + int32_t code = mndProcessSyncMsg(pMsg); dTrace("msg:%p, is freed, code:0x%x", pMsg, code); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5a1653b937..a258a369d3 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -76,11 +76,12 @@ typedef struct { typedef struct { SWal *pWal; - int32_t errCode; - bool restored; sem_t syncSem; int64_t sync; ESyncState state; + bool standby; + bool restored; + int32_t errCode; } SSyncMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 7f86eb8b32..0bb89a1d91 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -39,14 +39,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_MNODE, - .keyType = SDB_KEY_INT32, - .deployFp = (SdbDeployFp)mndCreateDefaultMnode, - .encodeFp = (SdbEncodeFp)mndMnodeActionEncode, - .decodeFp = (SdbDecodeFp)mndMnodeActionDecode, - .insertFp = (SdbInsertFp)mndMnodeActionInsert, - .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, - .deleteFp = (SdbDeleteFp)mndMnodeActionDelete}; + SSdbTable table = { + .sdbType = SDB_MNODE, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultMnode, + .encodeFp = (SdbEncodeFp)mndMnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndMnodeActionDecode, + .insertFp = (SdbInsertFp)mndMnodeActionInsert, + .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndMnodeActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index a4e6cfd5ca..3ac7c3713f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,22 +17,26 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - SSdb *pSdb = pMnode->pSdb; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pFsm->data; + SSdbRaw *pRaw = pMsg->pCont; mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state)); - sdbWriteWithoutFree(pSdb, pRaw); - sdbSetApplyIndex(pSdb, cbMeta.index); - sdbSetApplyTerm(pSdb, cbMeta.term); + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); + tsem_post(&pMnode->syncMgmt.syncSem); } } @@ -49,15 +53,41 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { pMnode->syncMgmt.restored = true; } +void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) { + SMnode *pMnode = pFsm->data; + SSdbIter *pIter = iter; + + if (iter == NULL) { + pIter = sdbIterInit(pMnode->pSdb); + } + + return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len); +} + +int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) { + SMnode *pMnode = pFsm->data; + sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); + return 0; +} + +void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + +} + SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; + pFsm->FpCommitCb = mndSyncCommitMsg; pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; + pFsm->FpGetSnapshot = mndSyncGetSnapshot; - pFsm->FpRestoreFinish = mndRestoreFinish; - pFsm->FpRestoreSnapshot = NULL; + pFsm->FpRestoreFinishCb = mndRestoreFinish; + pFsm->FpSnapshotRead = mndSnapshotRead; + pFsm->FpSnapshotApply = mndSnapshotApply; + pFsm->FpReConfigCb = mndReConfig; + return pFsm; } @@ -90,10 +120,13 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; + mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, + pMgmt->standby); for (int32_t i = 0; i < pMnode->replica; ++i) { SNodeInfo *pNode = &pCfg->nodeInfo[i]; tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); pNode->nodePort = pMnode->replicas[i].port; + mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); } tsem_init(&pMgmt->syncSem, 0, 0); @@ -149,7 +182,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); - syncStart(pMgmt->sync); + if (pMgmt->standby) { + syncStartStandBy(pMgmt->sync); + } else { + syncStart(pMgmt->sync); + } mDebug("sync:%" PRId64 " is started", pMgmt->sync); } @@ -161,3 +198,18 @@ bool mndIsMaster(SMnode *pMnode) { return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); } + +int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { + SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex}; + mInfo("start to alter mnode sync, replica:%d myindex:%d standby:%d", cfg.replicaNum, cfg.myIndex, pOption->standby); + for (int32_t i = 0; i < pOption->replica; ++i) { + SNodeInfo *pNode = &cfg.nodeInfo[i]; + tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pOption->replicas[i].port; + mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); + } + + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->standby = pOption->standby; + return syncReconfig(pMgmt->sync, &cfg); +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 775c64ceab..572844d8d4 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -263,6 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; pMnode->selfId = pOption->replicas[pOption->selfIndex].id; + pMnode->syncMgmt.standby = pOption->standby; } SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { @@ -329,12 +330,6 @@ void mndClose(SMnode *pMnode) { } } -int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { - mDebug("start to alter mnode"); - mDebug("mnode is altered"); - return 0; -} - int32_t mndStart(SMnode *pMnode) { mndSyncStart(pMnode); return mndInitTimer(pMnode); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b000c208c8..eac7f4af5d 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) { return 0; } + +SSdbIter *sdbIterInit(SSdb *pSdb) { + char datafile[PATH_MAX] = {0}; + char tmpfile[PATH_MAX] = {0}; + snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + + if (taosCopyFile(datafile, tmpfile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); + return NULL; + } + + SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); + if (pIter == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pIter->file = taosOpenFile(tmpfile, TD_FILE_READ); + if (pIter->file == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to read snapshot file:%s since %s", tmpfile, terrstr()); + taosMemoryFree(pIter); + return NULL; + } + + mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter); + return pIter; +} + +SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) { + const int32_t maxlen = 100; + + char *pBuf = taosMemoryCalloc(1, maxlen); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen); + if (readlen == 0) { + mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen); + taosMemoryFree(pBuf); + taosCloseFile(&pIter->file); + taosMemoryFree(pIter); + pIter = NULL; + } else if (readlen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen); + taosMemoryFree(pBuf); + taosCloseFile(&pIter->file); + taosMemoryFree(pIter); + pIter = NULL; + } else { + pIter->readlen += readlen; + mTrace("read snapshot, readlen:%" PRId64, pIter->readlen); + *ppBuf = pBuf; + *buflen = readlen; + } + + return pIter; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 882ee912cd..d8f3110a16 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; - pFsm->FpRestoreFinish = NULL; + pFsm->FpRestoreFinishCb = NULL; + pFsm->FpSnapshotRead = NULL; + pFsm->FpSnapshotApply = NULL; + pFsm->FpReConfigCb = NULL; + return pFsm; } \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 9246041b81..69549d2a7e 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -148,8 +148,8 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - bool restoreFinish; - //sem_t restoreSem; + bool restoreFinish; + // sem_t restoreSem; SSnapshot* pSnapshot; } SSyncNode; diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index 5bc240e921..716d2f620c 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -42,6 +42,7 @@ typedef struct SVotesGranted { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); void voteGrantedDestroy(SVotesGranted *pVotesGranted); +void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); @@ -65,6 +66,7 @@ typedef struct SVotesRespond { SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); void votesRespondDestory(SVotesRespond *pVotesRespond); +void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index fa735e71c0..0411628c5c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // restore finish if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { - if (ths->pFsm->FpRestoreFinish != NULL) { - ths->pFsm->FpRestoreFinish(ths->pFsm); + if (ths->pFsm->FpRestoreFinishCb != NULL) { + ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 18c6f8930a..36713ceed5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // restore finish if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { if (pSyncNode->restoreFinish == false) { - if (pSyncNode->pFsm->FpRestoreFinish != NULL) { - pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) { + pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm); } pSyncNode->restoreFinish = true; sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a69a94831d..4aca68fa64 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -509,7 +509,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); } - //tsem_init(&(pSyncNode->restoreSem), 0, 0); + // tsem_init(&(pSyncNode->restoreSem), 0, 0); // start in syncNodeStart // start raft @@ -606,7 +606,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pSnapshot); } - //tsem_destroy(&pSyncNode->restoreSem); + // tsem_destroy(&pSyncNode->restoreSem); // free memory in syncFreeNode // taosMemoryFree(pSyncNode); @@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { } void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { + bool hit = false; + for (int i = 0; i < newConfig->replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) { + newConfig->myIndex = i; + hit = true; + break; + } + } + ASSERT(hit == true); + pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); ASSERT(ret == 0); @@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); + votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 733dfd05b6..1c1f0809bd 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) { } } +void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) { + pVotesGranted->replicas = &(pSyncNode->replicasId); + pVotesGranted->replicaNum = pSyncNode->replicaNum; + voteGrantedClearVotes(pVotesGranted); + + pVotesGranted->term = 0; + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; +} + bool voteGrantedMajority(SVotesGranted *pVotesGranted) { bool ret = pVotesGranted->votes >= pVotesGranted->quorum; return ret; @@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) { } } +void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) { + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; +} + bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool ret = false; for (int i = 0; i < pVotesRespond->replicaNum; ++i) { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 0850ef6343..f52fef0019 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -73,9 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -void FpRestoreFinishCb(struct SSyncFSM* pFsm) { - sTrace("==callback== ==FpRestoreFinishCb=="); -} +void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); @@ -83,7 +81,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; - pFsm->FpRestoreFinish = FpRestoreFinishCb; + pFsm->FpRestoreFinishCb = RestoreFinishCb; return pFsm; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 6ecc816442..7aaf1e1eca 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,7 +55,8 @@ ./test.sh -f tsim/bnode/basic1.sim # ---- mnode -./test.sh -f tsim/mnode/basic1.sim +#./test.sh -f tsim/mnode/basic1.sim +./test.sh -f tsim/mnode/basic2.sim # ---- show ./test.sh -f tsim/show/basic.sim @@ -106,7 +107,7 @@ ./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/qnode/basic1.sim -m -./test.sh -f tsim/mnode/basic1.sim -m +#./test.sh -f tsim/mnode/basic1.sim -m # --- sma ./test.sh -f tsim/sma/tsmaCreateInsertData.sim diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index da295f640e..5edc0a4d3e 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -136,7 +136,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG -echo "sDebugFlag 135" >> $TAOS_CFG +echo "sDebugFlag 143" >> $TAOS_CFG echo "wDebugFlag 143" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim new file mode 100644 index 0000000000..53dea6821e --- /dev/null +++ b/tests/script/tsim/mnode/basic2.sim @@ -0,0 +1,84 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sql connect + +print =============== show dnodes +sql show dnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +sql show mnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data02 != LEADER then + return -1 +endi + +print =============== create dnodes +sql create dnode $hostname port 7200 +sleep 2000 + +sql show dnodes; +if $rows != 2 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data10 != 2 then + return -1 +endi + +print $data02 +if $data02 != 0 then + return -1 +endi + +if $data12 != 0 then + return -1 +endi + +if $data04 != ready then + return -1 +endi + +if $data14 != ready then + return -1 +endi + +sql show mnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data02 != LEADER then + return -1 +endi + +print =============== create mnode 2 +sql create mnode on dnode 2 +sql show mnodes +if $rows != 2 then + return -1 +endi +