Merge pull request #10193 from taosdata/feature/mnode

status msg
This commit is contained in:
Shengliang Guan 2022-02-10 16:05:55 +08:00 committed by GitHub
commit 3662181ff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 299 additions and 206 deletions

View File

@ -614,7 +614,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t role; int8_t role;
int8_t align[3];
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
@ -622,27 +621,22 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int32_t num; int32_t mver; // msg version
SVnodeLoad data[]; int32_t sver; // software version
} SVnodeLoads; int64_t dver; // dnode table version in sdb
typedef struct {
int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int64_t dver;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int32_t numOfCores; int32_t numOfCores;
int32_t numOfSupportVnodes; int32_t numOfSupportVnodes;
char dnodeEp[TSDB_EP_LEN]; char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads; SArray* pVloads; // array of SVnodeLoad
} SStatusReq; } SStatusReq;
typedef struct { int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq);
int32_t reserved; void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq);
} STransReq;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
@ -652,21 +646,23 @@ typedef struct {
typedef struct { typedef struct {
int32_t id; int32_t id;
int8_t isMnode; int8_t isMnode;
int8_t align;
SEp ep; SEp ep;
} SDnodeEp; } SDnodeEp;
typedef struct { typedef struct {
int32_t num; int32_t mver;
SDnodeEp eps[];
} SDnodeEps;
typedef struct {
int64_t dver; int64_t dver;
SDnodeCfg dnodeCfg; SDnodeCfg dnodeCfg;
SDnodeEps dnodeEps; SArray* pDnodeEps; // Array of SDnodeEp
} SStatusRsp; } SStatusRsp;
int32_t tSerializeSStatusRsp(void** buf, SStatusRsp* pRsp);
void* tDeserializeSStatusRsp(void* buf, SStatusRsp* pRsp);
typedef struct {
int32_t mver;
} STransReq;
typedef struct { typedef struct {
int32_t id; int32_t id;
uint16_t port; // node sync Port uint16_t port; // node sync Port
@ -1207,8 +1203,6 @@ typedef struct {
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp);
void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp);
typedef struct { typedef struct {
uint64_t ver; // use a general definition uint64_t ver; // use a general definition
@ -1220,8 +1214,6 @@ typedef struct {
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
int32_t tSerializeSVCreateTbBatchRsp(void** buf, SVCreateTbBatchRsp* pRsp);
void* tDeserializeSVCreateTbBatchRsp(void* buf, SVCreateTbBatchRsp* pRsp);
typedef struct { typedef struct {
uint64_t ver; uint64_t ver;
@ -1235,8 +1227,6 @@ typedef struct {
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq); void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp);
void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;

View File

@ -459,3 +459,145 @@ void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) {
return buf; return buf;
} }
int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) {
int32_t tlen = 0;
// status
tlen += taosEncodeFixedI32(buf, pReq->mver);
tlen += taosEncodeFixedI32(buf, pReq->sver);
tlen += taosEncodeFixedI64(buf, pReq->dver);
tlen += taosEncodeFixedI32(buf, pReq->dnodeId);
tlen += taosEncodeFixedI64(buf, pReq->clusterId);
tlen += taosEncodeFixedI64(buf, pReq->rebootTime);
tlen += taosEncodeFixedI64(buf, pReq->updateTime);
tlen += taosEncodeFixedI32(buf, pReq->numOfCores);
tlen += taosEncodeFixedI32(buf, pReq->numOfSupportVnodes);
tlen += taosEncodeString(buf, pReq->dnodeEp);
// cluster cfg
tlen += taosEncodeFixedI32(buf, pReq->clusterCfg.statusInterval);
tlen += taosEncodeFixedI64(buf, pReq->clusterCfg.checkTime);
tlen += taosEncodeString(buf, pReq->clusterCfg.timezone);
tlen += taosEncodeString(buf, pReq->clusterCfg.locale);
tlen += taosEncodeString(buf, pReq->clusterCfg.charset);
// vnode loads
int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads);
tlen += taosEncodeFixedI32(buf, vlen);
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
tlen += taosEncodeFixedI32(buf, pload->vgId);
tlen += taosEncodeFixedI8(buf, pload->role);
tlen += taosEncodeFixedI64(buf, pload->totalStorage);
tlen += taosEncodeFixedI64(buf, pload->compStorage);
tlen += taosEncodeFixedI64(buf, pload->pointsWritten);
tlen += taosEncodeFixedI64(buf, pload->tablesNum);
}
return tlen;
}
void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) {
// status
buf = taosDecodeFixedI32(buf, &pReq->mver);
buf = taosDecodeFixedI32(buf, &pReq->sver);
buf = taosDecodeFixedI64(buf, &pReq->dver);
buf = taosDecodeFixedI32(buf, &pReq->dnodeId);
buf = taosDecodeFixedI64(buf, &pReq->clusterId);
buf = taosDecodeFixedI64(buf, &pReq->rebootTime);
buf = taosDecodeFixedI64(buf, &pReq->updateTime);
buf = taosDecodeFixedI32(buf, &pReq->numOfCores);
buf = taosDecodeFixedI32(buf, &pReq->numOfSupportVnodes);
buf = taosDecodeStringTo(buf, pReq->dnodeEp);
// cluster cfg
buf = taosDecodeFixedI32(buf, &pReq->clusterCfg.statusInterval);
buf = taosDecodeFixedI64(buf, &pReq->clusterCfg.checkTime);
buf = taosDecodeStringTo(buf, pReq->clusterCfg.timezone);
buf = taosDecodeStringTo(buf, pReq->clusterCfg.locale);
buf = taosDecodeStringTo(buf, pReq->clusterCfg.charset);
// vnode loads
int32_t vlen = 0;
buf = taosDecodeFixedI32(buf, &vlen);
pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad));
if (pReq->pVloads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0};
buf = taosDecodeFixedI32(buf, &vload.vgId);
buf = taosDecodeFixedI8(buf, &vload.role);
buf = taosDecodeFixedI64(buf, &vload.totalStorage);
buf = taosDecodeFixedI64(buf, &vload.compStorage);
buf = taosDecodeFixedI64(buf, &vload.pointsWritten);
buf = taosDecodeFixedI64(buf, &vload.tablesNum);
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}
int32_t tSerializeSStatusRsp(void **buf, SStatusRsp *pRsp) {
int32_t tlen = 0;
// status
tlen += taosEncodeFixedI32(buf, pRsp->mver);
tlen += taosEncodeFixedI64(buf, pRsp->dver);
// dnode cfg
tlen += taosEncodeFixedI32(buf, pRsp->dnodeCfg.dnodeId);
tlen += taosEncodeFixedI64(buf, pRsp->dnodeCfg.clusterId);
// dnode eps
int32_t dlen = (int32_t)taosArrayGetSize(pRsp->pDnodeEps);
tlen += taosEncodeFixedI32(buf, dlen);
for (int32_t i = 0; i < dlen; ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pRsp->pDnodeEps, i);
tlen += taosEncodeFixedI32(buf, pDnodeEp->id);
tlen += taosEncodeFixedI8(buf, pDnodeEp->isMnode);
tlen += taosEncodeString(buf, pDnodeEp->ep.fqdn);
tlen += taosEncodeFixedU16(buf, pDnodeEp->ep.port);
}
return tlen;
}
void *tDeserializeSStatusRsp(void *buf, SStatusRsp *pRsp) {
// status
buf = taosDecodeFixedI32(buf, &pRsp->mver);
buf = taosDecodeFixedI64(buf, &pRsp->dver);
// cluster cfg
buf = taosDecodeFixedI32(buf, &pRsp->dnodeCfg.dnodeId);
buf = taosDecodeFixedI64(buf, &pRsp->dnodeCfg.clusterId);
// dnode eps
int32_t dlen = 0;
buf = taosDecodeFixedI32(buf, &dlen);
pRsp->pDnodeEps = taosArrayInit(dlen, sizeof(SDnodeEp));
if (pRsp->pDnodeEps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < dlen; ++i) {
SDnodeEp dnodeEp = {0};
buf = taosDecodeFixedI32(buf, &dnodeEp.id);
buf = taosDecodeFixedI8(buf, &dnodeEp.isMnode);
buf = taosDecodeStringTo(buf, dnodeEp.ep.fqdn);
buf = taosDecodeFixedU16(buf, &dnodeEp.ep.port);
if (taosArrayPush(pRsp->pDnodeEps, &dnodeEp) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}

View File

@ -55,7 +55,7 @@ typedef struct {
SEpSet mnodeEpSet; SEpSet mnodeEpSet;
char *file; char *file;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SDnodeEps *dnodeEps; SArray *pDnodeEps;
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
SDnodeWorker mgmtWorker; SDnodeWorker mgmtWorker;

View File

@ -23,7 +23,7 @@ extern "C" {
int32_t dndInitVnodes(SDnode *pDnode); int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);

View File

@ -113,35 +113,31 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
static void dndPrintDnodes(SDnode *pDnode) { static void dndPrintDnodes(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num); int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { dDebug("print dnode ep list, num:%d", numOfEps);
SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i]; for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i);
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
} }
} }
static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { static void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp); if (pMgmt->pDnodeEps != pDnodeEps) {
if (pDnodeEps->num > pMgmt->dnodeEps->num) { SArray *tmp = pMgmt->pDnodeEps;
SDnodeEps *tmp = calloc(1, size); pMgmt->pDnodeEps = taosArrayDup(pDnodeEps);
if (tmp == NULL) return; taosArrayDestroy(tmp);
tfree(pMgmt->dnodeEps);
pMgmt->dnodeEps = tmp;
}
if (pMgmt->dnodeEps != pDnodeEps) {
memcpy(pMgmt->dnodeEps, pDnodeEps, size);
} }
pMgmt->mnodeEpSet.inUse = 0; pMgmt->mnodeEpSet.inUse = 0;
pMgmt->mnodeEpSet.numOfEps = 0; pMgmt->mnodeEpSet.numOfEps = 0;
int32_t mIndex = 0; int32_t mIndex = 0;
for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps);
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i);
if (!pDnodeEp->isMnode) continue; if (!pDnodeEp->isMnode) continue;
if (mIndex >= TSDB_MAX_REPLICA) continue; if (mIndex >= TSDB_MAX_REPLICA) continue;
pMgmt->mnodeEpSet.numOfEps++; pMgmt->mnodeEpSet.numOfEps++;
@ -150,8 +146,8 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
mIndex++; mIndex++;
} }
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i);
taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
} }
@ -178,6 +174,12 @@ static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
static int32_t dndReadDnodes(SDnode *pDnode) { static int32_t dndReadDnodes(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pMgmt->pDnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno));
goto PRASE_DNODE_OVER;
}
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 256 * 1024; int32_t maxLen = 256 * 1024;
@ -238,18 +240,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (pMgmt->dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeEps->num = numOfDnodes;
for (int32_t i = 0; i < numOfDnodes; ++i) { for (int32_t i = 0; i < numOfDnodes; ++i) {
cJSON *node = cJSON_GetArrayItem(dnodes, i); cJSON *node = cJSON_GetArrayItem(dnodes, i);
if (node == NULL) break; if (node == NULL) break;
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; SDnodeEp dnodeEp = {0};
cJSON *did = cJSON_GetObjectItem(node, "id"); cJSON *did = cJSON_GetObjectItem(node, "id");
if (!did || did->type != cJSON_Number) { if (!did || did->type != cJSON_Number) {
@ -257,14 +252,14 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->id = dnodeId->valueint; dnodeEp.id = dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s since dnodeFqdn not found", pMgmt->file); dError("failed to read %s since dnodeFqdn not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
tstrncpy(pDnodeEp->ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) { if (!dnodePort || dnodePort->type != cJSON_Number) {
@ -272,14 +267,16 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->ep.port = dnodePort->valueint; dnodeEp.ep.port = dnodePort->valueint;
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) { if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s since isMnode not found", pMgmt->file); dError("failed to read %s since isMnode not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->isMnode = isMnode->valueint; dnodeEp.isMnode = isMnode->valueint;
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
} }
code = 0; code = 0;
@ -296,15 +293,14 @@ PRASE_DNODE_OVER:
return -1; return -1;
} }
if (pMgmt->dnodeEps == NULL) { if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) {
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); SDnodeEp dnodeEp = {0};
pMgmt->dnodeEps->num = 1; dnodeEp.isMnode = 1;
pMgmt->dnodeEps->eps[0].isMnode = 1; taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep);
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &(pMgmt->dnodeEps->eps[0].ep));
} }
dndResetDnodes(pDnode, pMgmt->dnodeEps); dndResetDnodes(pDnode, pMgmt->pDnodeEps);
terrno = 0; terrno = 0;
return 0; return 0;
@ -329,13 +325,15 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
for (int32_t i = 0; i < numOfEps; ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i);
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port);
len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode);
if (i < pMgmt->dnodeEps->num - 1) { if (i < numOfEps - 1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else { } else {
len += snprintf(content + len, maxLen - len, " }]\n"); len += snprintf(content + len, maxLen - len, " }]\n");
@ -355,40 +353,39 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
} }
void dndSendStatusReq(SDnode *pDnode) { void dndSendStatusReq(SDnode *pDnode) {
int32_t contLen = sizeof(SStatusReq) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SStatusReq req = {0};
SStatusReq *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
dError("failed to malloc status message");
return;
}
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->env.sver); req.sver = pDnode->env.sver;
pStatus->dver = htobe64(pMgmt->dver); req.dver = pMgmt->dver;
pStatus->dnodeId = htonl(pMgmt->dnodeId); req.dnodeId = pMgmt->dnodeId;
pStatus->clusterId = htobe64(pMgmt->clusterId); req.clusterId = pMgmt->clusterId;
pStatus->rebootTime = htobe64(pMgmt->rebootTime); req.rebootTime = pMgmt->rebootTime;
pStatus->updateTime = htobe64(pMgmt->updateTime); req.updateTime = pMgmt->updateTime;
pStatus->numOfCores = htonl(pDnode->env.numOfCores); req.numOfCores = pDnode->env.numOfCores;
pStatus->numOfSupportVnodes = htonl(pDnode->cfg.numOfSupportVnodes); req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes;
tstrncpy(pStatus->dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->cfg.statusInterval); req.clusterCfg.statusInterval = pDnode->cfg.statusInterval;
pStatus->clusterCfg.checkTime = 0; req.clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); memcpy(req.clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN); memcpy(req.clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN); memcpy(req.clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
contLen = sizeof(SStatusReq) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); dndGetVnodeLoads(pDnode, req.pVloads);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; int32_t contLen = tSerializeSStatusReq(NULL, &req);
void *pHead = rpcMallocCont(contLen);
void *pBuf = pHead;
tSerializeSStatusReq(&pBuf, &req);
taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status req to mnode", pDnode); dTrace("pDnode:%p, send status req to mnode", pDnode);
@ -407,18 +404,20 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
} }
} }
static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { static void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) {
if (pDnodeEps == NULL || pDnodeEps->num <= 0) return; int32_t numOfEps = taosArrayGetSize(pDnodeEps);
if (numOfEps <= 0) return;
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
if (pDnodeEps->num != pMgmt->dnodeEps->num) { int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
if (numOfEps != numOfEpsOld) {
dndResetDnodes(pDnode, pDnodeEps); dndResetDnodes(pDnode, pDnodeEps);
dndWriteDnodes(pDnode); dndWriteDnodes(pDnode);
} else { } else {
int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps); int32_t size = numOfEps * sizeof(SDnodeEp);
if (memcmp(pMgmt->dnodeEps, pDnodeEps, size) != 0) { if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) {
dndResetDnodes(pDnode, pDnodeEps); dndResetDnodes(pDnode, pDnodeEps);
dndWriteDnodes(pDnode); dndWriteDnodes(pDnode);
} }
@ -431,33 +430,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pRsp->code != TSDB_CODE_SUCCESS) { if (pRsp->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0;
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1; pMgmt->dropped = 1;
dndWriteDnodes(pDnode); dndWriteDnodes(pDnode);
} }
return; } else {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, &statusRsp) != NULL) {
pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
}
taosArrayDestroy(statusRsp.pDnodeEps);
} }
if (pRsp->pCont != NULL && pRsp->contLen != 0) {
SStatusRsp *pStatus = pRsp->pCont;
pMgmt->dver = htobe64(pStatus->dver);
SDnodeCfg *pCfg = &pStatus->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg);
SDnodeEps *pDnodeEps = &pStatus->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
pDnodeEps->eps[i].ep.port = htons(pDnodeEps->eps[i].ep.port);
}
dndUpdateDnodeEps(pDnode, pDnodeEps);
}
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
} }
@ -572,9 +559,9 @@ void dndCleanupMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
if (pMgmt->dnodeEps != NULL) { if (pMgmt->pDnodeEps != NULL) {
free(pMgmt->dnodeEps); taosArrayDestroy(pMgmt->pDnodeEps);
pMgmt->dnodeEps = NULL; pMgmt->pDnodeEps = NULL;
} }
if (pMgmt->dnodeHash != NULL) { if (pMgmt->dnodeHash != NULL) {

View File

@ -1008,11 +1008,10 @@ void dndCleanupVnodes(SDnode *pDnode) {
dInfo("dnode-vnodes is cleaned up"); dInfo("dnode-vnodes is cleaned up");
} }
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pLoads->num = taosHashGetSize(pMgmt->hash);
int32_t v = 0; int32_t v = 0;
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
@ -1021,14 +1020,9 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
if (ppVnode == NULL || *ppVnode == NULL) continue; if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
SVnodeLoad *pLoad = &pLoads->data[v++]; SVnodeLoad vload = {0};
vnodeGetLoad(pVnode->pImpl, &vload);
vnodeGetLoad(pVnode->pImpl, pLoad); taosArrayPush(pLoads, &vload);
pLoad->vgId = htonl(pLoad->vgId);
pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
pLoad->tablesNum = htobe64(pLoad->tablesNum);
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} }

View File

@ -244,7 +244,7 @@ bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
return true; return true;
} }
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) { static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfEps = 0; int32_t numOfEps = 0;
@ -253,25 +253,20 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break; if (pIter == NULL) break;
if (numOfEps >= maxEps) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pDnode);
break;
}
SDnodeEp *pEp = &pEps->eps[numOfEps]; SDnodeEp dnodeEp = {0};
pEp->id = htonl(pDnode->id); dnodeEp.id = pDnode->id;
pEp->ep.port = htons(pDnode->port); dnodeEp.isMnode = 0;
memcpy(pEp->ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); dnodeEp.ep.port = pDnode->port;
pEp->isMnode = 0; memcpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
if (mndIsMnode(pMnode, pDnode->id)) { if (mndIsMnode(pMnode, pDnode->id)) {
pEp->isMnode = 1; dnodeEp.isMnode = 1;
}
numOfEps++;
sdbRelease(pSdb, pDnode);
} }
pEps->num = htonl(numOfEps); sdbRelease(pSdb, pDnode);
taosArrayPush(pDnodeEps, &dnodeEp);
}
} }
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
@ -299,50 +294,29 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
return 0; return 0;
} }
static void mndParseStatusMsg(SStatusReq *pStatus) {
pStatus->sver = htonl(pStatus->sver);
pStatus->dver = htobe64(pStatus->dver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->updateTime = htobe64(pStatus->updateTime);
pStatus->numOfCores = htonl(pStatus->numOfCores);
pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
for (int32_t v = 0; v < pStatus->vnodeLoads.num; ++v) {
SVnodeLoad *pVload = &pStatus->vnodeLoads.data[v];
pVload->vgId = htonl(pVload->vgId);
pVload->totalStorage = htobe64(pVload->totalStorage);
pVload->compStorage = htobe64(pVload->compStorage);
pVload->pointsWritten = htobe64(pVload->pointsWritten);
pVload->tablesNum = htobe64(pVload->tablesNum);
}
}
static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SStatusReq *pStatus = pReq->rpcMsg.pCont; SStatusReq statusReq = {0};
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
int32_t code = -1; int32_t code = -1;
mndParseStatusMsg(pStatus); if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, &statusReq) == NULL) goto PROCESS_STATUS_MSG_OVER;
if (pStatus->dnodeId == 0) { if (statusReq.dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode == NULL) { if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", pStatus->dnodeEp); mDebug("dnode:%s, not created yet", statusReq.dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
} else { } else {
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
} }
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); mError("dnode:%d, %s not exist", statusReq.dnodeId, statusReq.dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
@ -350,28 +324,28 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool dnodeChanged = (pStatus->dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
bool reboot = (pDnode->rebootTime != pStatus->rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot; bool needCheck = !online || dnodeChanged || reboot;
if (needCheck) { if (needCheck) {
if (pStatus->sver != pMnode->cfg.sver) { if (statusReq.sver != pMnode->cfg.sver) {
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
} }
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, pMnode->cfg.sver);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
if (pStatus->dnodeId == 0) { if (statusReq.dnodeId == 0) {
mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else { } else {
if (pStatus->clusterId != pMnode->clusterId) { if (statusReq.clusterId != pMnode->clusterId) {
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
} }
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
pMnode->clusterId); pMnode->clusterId);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
@ -382,7 +356,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
} }
// Verify whether the cluster parameters are consistent when status change from offline to ready // Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); int32_t ret = mndCheckClusterCfgPara(pMnode, &statusReq.clusterCfg);
if (0 != ret) { if (0 != ret) {
pDnode->offlineReason = ret; pDnode->offlineReason = ret;
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
@ -396,25 +370,30 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
mDebug("dnode:%d, send dnode eps", pDnode->id); mDebug("dnode:%d, send dnode eps", pDnode->id);
} }
pDnode->rebootTime = pStatus->rebootTime; pDnode->rebootTime = statusReq.rebootTime;
pDnode->numOfCores = pStatus->numOfCores; pDnode->numOfCores = statusReq.numOfCores;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
int32_t numOfEps = mndGetDnodeSize(pMnode); SStatusRsp statusRsp = {0};
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); statusRsp.dver = sdbGetTableVer(pMnode->pSdb, SDB_DNODE);
SStatusRsp *pRsp = rpcMallocCont(contLen); statusRsp.dnodeCfg.dnodeId = pDnode->id;
if (pRsp == NULL) { statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
if (statusRsp.pDnodeEps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
pRsp->dver = htobe64(sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); mndGetDnodeData(pMnode, statusRsp.pDnodeEps);
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); int32_t contLen = tSerializeSStatusRsp(NULL, &statusRsp);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); void *pHead = rpcMallocCont(contLen);
void *pBuf = pHead;
tSerializeSStatusRsp(&pBuf, &statusRsp);
taosArrayDestroy(statusRsp.pDnodeEps);
pReq->contLen = contLen; pReq->contLen = contLen;
pReq->pCont = pRsp; pReq->pCont = pHead;
} }
pDnode->lastAccessTime = curMs; pDnode->lastAccessTime = curMs;
@ -422,6 +401,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
PROCESS_STATUS_MSG_OVER: PROCESS_STATUS_MSG_OVER:
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
taosArrayDestroy(statusReq.pVloads);
return code; return code;
} }

View File

@ -592,8 +592,8 @@ CREATE_STB_OVER:
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseStb(pMnode, pTopicStb); mndReleaseStb(pMnode, pTopicStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
taosArrayClear(createReq.pColumns); taosArrayDestroy(createReq.pColumns);
taosArrayClear(createReq.pTags); taosArrayDestroy(createReq.pTags);
return code; return code;
} }
@ -1049,7 +1049,7 @@ ALTER_STB_OVER:
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
taosArrayClear(alterReq.pFields); taosArrayDestroy(alterReq.pFields);
return code; return code;
} }

View File

@ -1383,7 +1383,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo * pRpc = pContext->pRpc; SRpcInfo * pRpc = pContext->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
if (pRpc == NULL) { if (pRpc == NULL) {
return; return;