enh: refact dmEps.c

This commit is contained in:
Shengliang Guan 2023-01-11 11:06:16 +08:00
parent 9a95c3d7ab
commit ba7f9f1df1
1 changed files with 70 additions and 102 deletions

View File

@ -41,14 +41,49 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF
taosThreadRwlockUnlock(&pData->lock); taosThreadRwlockUnlock(&pData->lock);
} }
static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
int32_t code = 0;
tjsonGetInt32ValueFromDouble(pJson, "dnodeId", pData->dnodeId, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
if (code < 0) return -1;
SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes");
if (dnodes == NULL) return 0;
int32_t numOfDnodes = tjsonGetArraySize(dnodes);
for (int32_t i = 0; i < numOfDnodes; ++i) {
SJson *dnode = tjsonGetArrayItem(dnodes, i);
if (dnode == NULL) return -1;
SDnodeEp dnodeEp = {0};
tjsonGetInt32ValueFromDouble(dnode, "id", dnodeEp.id, code);
if (code < 0) return -1;
code = tjsonGetStringValue(dnode, "fqdn", dnodeEp.ep.fqdn);
if (code < 0) return -1;
tjsonGetUInt16ValueFromDouble(dnode, "port", dnodeEp.ep.port, code);
if (code < 0) return -1;
tjsonGetInt8ValueFromDouble(dnode, "isMnode", dnodeEp.isMnode, code);
if (code < 0) return -1;
if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) return -1;
}
return 0;
}
int32_t dmReadEps(SDnodeData *pData) { int32_t dmReadEps(SDnodeData *pData) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t code = -1;
int32_t len = 0;
int32_t maxLen = 256 * 1024;
char *content = taosMemoryCalloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX] = {0};
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
char *content = NULL;
SJson *pJson = NULL;
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pData->dnodeEps == NULL) { if (pData->dnodeEps == NULL) {
@ -56,129 +91,62 @@ int32_t dmReadEps(SDnodeData *pData) {
goto _OVER; goto _OVER;
} }
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); if (taosStatFile(file, NULL, NULL) < 0) {
pFile = taosOpenFile(file, TD_FILE_READ); dInfo("dnode file:%s not exist", file);
if (pFile == NULL) {
code = 0; code = 0;
goto _OVER; goto _OVER;
} }
len = (int32_t)taosReadFile(pFile, content, maxLen); pFile = taosOpenFile(file, TD_FILE_READ);
if (len <= 0) { if (pFile == NULL) {
dError("failed to read %s since content is null", file); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to open dnode file:%s since %s", file, terrstr());
goto _OVER; goto _OVER;
} }
content[len] = 0; int64_t size = 0;
root = cJSON_Parse(content); if (taosFStatFile(pFile, &size, NULL) < 0) {
if (root == NULL) { terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to read %s since invalid json format", file); dError("failed to fstat dnode file:%s since %s", file, terrstr());
goto _OVER; goto _OVER;
} }
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); content = taosMemoryMalloc(size + 1);
if (!dnodeId || dnodeId->type != cJSON_Number) { if (content == NULL) {
dError("failed to read %s since dnodeId not found", file); terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
pData->dnodeId = dnodeId->valueint;
cJSON *dnodeVer = cJSON_GetObjectItem(root, "dnodeVer");
if (!dnodeVer || dnodeVer->type != cJSON_String) {
dError("failed to read %s since dnodeVer not found", file);
goto _OVER;
}
pData->dnodeVer = atoll(dnodeVer->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", file);
goto _OVER;
}
pData->clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto _OVER;
}
pData->dropped = dropped->valueint;
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
if (!dnodes || dnodes->type != cJSON_Array) {
dError("failed to read %s since dnodes not found", file);
goto _OVER; goto _OVER;
} }
int32_t numOfDnodes = cJSON_GetArraySize(dnodes); if (taosReadFile(pFile, content, size) != size) {
if (numOfDnodes <= 0) { terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes); dError("failed to read dnode file:%s since %s", file, terrstr());
goto _OVER; goto _OVER;
} }
for (int32_t i = 0; i < numOfDnodes; ++i) { content[size] = '\0';
cJSON *node = cJSON_GetArrayItem(dnodes, i);
if (node == NULL) break;
SDnodeEp dnodeEp = {0}; pJson = tjsonParse(content);
if (pJson == NULL) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
cJSON *did = cJSON_GetObjectItem(node, "id"); if (dmDecodeEps(pJson, pData) < 0) {
if (!did || did->type != cJSON_Number) { terrno = TSDB_CODE_INVALID_JSON_FORMAT;
dError("failed to read %s since dnodeId not found", file); goto _OVER;
goto _OVER;
}
dnodeEp.id = did->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s since dnodeFqdn not found", file);
goto _OVER;
}
tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s since dnodePort not found", file);
goto _OVER;
}
dnodeEp.ep.port = dnodePort->valueint;
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s since isMnode not found", file);
goto _OVER;
}
dnodeEp.isMnode = isMnode->valueint;
taosArrayPush(pData->dnodeEps, &dnodeEp);
} }
code = 0; code = 0;
dDebug("succcessed to read file %s", file); dInfo("succceed to read mnode file %s", file);
_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (pJson != NULL) cJSON_Delete(pJson);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
if (taosArrayGetSize(pData->dnodeEps) == 0) { if (code != 0) {
SDnodeEp dnodeEp = {0}; dError("failed to read dnode file:%s since %s", file, terrstr());
dnodeEp.isMnode = 1;
taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
taosArrayPush(pData->dnodeEps, &dnodeEp);
} }
dDebug("reset dnode list on startup");
dmResetEps(pData, pData->dnodeEps);
if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
dError("localEp %s different with %s and need reconfigured", tsLocalEp, file);
return -1;
}
terrno = code;
return code; return code;
} }