add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-11 10:45:45 +08:00
parent 30a60be214
commit 2a30e12547
5 changed files with 18 additions and 14 deletions

View File

@ -1485,6 +1485,7 @@ typedef struct {
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq; int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusReq; } SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);

View File

@ -1117,6 +1117,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -1227,6 +1228,9 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1;
} }
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -1304,7 +1308,9 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
} }
if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pRsp->ipWhiteVer) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->ipWhiteVer) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;

View File

@ -136,6 +136,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
pMgmt->statusSeq++; pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq; req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->ipWhiteVer;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);

View File

@ -527,7 +527,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
} }
} }
int64_t ver = mndGetIpWhiteVer(pMnode);
pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
@ -535,9 +536,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes; bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || pMnode->ipWhiteVer != ver; bool needCheck =
!online || dnodeChanged || reboot || supportVnodesChanged || pMnode->ipWhiteVer != statusReq.ipWhiteVer;
pMnode->ipWhiteVer = ver;
const STraceId *trace = &pReq->info.traceId; const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
@ -658,7 +658,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps); mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
statusRsp.ipWhiteVer = ver; statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp); int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);

View File

@ -66,7 +66,7 @@ static SIpWhiteMgt ipWhiteMgt;
void ipWhiteMgtInit() { void ipWhiteMgtInit() {
ipWhiteMgt.pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK); ipWhiteMgt.pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
ipWhiteMgt.ver = 0; ipWhiteMgt.ver = taosGetTimestampMs();
taosThreadRwlockInit(&ipWhiteMgt.rw, NULL); taosThreadRwlockInit(&ipWhiteMgt.rw, NULL);
} }
void ipWhiteMgtCleanup() { void ipWhiteMgtCleanup() {
@ -1194,13 +1194,9 @@ int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq) {
void *pRsp = rpcMallocCont(len); void *pRsp = rpcMallocCont(len);
tSerializeSUpdateIpWhite(pRsp, len, &ipWhite); tSerializeSUpdateIpWhite(pRsp, len, &ipWhite);
if (req.ipWhiteVer == 0) {
pReq->info.rsp = pRsp; pReq->info.rsp = pRsp;
pReq->info.rspLen = len; pReq->info.rspLen = len;
} else { //}
pReq->info.rsp = pRsp;
pReq->info.rspLen = len;
}
tFreeSUpdateIpWhiteReq(&ipWhite); tFreeSUpdateIpWhiteReq(&ipWhite);
return 0; return 0;