add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-06 17:22:30 +08:00
parent b5d40d01bb
commit ac8220012c
7 changed files with 36 additions and 18 deletions

View File

@ -1469,6 +1469,7 @@ typedef struct {
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq; int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusReq; } SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
@ -1499,6 +1500,7 @@ typedef struct {
SDnodeCfg dnodeCfg; SDnodeCfg dnodeCfg;
SArray* pDnodeEps; // Array of SDnodeEp SArray* pDnodeEps; // Array of SDnodeEp
int32_t statusSeq; int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusRsp; } SStatusRsp;
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);

View File

@ -1116,6 +1116,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -1226,6 +1228,9 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1;
} }
} }
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
@ -1258,6 +1263,8 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
} }
if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1; if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -1300,6 +1307,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
} }
if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->ipWhiteVer) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;

View File

@ -23,22 +23,23 @@ extern "C" {
#endif #endif
typedef struct SDnodeMgmt { typedef struct SDnodeMgmt {
SDnodeData *pData; SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
TdThread statusThread; TdThread statusThread;
TdThread monitorThread; TdThread monitorThread;
TdThread crashReportThread; TdThread crashReportThread;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq; int32_t statusSeq;
int64_t ipWhiteVer;
} SDnodeMgmt; } SDnodeMgmt;
// dmHandle.c // dmHandle.c

View File

@ -55,6 +55,10 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
} }
if (pMgmt->ipWhiteVer != statusRsp.ipWhiteVer) {
//
}
} }
tFreeSStatusRsp(&statusRsp); tFreeSStatusRsp(&statusRsp);
} }

View File

@ -38,8 +38,10 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int3
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew); int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew);
void mndUserFreeObj(SUserObj *pUser); void mndUserFreeObj(SUserObj *pUser);
int64_t mndGetIpWhiteVer(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -645,6 +645,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps); mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
statusRsp.ipWhiteVer = mndGetIpWhiteVer(pMnode);
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp); int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);

View File

@ -113,7 +113,7 @@ void ipWhiteMgtUpdate2(SMnode *pMnode) {
taosThreadRwlockUnlock(&ipWhiteMgt.rw); taosThreadRwlockUnlock(&ipWhiteMgt.rw);
} }
int64_t ipWhiteMgtGetVer(SMnode *pMnode) { int64_t mndGetIpWhiteVer(SMnode *pMnode) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw); taosThreadRwlockWrlock(&ipWhiteMgt.rw);
int64_t ver = ipWhiteMgt.ver; int64_t ver = ipWhiteMgt.ver;
if (ver == 0) { if (ver == 0) {