status msg
This commit is contained in:
parent
17be3f19aa
commit
0555cf4b1e
|
@ -638,10 +638,6 @@ typedef struct {
|
|||
int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq);
|
||||
void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} STransReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
|
@ -650,21 +646,23 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t id;
|
||||
int8_t isMnode;
|
||||
int8_t align;
|
||||
SEp ep;
|
||||
} SDnodeEp;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SDnodeEp eps[];
|
||||
} SDnodeEps;
|
||||
|
||||
typedef struct {
|
||||
int32_t mver;
|
||||
int64_t dver;
|
||||
SDnodeCfg dnodeCfg;
|
||||
SDnodeEps dnodeEps;
|
||||
SArray* pDnodeEps; // Array of SDnodeEp
|
||||
} SStatusRsp;
|
||||
|
||||
int32_t tSerializeSStatusRsp(void** buf, SStatusRsp* pRsp);
|
||||
void* tDeserializeSStatusRsp(void* buf, SStatusRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int32_t mver;
|
||||
} STransReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t id;
|
||||
uint16_t port; // node sync Port
|
||||
|
@ -1201,8 +1199,6 @@ typedef struct {
|
|||
|
||||
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
||||
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
|
||||
int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp);
|
||||
void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
uint64_t ver; // use a general definition
|
||||
|
@ -1214,8 +1210,6 @@ typedef struct {
|
|||
|
||||
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
|
||||
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
|
||||
int32_t tSerializeSVCreateTbBatchRsp(void** buf, SVCreateTbBatchRsp* pRsp);
|
||||
void* tDeserializeSVCreateTbBatchRsp(void* buf, SVCreateTbBatchRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
uint64_t ver;
|
||||
|
@ -1229,8 +1223,6 @@ typedef struct {
|
|||
|
||||
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
|
||||
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
|
||||
int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp);
|
||||
void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
|
|
|
@ -543,3 +543,61 @@ void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) {
|
|||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int32_t tSerializeSStatusRsp(void **buf, SStatusRsp *pRsp) {
|
||||
int32_t tlen = 0;
|
||||
|
||||
// status
|
||||
tlen += taosEncodeFixedI32(buf, pRsp->mver);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->dver);
|
||||
|
||||
// dnode cfg
|
||||
tlen += taosEncodeFixedI32(buf, pRsp->dnodeCfg.dnodeId);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->dnodeCfg.clusterId);
|
||||
|
||||
// dnode eps
|
||||
int32_t dlen = (int32_t)taosArrayGetSize(pRsp->pDnodeEps);
|
||||
tlen += taosEncodeFixedI32(buf, dlen);
|
||||
for (int32_t i = 0; i < dlen; ++i) {
|
||||
SDnodeEp *pDnodeEp = taosArrayGet(pRsp->pDnodeEps, i);
|
||||
tlen += taosEncodeFixedI32(buf, pDnodeEp->id);
|
||||
tlen += taosEncodeFixedI8(buf, pDnodeEp->isMnode);
|
||||
tlen += taosEncodeString(buf, pDnodeEp->ep.fqdn);
|
||||
tlen += taosEncodeFixedU16(buf, pDnodeEp->ep.port);
|
||||
}
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDeserializeSStatusRsp(void *buf, SStatusRsp *pRsp) {
|
||||
// status
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->mver);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->dver);
|
||||
|
||||
// cluster cfg
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->dnodeCfg.dnodeId);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->dnodeCfg.clusterId);
|
||||
|
||||
// dnode eps
|
||||
int32_t dlen = 0;
|
||||
buf = taosDecodeFixedI32(buf, &dlen);
|
||||
pRsp->pDnodeEps = taosArrayInit(dlen, sizeof(SDnodeEp));
|
||||
if (pRsp->pDnodeEps == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < dlen; ++i) {
|
||||
SDnodeEp dnodeEp = {0};
|
||||
buf = taosDecodeFixedI32(buf, &dnodeEp.id);
|
||||
buf = taosDecodeFixedI8(buf, &dnodeEp.isMnode);
|
||||
buf = taosDecodeStringTo(buf, dnodeEp.ep.fqdn);
|
||||
buf = taosDecodeFixedU16(buf, &dnodeEp.ep.port);
|
||||
if (taosArrayPush(pRsp->pDnodeEps, &dnodeEp) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ typedef struct {
|
|||
SEpSet mnodeEpSet;
|
||||
char *file;
|
||||
SHashObj *dnodeHash;
|
||||
SDnodeEps *dnodeEps;
|
||||
SArray *pDnodeEps;
|
||||
pthread_t *threadId;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker mgmtWorker;
|
||||
|
|
|
@ -113,35 +113,31 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
|||
static void dndPrintDnodes(SDnode *pDnode) {
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
|
||||
dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num);
|
||||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
|
||||
SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i];
|
||||
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
|
||||
dDebug("print dnode ep list, num:%d", numOfEps);
|
||||
for (int32_t i = 0; i < numOfEps; i++) {
|
||||
SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i);
|
||||
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
|
||||
}
|
||||
}
|
||||
|
||||
static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
||||
static void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) {
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
|
||||
int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp);
|
||||
if (pDnodeEps->num > pMgmt->dnodeEps->num) {
|
||||
SDnodeEps *tmp = calloc(1, size);
|
||||
if (tmp == NULL) return;
|
||||
|
||||
tfree(pMgmt->dnodeEps);
|
||||
pMgmt->dnodeEps = tmp;
|
||||
}
|
||||
|
||||
if (pMgmt->dnodeEps != pDnodeEps) {
|
||||
memcpy(pMgmt->dnodeEps, pDnodeEps, size);
|
||||
if (pMgmt->pDnodeEps != pDnodeEps) {
|
||||
SArray *tmp = pMgmt->pDnodeEps;
|
||||
pMgmt->pDnodeEps = taosArrayDup(pDnodeEps);
|
||||
taosArrayDestroy(tmp);
|
||||
}
|
||||
|
||||
pMgmt->mnodeEpSet.inUse = 0;
|
||||
pMgmt->mnodeEpSet.numOfEps = 0;
|
||||
|
||||
int32_t mIndex = 0;
|
||||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
|
||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||
int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps);
|
||||
|
||||
for (int32_t i = 0; i < numOfEps; i++) {
|
||||
SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i);
|
||||
if (!pDnodeEp->isMnode) continue;
|
||||
if (mIndex >= TSDB_MAX_REPLICA) continue;
|
||||
pMgmt->mnodeEpSet.numOfEps++;
|
||||
|
@ -150,8 +146,8 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
|||
mIndex++;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
|
||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||
for (int32_t i = 0; i < numOfEps; i++) {
|
||||
SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i);
|
||||
taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
|
||||
}
|
||||
|
||||
|
@ -178,6 +174,12 @@ static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
|
|||
static int32_t dndReadDnodes(SDnode *pDnode) {
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
|
||||
pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
|
||||
if (pMgmt->pDnodeEps == NULL) {
|
||||
dError("failed to calloc dnodeEp array since %s", strerror(errno));
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
|
||||
int32_t len = 0;
|
||||
int32_t maxLen = 256 * 1024;
|
||||
|
@ -238,18 +240,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
|
|||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
|
||||
pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
|
||||
if (pMgmt->dnodeEps == NULL) {
|
||||
dError("failed to calloc dnodeEpList since %s", strerror(errno));
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
pMgmt->dnodeEps->num = numOfDnodes;
|
||||
|
||||
for (int32_t i = 0; i < numOfDnodes; ++i) {
|
||||
cJSON *node = cJSON_GetArrayItem(dnodes, i);
|
||||
if (node == NULL) break;
|
||||
|
||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||
SDnodeEp dnodeEp = {0};
|
||||
|
||||
cJSON *did = cJSON_GetObjectItem(node, "id");
|
||||
if (!did || did->type != cJSON_Number) {
|
||||
|
@ -257,14 +252,14 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
|
|||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
|
||||
pDnodeEp->id = dnodeId->valueint;
|
||||
dnodeEp.id = dnodeId->valueint;
|
||||
|
||||
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
|
||||
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
|
||||
dError("failed to read %s since dnodeFqdn not found", pMgmt->file);
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
tstrncpy(pDnodeEp->ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
||||
tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
||||
|
||||
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
|
||||
if (!dnodePort || dnodePort->type != cJSON_Number) {
|
||||
|
@ -272,14 +267,16 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
|
|||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
|
||||
pDnodeEp->ep.port = dnodePort->valueint;
|
||||
dnodeEp.ep.port = dnodePort->valueint;
|
||||
|
||||
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
|
||||
if (!isMnode || isMnode->type != cJSON_Number) {
|
||||
dError("failed to read %s since isMnode not found", pMgmt->file);
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
pDnodeEp->isMnode = isMnode->valueint;
|
||||
dnodeEp.isMnode = isMnode->valueint;
|
||||
|
||||
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
@ -296,15 +293,14 @@ PRASE_DNODE_OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pMgmt->dnodeEps == NULL) {
|
||||
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
|
||||
pMgmt->dnodeEps->num = 1;
|
||||
pMgmt->dnodeEps->eps[0].isMnode = 1;
|
||||
|
||||
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &(pMgmt->dnodeEps->eps[0].ep));
|
||||
if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) {
|
||||
SDnodeEp dnodeEp = {0};
|
||||
dnodeEp.isMnode = 1;
|
||||
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep);
|
||||
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
|
||||
}
|
||||
|
||||
dndResetDnodes(pDnode, pMgmt->dnodeEps);
|
||||
dndResetDnodes(pDnode, pMgmt->pDnodeEps);
|
||||
|
||||
terrno = 0;
|
||||
return 0;
|
||||
|
@ -329,13 +325,15 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
|
|||
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
|
||||
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
|
||||
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
|
||||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
|
||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||
|
||||
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
|
||||
for (int32_t i = 0; i < numOfEps; ++i) {
|
||||
SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i);
|
||||
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
|
||||
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
|
||||
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port);
|
||||
len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode);
|
||||
if (i < pMgmt->dnodeEps->num - 1) {
|
||||
if (i < numOfEps - 1) {
|
||||
len += snprintf(content + len, maxLen - len, " },{\n");
|
||||
} else {
|
||||
len += snprintf(content + len, maxLen - len, " }]\n");
|
||||
|
@ -385,6 +383,7 @@ void dndSendStatusReq(SDnode *pDnode) {
|
|||
void *pHead = rpcMallocCont(contLen);
|
||||
void *pBuf = pHead;
|
||||
tSerializeSStatusReq(&pBuf, &req);
|
||||
taosArrayDestroy(req.pVloads);
|
||||
|
||||
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
|
||||
pMgmt->statusSent = 1;
|
||||
|
@ -405,18 +404,20 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
|
|||
}
|
||||
}
|
||||
|
||||
static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
||||
if (pDnodeEps == NULL || pDnodeEps->num <= 0) return;
|
||||
static void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) {
|
||||
int32_t numOfEps = taosArrayGetSize(pDnodeEps);
|
||||
if (numOfEps <= 0) return;
|
||||
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
taosWLockLatch(&pMgmt->latch);
|
||||
|
||||
if (pDnodeEps->num != pMgmt->dnodeEps->num) {
|
||||
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
|
||||
if (numOfEps != numOfEpsOld) {
|
||||
dndResetDnodes(pDnode, pDnodeEps);
|
||||
dndWriteDnodes(pDnode);
|
||||
} else {
|
||||
int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps);
|
||||
if (memcmp(pMgmt->dnodeEps, pDnodeEps, size) != 0) {
|
||||
int32_t size = numOfEps * sizeof(SDnodeEp);
|
||||
if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) {
|
||||
dndResetDnodes(pDnode, pDnodeEps);
|
||||
dndWriteDnodes(pDnode);
|
||||
}
|
||||
|
@ -429,33 +430,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
|
|||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
|
||||
if (pRsp->code != TSDB_CODE_SUCCESS) {
|
||||
pMgmt->statusSent = 0;
|
||||
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
|
||||
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
|
||||
pMgmt->dropped = 1;
|
||||
dndWriteDnodes(pDnode);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (pRsp->pCont != NULL && pRsp->contLen != 0) {
|
||||
SStatusRsp *pStatus = pRsp->pCont;
|
||||
pMgmt->dver = htobe64(pStatus->dver);
|
||||
|
||||
SDnodeCfg *pCfg = &pStatus->dnodeCfg;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->clusterId = htobe64(pCfg->clusterId);
|
||||
dndUpdateDnodeCfg(pDnode, pCfg);
|
||||
|
||||
SDnodeEps *pDnodeEps = &pStatus->dnodeEps;
|
||||
pDnodeEps->num = htonl(pDnodeEps->num);
|
||||
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
|
||||
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
|
||||
pDnodeEps->eps[i].ep.port = htons(pDnodeEps->eps[i].ep.port);
|
||||
} else {
|
||||
SStatusRsp statusRsp = {0};
|
||||
if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, &statusRsp) != NULL) {
|
||||
pMgmt->dver = statusRsp.dver;
|
||||
dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
|
||||
dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
|
||||
}
|
||||
|
||||
dndUpdateDnodeEps(pDnode, pDnodeEps);
|
||||
taosArrayDestroy(statusRsp.pDnodeEps);
|
||||
}
|
||||
|
||||
pMgmt->statusSent = 0;
|
||||
}
|
||||
|
||||
|
@ -570,9 +559,9 @@ void dndCleanupMgmt(SDnode *pDnode) {
|
|||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
taosWLockLatch(&pMgmt->latch);
|
||||
|
||||
if (pMgmt->dnodeEps != NULL) {
|
||||
free(pMgmt->dnodeEps);
|
||||
pMgmt->dnodeEps = NULL;
|
||||
if (pMgmt->pDnodeEps != NULL) {
|
||||
taosArrayDestroy(pMgmt->pDnodeEps);
|
||||
pMgmt->pDnodeEps = NULL;
|
||||
}
|
||||
|
||||
if (pMgmt->dnodeHash != NULL) {
|
||||
|
|
|
@ -244,7 +244,7 @@ bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
|
||||
static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
int32_t numOfEps = 0;
|
||||
|
@ -253,25 +253,20 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
|
|||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
if (numOfEps >= maxEps) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pDnode);
|
||||
break;
|
||||
}
|
||||
|
||||
SDnodeEp *pEp = &pEps->eps[numOfEps];
|
||||
pEp->id = htonl(pDnode->id);
|
||||
pEp->ep.port = htons(pDnode->port);
|
||||
memcpy(pEp->ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
pEp->isMnode = 0;
|
||||
SDnodeEp dnodeEp = {0};
|
||||
dnodeEp.id = pDnode->id;
|
||||
dnodeEp.isMnode = 0;
|
||||
dnodeEp.ep.port = pDnode->port;
|
||||
memcpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
|
||||
if (mndIsMnode(pMnode, pDnode->id)) {
|
||||
pEp->isMnode = 1;
|
||||
dnodeEp.isMnode = 1;
|
||||
}
|
||||
numOfEps++;
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
pEps->num = htonl(numOfEps);
|
||||
sdbRelease(pSdb, pDnode);
|
||||
taosArrayPush(pDnodeEps, &dnodeEp);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
|
||||
|
@ -379,21 +374,26 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
pDnode->numOfCores = statusReq.numOfCores;
|
||||
pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
|
||||
|
||||
int32_t numOfEps = mndGetDnodeSize(pMnode);
|
||||
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
|
||||
SStatusRsp *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
SStatusRsp statusRsp = {0};
|
||||
statusRsp.dver = sdbGetTableVer(pMnode->pSdb, SDB_DNODE);
|
||||
statusRsp.dnodeCfg.dnodeId = pDnode->id;
|
||||
statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
|
||||
statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
|
||||
if (statusRsp.pDnodeEps == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto PROCESS_STATUS_MSG_OVER;
|
||||
}
|
||||
|
||||
pRsp->dver = htobe64(sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
|
||||
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
|
||||
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
|
||||
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
|
||||
mndGetDnodeData(pMnode, statusRsp.pDnodeEps);
|
||||
|
||||
int32_t contLen = tSerializeSStatusRsp(NULL, &statusRsp);
|
||||
void *pHead = rpcMallocCont(contLen);
|
||||
void *pBuf = pHead;
|
||||
tSerializeSStatusRsp(&pBuf, &statusRsp);
|
||||
taosArrayDestroy(statusRsp.pDnodeEps);
|
||||
|
||||
pReq->contLen = contLen;
|
||||
pReq->pCont = pRsp;
|
||||
pReq->pCont = pHead;
|
||||
}
|
||||
|
||||
pDnode->lastAccessTime = curMs;
|
||||
|
@ -401,6 +401,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
|
||||
PROCESS_STATUS_MSG_OVER:
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
taosArrayDestroy(statusReq.pVloads);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -592,8 +592,8 @@ CREATE_STB_OVER:
|
|||
mndReleaseStb(pMnode, pStb);
|
||||
mndReleaseStb(pMnode, pTopicStb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
taosArrayClear(createReq.pColumns);
|
||||
taosArrayClear(createReq.pTags);
|
||||
taosArrayDestroy(createReq.pColumns);
|
||||
taosArrayDestroy(createReq.pTags);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1049,7 +1049,7 @@ ALTER_STB_OVER:
|
|||
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
taosArrayClear(alterReq.pFields);
|
||||
taosArrayDestroy(alterReq.pFields);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -276,6 +276,8 @@ char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseCo
|
|||
|
||||
void* buf = req;
|
||||
tSerializeSMCreateStbReq(&buf, &createReq);
|
||||
taosArrayDestroy(createReq.pColumns);
|
||||
taosArrayDestroy(createReq.pTags);
|
||||
*len = tlen;
|
||||
return req;
|
||||
}
|
||||
|
|
|
@ -1383,7 +1383,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
|
|||
static void rpcProcessConnError(void *param, void *id) {
|
||||
SRpcReqContext *pContext = (SRpcReqContext *)param;
|
||||
SRpcInfo * pRpc = pContext->pRpc;
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
if (pRpc == NULL) {
|
||||
return;
|
||||
|
|
Loading…
Reference in New Issue