enh: update epset on dnode info changed
This commit is contained in:
parent
307f457103
commit
b09b67acd2
|
@ -83,6 +83,10 @@ static void dmClearVars(SDnode *pDnode) {
|
|||
taosArrayDestroy(pData->dnodeEps);
|
||||
pData->dnodeEps = NULL;
|
||||
}
|
||||
if (pData->oldDnodeEps != NULL) {
|
||||
taosArrayDestroy(pData->oldDnodeEps);
|
||||
pData->oldDnodeEps = NULL;
|
||||
}
|
||||
if (pData->dnodeHash != NULL) {
|
||||
taosHashCleanup(pData->dnodeHash);
|
||||
pData->dnodeHash = NULL;
|
||||
|
|
|
@ -18,9 +18,18 @@
|
|||
#include "tjson.h"
|
||||
#include "tmisce.h"
|
||||
|
||||
static void dmPrintEps(SDnodeData *pData);
|
||||
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
|
||||
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
|
||||
typedef struct {
|
||||
int32_t id;
|
||||
uint16_t oldPort;
|
||||
uint16_t newPort;
|
||||
char oldFqdn[TSDB_FQDN_LEN];
|
||||
char newFqdn[TSDB_FQDN_LEN];
|
||||
} SDnodeEpPair;
|
||||
|
||||
static void dmPrintEps(SDnodeData *pData);
|
||||
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
|
||||
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
|
||||
static int32_t dmReadDnodePairs(SDnodeData *pData);
|
||||
|
||||
static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
|
||||
taosThreadRwlockRdlock(&pData->lock);
|
||||
|
@ -342,11 +351,11 @@ void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn,
|
|||
if (pData->oldDnodeEps != NULL) {
|
||||
int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SDnodeEp *pDnodeEp = taosArrayGet(pData->oldDnodeEps, i);
|
||||
if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
|
||||
dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
|
||||
tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
|
||||
*port = pDnodeEp->ep.port;
|
||||
SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
|
||||
if (strcmp(pair->oldFqdn, fqdn) == 0 && pair->oldPort == *port) {
|
||||
dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort);
|
||||
tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN);
|
||||
*port = pair->newPort;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -376,4 +385,146 @@ void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn,
|
|||
}
|
||||
|
||||
taosThreadRwlockUnlock(&pData->lock);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) {
|
||||
int32_t code = 0;
|
||||
|
||||
SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes");
|
||||
if (dnodes == NULL) return 0;
|
||||
int32_t numOfDnodes = tjsonGetArraySize(dnodes);
|
||||
|
||||
for (int32_t i = 0; i < numOfDnodes; ++i) {
|
||||
SJson *dnode = tjsonGetArrayItem(dnodes, i);
|
||||
if (dnode == NULL) return -1;
|
||||
|
||||
SDnodeEpPair pair = {0};
|
||||
tjsonGetInt32ValueFromDouble(dnode, "id", pair.id, code);
|
||||
if (code < 0) return -1;
|
||||
code = tjsonGetStringValue(dnode, "fqdn", pair.oldFqdn);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetUInt16ValueFromDouble(dnode, "port", pair.oldPort, code);
|
||||
if (code < 0) return -1;
|
||||
code = tjsonGetStringValue(dnode, "new_fqdn", pair.newFqdn);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetUInt16ValueFromDouble(dnode, "new_port", pair.newPort, code);
|
||||
if (code < 0) return -1;
|
||||
|
||||
if (taosArrayPush(pData->oldDnodeEps, &pair) == NULL) return -1;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t dmReadDnodePairs(SDnodeData *pData) {
|
||||
int32_t code = -1;
|
||||
TdFilePtr pFile = NULL;
|
||||
char *content = NULL;
|
||||
SJson *pJson = NULL;
|
||||
char file[PATH_MAX] = {0};
|
||||
snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
|
||||
|
||||
pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
|
||||
if (pData->oldDnodeEps == NULL) {
|
||||
dError("failed to calloc dnodeEp array since %s", strerror(errno));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (taosStatFile(file, NULL, NULL) < 0) {
|
||||
dDebug("dnode file:%s not exist", file);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to open dnode file:%s since %s", file, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to fstat dnode file:%s since %s", file, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
content = taosMemoryMalloc(size + 1);
|
||||
if (content == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (taosReadFile(pFile, content, size) != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to read dnode file:%s since %s", file, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
content[size] = '\0';
|
||||
|
||||
pJson = tjsonParse(content);
|
||||
if (pJson == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (dmDecodeEpPairs(pJson, pData) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
dInfo("succceed to read mnode file %s", file);
|
||||
|
||||
_OVER:
|
||||
if (content != NULL) taosMemoryFree(content);
|
||||
if (pJson != NULL) cJSON_Delete(pJson);
|
||||
if (pFile != NULL) taosCloseFile(&pFile);
|
||||
|
||||
if (code != 0) {
|
||||
dError("failed to read dnode file:%s since %s", file, terrstr());
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) {
|
||||
SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
|
||||
for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) {
|
||||
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
|
||||
if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
|
||||
dInfo("dnode:%d, will update ep:%s:%u to %s:%u in array", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
|
||||
pair->newFqdn, pair->newPort);
|
||||
tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
|
||||
pDnodeEp->ep.port = pair->newPort;
|
||||
}
|
||||
}
|
||||
void *pIter = taosHashIterate(pData->dnodeHash, NULL);
|
||||
while (pIter) {
|
||||
SDnodeEp *pDnodeEp = pIter;
|
||||
if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
|
||||
dDebug("dnode:%d, will update ep:%s:%u to %s:%u in hash", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
|
||||
pair->newFqdn, pair->newPort);
|
||||
tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
|
||||
pDnodeEp->ep.port = pair->newPort;
|
||||
}
|
||||
pIter = taosHashIterate(pData->dnodeHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pData->dnodeEps) == 0) {
|
||||
SDnodeEp dnodeEp = {0};
|
||||
dnodeEp.isMnode = 1;
|
||||
taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
|
||||
taosArrayPush(pData->dnodeEps, &dnodeEp);
|
||||
}
|
||||
|
||||
dDebug("reset dnode list on startup");
|
||||
dmResetEps(pData, pData->dnodeEps);
|
||||
|
||||
if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
|
||||
dError("localEp %s different with %s and need reconfigured", tsLocalEp, file);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue