TD-10431 process status msg
This commit is contained in:
parent
cebbc0719b
commit
e9ef9a42c2
|
@ -115,25 +115,25 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range")
|
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range")
|
||||||
|
|
||||||
// mnode
|
// mnode
|
||||||
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
|
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300)
|
||||||
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0301) //"Message is progressing")
|
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0301)
|
||||||
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) //"Messag need to be reprocessed")
|
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302)
|
||||||
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) //"Insufficient privilege for operation")
|
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303)
|
||||||
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0304) //"Unexpected generic error in mnode")
|
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0304)
|
||||||
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0305) //"Invalid message connection")
|
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0305)
|
||||||
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0306) //"Incompatible protocol version")
|
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0306)
|
||||||
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307) //"Invalid message length")
|
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307)
|
||||||
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308) //"Invalid message type")
|
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308)
|
||||||
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309) //"Too many connections")
|
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309)
|
||||||
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B) //"Data expired")
|
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B)
|
||||||
#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) //"Invalid query id")
|
#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C)
|
||||||
#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) //"Invalid stream id")
|
#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D)
|
||||||
#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E) //"Invalid connection id")
|
#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E)
|
||||||
#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310) //"mnode is alreay running")
|
#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310)
|
||||||
#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311) //"failed to config sync")
|
#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311)
|
||||||
#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312) //"failed to start sync")
|
#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312)
|
||||||
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
|
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313)
|
||||||
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components")
|
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314)
|
||||||
|
|
||||||
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
|
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
|
||||||
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321)
|
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321)
|
||||||
|
|
|
@ -190,78 +190,78 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
|
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
|
||||||
if (!dnodeId || dnodeId->type != cJSON_String) {
|
if (!dnodeId || dnodeId->type != cJSON_Number) {
|
||||||
dError("failed to read %s since dnodeId not found", pMgmt->file);
|
dError("failed to read %s since dnodeId not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->dnodeId = atoi(dnodeId->valuestring);
|
pMgmt->dnodeId = dnodeId->valueint;
|
||||||
|
|
||||||
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
|
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
|
||||||
if (!clusterId || clusterId->type != cJSON_String) {
|
if (!clusterId || clusterId->type != cJSON_Number) {
|
||||||
dError("failed to read %s since clusterId not found", pMgmt->file);
|
dError("failed to read %s since clusterId not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->clusterId = atol(clusterId->valuestring);
|
pMgmt->clusterId = clusterId->valueint;
|
||||||
|
|
||||||
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||||
if (!dropped || dropped->type != cJSON_String) {
|
if (!dropped || dropped->type != cJSON_Number) {
|
||||||
dError("failed to read %s since dropped not found", pMgmt->file);
|
dError("failed to read %s since dropped not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->dropped = atoi(dropped->valuestring);
|
pMgmt->dropped = dropped->valueint;
|
||||||
|
|
||||||
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
|
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
|
||||||
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
|
if (!dnodes || dnodes->type != cJSON_Array) {
|
||||||
dError("failed to read %s since dnodeInfos not found", pMgmt->file);
|
dError("failed to read %s since dnodes not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
|
int32_t numOfNodes = cJSON_GetArraySize(dnodes);
|
||||||
if (dnodeInfosSize <= 0) {
|
if (numOfNodes <= 0) {
|
||||||
dError("failed to read %s since dnodeInfos size:%d invalid", pMgmt->file, dnodeInfosSize);
|
dError("failed to read %s since numOfNodes:%d invalid", pMgmt->file, numOfNodes);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMgmt->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
|
pMgmt->dnodeEps = calloc(1, numOfNodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
|
||||||
if (pMgmt->dnodeEps == NULL) {
|
if (pMgmt->dnodeEps == NULL) {
|
||||||
dError("failed to calloc dnodeEpList since %s", strerror(errno));
|
dError("failed to calloc dnodeEpList since %s", strerror(errno));
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->dnodeEps->num = dnodeInfosSize;
|
pMgmt->dnodeEps->num = numOfNodes;
|
||||||
|
|
||||||
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
|
for (int32_t i = 0; i < numOfNodes; ++i) {
|
||||||
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
|
cJSON *node = cJSON_GetArrayItem(dnodes, i);
|
||||||
if (dnodeInfo == NULL) break;
|
if (node == NULL) break;
|
||||||
|
|
||||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||||
|
|
||||||
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
|
cJSON *dnodeId = cJSON_GetObjectItem(node, "id");
|
||||||
if (!dnodeId || dnodeId->type != cJSON_String) {
|
if (!dnodeId || dnodeId->type != cJSON_Number) {
|
||||||
dError("failed to read %s, dnodeId not found", pMgmt->file);
|
dError("failed to read %s, dnodeId not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
pDnodeEp->id = atoi(dnodeId->valuestring);
|
pDnodeEp->id = dnodeId->valueint;
|
||||||
|
|
||||||
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
|
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
|
||||||
if (!isMnode || isMnode->type != cJSON_String) {
|
|
||||||
dError("failed to read %s, isMnode not found", pMgmt->file);
|
|
||||||
goto PRASE_DNODE_OVER;
|
|
||||||
}
|
|
||||||
pDnodeEp->isMnode = atoi(isMnode->valuestring);
|
|
||||||
|
|
||||||
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
|
|
||||||
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
|
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
|
||||||
dError("failed to read %s, dnodeFqdn not found", pMgmt->file);
|
dError("failed to read %s, dnodeFqdn not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
|
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
|
||||||
if (!dnodePort || dnodePort->type != cJSON_String) {
|
if (!dnodePort || dnodePort->type != cJSON_Number) {
|
||||||
dError("failed to read %s, dnodePort not found", pMgmt->file);
|
dError("failed to read %s, dnodePort not found", pMgmt->file);
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
pDnodeEp->port = atoi(dnodePort->valuestring);
|
pDnodeEp->port = dnodePort->valueint;
|
||||||
|
|
||||||
|
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
|
||||||
|
if (!isMnode || isMnode->type != cJSON_Number) {
|
||||||
|
dError("failed to read %s, isMnode not found", pMgmt->file);
|
||||||
|
goto PRASE_DNODE_OVER;
|
||||||
|
}
|
||||||
|
pDnodeEp->isMnode = isMnode->valueint;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -307,16 +307,16 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
|
||||||
char *content = calloc(1, maxLen + 1);
|
char *content = calloc(1, maxLen + 1);
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId);
|
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId);
|
||||||
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%d\",\n", pMgmt->clusterId);
|
len += snprintf(content + len, maxLen - len, " \"clusterId\": %d,\n", pMgmt->clusterId);
|
||||||
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped);
|
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
|
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
|
||||||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
|
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
|
||||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id);
|
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
|
||||||
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode);
|
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->fqdn);
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn);
|
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->port);
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port);
|
len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode);
|
||||||
if (i < pMgmt->dnodeEps->num - 1) {
|
if (i < pMgmt->dnodeEps->num - 1) {
|
||||||
len += snprintf(content + len, maxLen - len, " },{\n");
|
len += snprintf(content + len, maxLen - len, " },{\n");
|
||||||
} else {
|
} else {
|
||||||
|
@ -344,8 +344,6 @@ static void dndSendStatusMsg(SDnode *pDnode) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool changed = false;
|
|
||||||
|
|
||||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
pStatus->sver = htonl(pDnode->opt.sver);
|
pStatus->sver = htonl(pDnode->opt.sver);
|
||||||
|
@ -440,6 +438,7 @@ static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { a
|
||||||
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
|
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
|
||||||
|
|
||||||
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
|
dDebug("config msg is received");
|
||||||
SCfgDnodeMsg *pCfg = pMsg->pCont;
|
SCfgDnodeMsg *pCfg = pMsg->pCont;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
|
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
@ -449,12 +448,12 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
dInfo("startup msg is received");
|
dDebug("startup msg is received");
|
||||||
|
|
||||||
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
|
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
|
||||||
dndGetStartup(pDnode, pStartup);
|
dndGetStartup(pDnode, pStartup);
|
||||||
|
|
||||||
dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
|
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
|
|
@ -130,43 +130,43 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
||||||
if (!deployed || deployed->type != cJSON_String) {
|
if (!deployed || deployed->type != cJSON_Number) {
|
||||||
dError("failed to read %s since deployed not found", pMgmt->file);
|
dError("failed to read %s since deployed not found", pMgmt->file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->deployed = atoi(deployed->valuestring);
|
pMgmt->deployed = deployed->valueint;
|
||||||
|
|
||||||
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||||
if (!dropped || dropped->type != cJSON_String) {
|
if (!dropped || dropped->type != cJSON_Number) {
|
||||||
dError("failed to read %s since dropped not found", pMgmt->file);
|
dError("failed to read %s since dropped not found", pMgmt->file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->dropped = atoi(dropped->valuestring);
|
pMgmt->dropped = dropped->valueint;
|
||||||
|
|
||||||
cJSON *nodes = cJSON_GetObjectItem(root, "nodes");
|
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
|
||||||
if (!nodes || nodes->type != cJSON_Array) {
|
if (!mnodes || mnodes->type != cJSON_Array) {
|
||||||
dError("failed to read %s since nodes not found", pMgmt->file);
|
dError("failed to read %s since nodes not found", pMgmt->file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMgmt->replica = cJSON_GetArraySize(nodes);
|
pMgmt->replica = cJSON_GetArraySize(mnodes);
|
||||||
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
|
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
|
||||||
dError("failed to read %s since nodes size %d invalid", pMgmt->file, pMgmt->replica);
|
dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pMgmt->replica; ++i) {
|
for (int32_t i = 0; i < pMgmt->replica; ++i) {
|
||||||
cJSON *node = cJSON_GetArrayItem(nodes, i);
|
cJSON *node = cJSON_GetArrayItem(mnodes, i);
|
||||||
if (node == NULL) break;
|
if (node == NULL) break;
|
||||||
|
|
||||||
SReplica *pReplica = &pMgmt->replicas[i];
|
SReplica *pReplica = &pMgmt->replicas[i];
|
||||||
|
|
||||||
cJSON *id = cJSON_GetObjectItem(node, "id");
|
cJSON *id = cJSON_GetObjectItem(node, "id");
|
||||||
if (!id || id->type != cJSON_String || id->valuestring == NULL) {
|
if (!id || id->type != cJSON_Number) {
|
||||||
dError("failed to read %s since id not found", pMgmt->file);
|
dError("failed to read %s since id not found", pMgmt->file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pReplica->id = atoi(id->valuestring);
|
pReplica->id = id->valueint;
|
||||||
|
|
||||||
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
|
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
|
||||||
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
|
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
|
||||||
|
@ -176,11 +176,11 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
||||||
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
|
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
cJSON *port = cJSON_GetObjectItem(node, "port");
|
cJSON *port = cJSON_GetObjectItem(node, "port");
|
||||||
if (!port || port->type != cJSON_String || port->valuestring == NULL) {
|
if (!port || port->type != cJSON_Number) {
|
||||||
dError("failed to read %s since port not found", pMgmt->file);
|
dError("failed to read %s since port not found", pMgmt->file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pReplica->port = atoi(port->valuestring);
|
pReplica->port = port->valueint;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -213,15 +213,15 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
|
||||||
char *content = calloc(1, maxLen + 1);
|
char *content = calloc(1, maxLen + 1);
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", pMgmt->deployed);
|
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped);
|
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
|
||||||
len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n");
|
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
|
||||||
for (int32_t i = 0; i < pMgmt->replica; ++i) {
|
for (int32_t i = 0; i < pMgmt->replica; ++i) {
|
||||||
SReplica *pReplica = &pMgmt->replicas[i];
|
SReplica *pReplica = &pMgmt->replicas[i];
|
||||||
len += snprintf(content + len, maxLen - len, " \"id\": \"%d\",\n", pReplica->id);
|
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
|
||||||
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
|
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
|
||||||
len += snprintf(content + len, maxLen - len, " \"port\": \"%u\"\n", pReplica->port);
|
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
|
||||||
if (i < pMgmt->replica - 1) {
|
if (i < pMgmt->replica - 1) {
|
||||||
len += snprintf(content + len, maxLen - len, " },{\n");
|
len += snprintf(content + len, maxLen - len, " },{\n");
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -140,13 +140,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
||||||
DndMsgFp fp = pMgmt->msgFp[msgType];
|
DndMsgFp fp = pMgmt->msgFp[msgType];
|
||||||
if (fp != NULL) {
|
if (fp != NULL) {
|
||||||
dTrace("RPC %p, rsp:%s app:%p will be processed, result:%s", pMsg->handle, taosMsg[msgType], pMsg->ahandle,
|
|
||||||
tstrerror(pMsg->code));
|
|
||||||
(*fp)(pDnode, pMsg, pEpSet);
|
(*fp)(pDnode, pMsg, pEpSet);
|
||||||
|
dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle,
|
||||||
|
pMsg->code & 0XFFFF);
|
||||||
} else {
|
} else {
|
||||||
dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
|
dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
}
|
}
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndInitClient(SDnode *pDnode) {
|
static int32_t dndInitClient(SDnode *pDnode) {
|
||||||
|
|
|
@ -83,6 +83,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
||||||
mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid);
|
mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid);
|
||||||
}
|
}
|
||||||
clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN);
|
clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN);
|
||||||
|
clusterObj.id = abs(clusterObj.id);
|
||||||
pMnode->clusterId = clusterObj.id;
|
pMnode->clusterId = clusterObj.id;
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
|
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
|
||||||
|
|
|
@ -175,7 +175,7 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
|
||||||
pEps->num = i;
|
pEps->num = htonl(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
|
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
|
||||||
|
|
|
@ -213,7 +213,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
|
||||||
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
|
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
|
||||||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
|
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
|
||||||
pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) {
|
pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) {
|
||||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,7 +245,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
|
|
||||||
int32_t code = mnodeCreateDir(pMnode, path);
|
int32_t code = mnodeCreateDir(pMnode, path);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to open mnode since %s", tstrerror(code));
|
code = terrno;
|
||||||
|
mError("failed to open mnode since %s", terrstr());
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -253,7 +254,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
|
|
||||||
code = mndSetOptions(pMnode, pOption);
|
code = mndSetOptions(pMnode, pOption);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to open mnode since %s", tstrerror(code));
|
code = terrno;
|
||||||
|
mError("failed to open mnode since %s", terrstr());
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -261,7 +263,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
|
|
||||||
code = mndInitSteps(pMnode);
|
code = mndInitSteps(pMnode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to open mnode since %s", tstrerror(code));
|
code = terrno;
|
||||||
|
mError("failed to open mnode since %s", terrstr());
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -269,7 +272,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
|
|
||||||
code = mndExecSteps(pMnode);
|
code = mndExecSteps(pMnode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to open mnode since %s", tstrerror(code));
|
code = terrno;
|
||||||
|
mError("failed to open mnode since %s", terrstr());
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -237,7 +237,15 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
|
||||||
SRWLatch *pLock = &pSdb->locks[type];
|
SRWLatch *pLock = &pSdb->locks[type];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
SSdbRow **ppRow = taosHashIterate(hash, ppRow);
|
if (pIter != NULL) {
|
||||||
|
SSdbRow *pLastRow = *(SSdbRow **)pIter;
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pLastRow->refCount, 1);
|
||||||
|
if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
|
||||||
|
sdbFreeRow(pLastRow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SSdbRow **ppRow = taosHashIterate(hash, pIter);
|
||||||
while (ppRow != NULL) {
|
while (ppRow != NULL) {
|
||||||
SSdbRow *pRow = *ppRow;
|
SSdbRow *pRow = *ppRow;
|
||||||
if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
|
if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
|
||||||
|
|
|
@ -130,7 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Unexpected generic error in mnode")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_OPTIONS, "Invalid mnode options")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length")
|
||||||
|
|
Loading…
Reference in New Issue