diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 7cf092b144..eca8740d28 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -39,7 +39,7 @@ typedef enum { QUEUE_MAX, } EQueueType; -typedef void (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); +typedef bool (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); @@ -70,7 +70,8 @@ void tmsgSendRsp(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReportStartup(const char* name, const char* desc); -void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); +bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); +void tmsgUpdateDnodeEpSet(SEpSet* epset); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index b6cce249ea..3dd8a19d92 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -79,6 +79,13 @@ static void dmClearVars(SDnode *pDnode) { SDnodeData *pData = &pDnode->data; taosThreadRwlockWrlock(&pData->lock); + if (pData->oldDnodeEps != NULL) { + if (dmWriteEps(pData) == 0) { + dmRemoveDnodePairs(pData); + } + taosArrayDestroy(pData->oldDnodeEps); + pData->oldDnodeEps = NULL; + } if (pData->dnodeEps != NULL) { taosArrayDestroy(pData->dnodeEps); pData->dnodeEps = NULL; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 8ecce20f53..c2f403dfbb 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -100,6 +100,7 @@ typedef struct { bool stopped; SEpSet mnodeEps; SArray *dnodeEps; + SArray *oldDnodeEps; SHashObj *dnodeHash; TdThreadRwlock lock; SMsgCb msgCb; @@ -167,7 +168,8 @@ void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet); void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); -void dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port); +bool dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port); +void dmRemoveDnodePairs(SDnodeData *pData); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 9640c0b5c9..2b06a24a54 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -18,9 +18,18 @@ #include "tjson.h" #include "tmisce.h" -static void dmPrintEps(SDnodeData *pData); -static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep); -static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); +typedef struct { + int32_t id; + uint16_t oldPort; + uint16_t newPort; + char oldFqdn[TSDB_FQDN_LEN]; + char newFqdn[TSDB_FQDN_LEN]; +} SDnodeEpPair; + +static void dmPrintEps(SDnodeData *pData); +static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep); +static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); +static int32_t dmReadDnodePairs(SDnodeData *pData); static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { taosThreadRwlockRdlock(&pData->lock); @@ -137,7 +146,7 @@ int32_t dmReadEps(SDnodeData *pData) { } code = 0; - dInfo("succceed to read mnode file %s", file); + dInfo("succceed to read dnode file %s", file); _OVER: if (content != NULL) taosMemoryFree(content); @@ -146,6 +155,7 @@ _OVER: if (code != 0) { dError("failed to read dnode file:%s since %s", file, terrstr()); + return code; } if (taosArrayGetSize(pData->dnodeEps) == 0) { @@ -155,10 +165,14 @@ _OVER: taosArrayPush(pData->dnodeEps, &dnodeEp); } + if (dmReadDnodePairs(pData) != 0) { + return -1; + } + dDebug("reset dnode list on startup"); dmResetEps(pData, pData->dnodeEps); - if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) { + if (pData->dnodeEps == NULL && dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) { dError("localEp %s different with %s and need reconfigured", tsLocalEp, file); return -1; } @@ -222,7 +236,8 @@ int32_t dmWriteEps(SDnodeData *pData) { code = 0; pData->updateTime = taosGetTimestampMs(); - dInfo("succeed to write dnode file:%s, dnodeVer:%" PRId64, realfile, pData->dnodeVer); + dInfo("succeed to write dnode file:%s, num:%d ver:%" PRId64, realfile, (int32_t)taosArrayGetSize(pData->dnodeEps), + pData->dnodeVer); _OVER: if (pJson != NULL) tjsonDelete(pJson); @@ -332,40 +347,204 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { } } -void dmUpdateDnodeInfo(void *data, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port) { +bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) { + bool updated = false; SDnodeData *pData = data; - int32_t ret = -1; + int32_t dnodeId = -1; + if (did != NULL) dnodeId = *did; + taosThreadRwlockRdlock(&pData->lock); - if (*dnodeId <= 0) { - for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->dnodeEps); ++i) { - SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i); - if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) { - dInfo("dnode:%s:%u, update dnodeId from %d to %d", fqdn, *port, *dnodeId, pDnodeEp->id); - *dnodeId = pDnodeEp->id; - if (clusterId != NULL) *clusterId = pData->clusterId; - ret = 0; + + if (pData->oldDnodeEps != NULL) { + int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps); + for (int32_t i = 0; i < size; ++i) { + SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i); + if (strcmp(pair->oldFqdn, fqdn) == 0 && pair->oldPort == *port) { + dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort); + tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN); + *port = pair->newPort; + updated = true; } } - if (ret != 0) { - dInfo("dnode:%s:%u, failed to update dnodeId:%d", fqdn, *port, *dnodeId); - } - } else { - SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, dnodeId, sizeof(int32_t)); - if (pDnodeEp) { - if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0) { - dInfo("dnode:%d, update port from %s to %s", *dnodeId, fqdn, pDnodeEp->ep.fqdn); - tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); - } - if (pDnodeEp->ep.port != *port) { - dInfo("dnode:%d, update port from %u to %u", *dnodeId, *port, pDnodeEp->ep.port); - *port = pDnodeEp->ep.port; - } - if (clusterId != NULL) *clusterId = pData->clusterId; - ret = 0; - } else { - dInfo("dnode:%d, failed to update dnode info", *dnodeId); - } } + + if (did != NULL && dnodeId <= 0) { + int32_t size = (int32_t)taosArrayGetSize(pData->dnodeEps); + for (int32_t i = 0; i < size; ++i) { + SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i); + if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) { + dInfo("dnode:%s:%u, update dnodeId to dnode:%d", fqdn, *port, pDnodeEp->id); + *did = pDnodeEp->id; + if (clusterId != NULL) *clusterId = pData->clusterId; + } + } + } + + if (dnodeId > 0) { + SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); + if (pDnodeEp) { + if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0 || pDnodeEp->ep.port != *port) { + dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port); + tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); + *port = pDnodeEp->ep.port; + updated = true; + } + if (clusterId != NULL) *clusterId = pData->clusterId; + } + } + taosThreadRwlockUnlock(&pData->lock); - // return ret; -} \ No newline at end of file + return updated; +} + +static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) { + int32_t code = 0; + + SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes"); + if (dnodes == NULL) return 0; + int32_t numOfDnodes = tjsonGetArraySize(dnodes); + + for (int32_t i = 0; i < numOfDnodes; ++i) { + SJson *dnode = tjsonGetArrayItem(dnodes, i); + if (dnode == NULL) return -1; + + SDnodeEpPair pair = {0}; + tjsonGetInt32ValueFromDouble(dnode, "id", pair.id, code); + if (code < 0) return -1; + code = tjsonGetStringValue(dnode, "fqdn", pair.oldFqdn); + if (code < 0) return -1; + tjsonGetUInt16ValueFromDouble(dnode, "port", pair.oldPort, code); + if (code < 0) return -1; + code = tjsonGetStringValue(dnode, "new_fqdn", pair.newFqdn); + if (code < 0) return -1; + tjsonGetUInt16ValueFromDouble(dnode, "new_port", pair.newPort, code); + if (code < 0) return -1; + + if (taosArrayPush(pData->oldDnodeEps, &pair) == NULL) return -1; + } + + return code; +} + +void dmRemoveDnodePairs(SDnodeData *pData) { + char file[PATH_MAX] = {0}; + char bak[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); + snprintf(bak, sizeof(bak), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP); + dInfo("dnode file:%s is rename to bak file", file); + (void)taosRenameFile(file, bak); +} + +static int32_t dmReadDnodePairs(SDnodeData *pData) { + int32_t code = -1; + TdFilePtr pFile = NULL; + char *content = NULL; + SJson *pJson = NULL; + char file[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); + + if (taosStatFile(file, NULL, NULL) < 0) { + dDebug("dnode file:%s not exist", file); + code = 0; + goto _OVER; + } + + pFile = taosOpenFile(file, TD_FILE_READ); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to open dnode file:%s since %s", file, terrstr()); + goto _OVER; + } + + int64_t size = 0; + if (taosFStatFile(pFile, &size, NULL) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to fstat dnode file:%s since %s", file, terrstr()); + goto _OVER; + } + + content = taosMemoryMalloc(size + 1); + if (content == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + if (taosReadFile(pFile, content, size) != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to read dnode file:%s since %s", file, terrstr()); + goto _OVER; + } + + content[size] = '\0'; + + pJson = tjsonParse(content); + if (pJson == NULL) { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } + + pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEpPair)); + if (pData->oldDnodeEps == NULL) { + dError("failed to calloc dnodeEp array since %s", strerror(errno)); + goto _OVER; + } + + if (dmDecodeEpPairs(pJson, pData) < 0) { + taosArrayDestroy(pData->oldDnodeEps); + pData->oldDnodeEps = NULL; + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } + + code = 0; + dInfo("succceed to read dnode file %s", file); + +_OVER: + if (content != NULL) taosMemoryFree(content); + if (pJson != NULL) cJSON_Delete(pJson); + if (pFile != NULL) taosCloseFile(&pFile); + + if (code != 0) { + dError("failed to read dnode file:%s since %s", file, terrstr()); + } + + for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) { + SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i); + for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) { + SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j); + if (pDnodeEp->id != pair->id && + (strcmp(pDnodeEp->ep.fqdn, pair->newFqdn) == 0 && pDnodeEp->ep.port == pair->newPort)) { + dError("dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d", pair->id, pair->oldFqdn, + pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id); + tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN); + pDnodeEp->ep.port = pair->newPort; + } + +#if 0 + if (pDnodeEp->id == pair->id && + (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort)) { + dError("dnode:%d, can't update ep:%s:%u to %s:%u since endpoint not matched", pair->id, pair->oldFqdn, + pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id); + tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN); + pDnodeEp->ep.port = pair->newPort; + } +#endif + } + } + + for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) { + SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i); + for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) { + SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j); + if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) { + dInfo("dnode:%d, will update ep:%s:%u to %s:%u", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port, + pair->newFqdn, pair->newPort); + tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN); + pDnodeEp->ep.port = pair->newPort; + } + } + } + + pData->dnodeVer = 0; + return code; +} diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index b2ca1e7ad8..eeb4249217 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -745,6 +745,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { goto CM_DECODE_OVER; } + tmsgUpdateDnodeEpSet(&pConsumer->ep); terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ddb54a95ea..97490beb3c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -180,6 +180,9 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER) terrno = 0; + if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) { + mInfo("dnode:%d, endpoint changed", pDnode->id); + } _OVER: if (terrno != 0) { @@ -188,7 +191,7 @@ _OVER: return NULL; } - mTrace("dnode:%d, decode from raw:%p, row:%p", pDnode->id, pRaw, pDnode); + mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port); return pRow; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 7dcd287fb7..add32fd335 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -747,7 +747,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) { pNode->clusterId = mndGetClusterId(pMnode); pNode->nodePort = pObj->pDnode->port; tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); if (pObj->pDnode->id == pMnode->selfDnodeId) { cfg.myIndex = cfg.replicaNum; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b8ef185199..153bb8bd04 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -760,6 +760,27 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { goto SUB_DECODE_OVER; } + // update epset saved in mnode + if (pSub->unassignedVgs != NULL) { + int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < size; ++i) { + SMqVgEp *pMqVgEp = taosArrayGet(pSub->unassignedVgs, i); + tmsgUpdateDnodeEpSet(&pMqVgEp->epSet); + } + } + if (pSub->consumerHash != NULL) { + void *pIter = taosHashIterate(pSub->consumerHash, NULL); + while (pIter) { + SMqConsumerEp *pConsumerEp = pIter; + int32_t size = (int32_t)taosArrayGetSize(pConsumerEp->vgs); + for (int32_t i = 0; i < size; ++i) { + SMqVgEp *pMqVgEp = taosArrayGet(pConsumerEp->vgs, i); + tmsgUpdateDnodeEpSet(&pMqVgEp->epSet); + } + pIter = taosHashIterate(pSub->consumerHash, pIter); + } + } + terrno = TSDB_CODE_SUCCESS; SUB_DECODE_OVER: diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6b675586e4..7dc0912403 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -303,7 +303,7 @@ int32_t mndInitSync(SMnode *pMnode) { pNode->nodeId = pMgmt->replicas[i].id; pNode->nodePort = pMgmt->replicas[i].port; tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->clusterId); } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 718fc5c73f..dfcd55bcba 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -329,6 +329,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { action.pRaw = NULL; } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + tmsgUpdateDnodeEpSet(&action.epSet); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 743ae17f2b..61cb75b1da 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -86,7 +86,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { pNode->nodeId = pReq->replicas[i].id; pNode->nodePort = pReq->replicas[i].port; tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); } @@ -134,6 +134,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return NULL; } + // save vnode info on dnode ep changed + bool updated = false; + SSyncCfg *pCfg = &info.config.syncCfg; + for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) { + updated = true; + } + } + if (updated) { + vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId); + (void)vnodeSaveInfo(dir, &info); + (void)vnodeCommitInfo(dir, &info); + } + // create handle pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1); if (pVnode == NULL) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 02f9795cad..ac377911eb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -895,14 +895,25 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg; + bool updated = false; sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); for (int32_t i = 0; i < pCfg->replicaNum; ++i) { SNodeInfo* pNode = &pCfg->nodeInfo[i]; - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) { + updated = true; + } sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->clusterId); } + if (updated) { + sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); + if (syncWriteCfgFile(pSyncNode) != 0) { + sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId); + goto _error; + } + } + pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg; diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index eea0d87684..9b8f1dfd07 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -59,6 +59,12 @@ void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.re void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); } -void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) { - (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port); +bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) { + return (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port); } + +void tmsgUpdateDnodeEpSet(SEpSet* epset) { + for (int32_t i = 0; i < epset->numOfEps; ++i) { + tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port); + } +} \ No newline at end of file