add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-06 21:36:16 +08:00
parent ac8220012c
commit ad5639df89
7 changed files with 117 additions and 9 deletions

View File

@ -909,6 +909,13 @@ int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
typedef struct {
int64_t ipWhiteVer;
} SRetrieveIpWhiteReq;
int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
typedef struct {
int8_t alterType;
int8_t superUser;
@ -1469,7 +1476,6 @@ typedef struct {
SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);

View File

@ -177,6 +177,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve_ip_white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -1117,7 +1117,6 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, reserved) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1229,8 +1228,6 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
}
}
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -1487,6 +1484,31 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
// impl later
return;
}
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) {
return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
// impl later
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) {
SEncoder encoder = {0};

View File

@ -30,7 +30,30 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
}
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
if (pMgmt->ipWhiteVer == ver) {
return;
}
int64_t oldVer = pMgmt->ipWhiteVer;
pMgmt->ipWhiteVer = ver;
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeRetrieveIpWhite(pHead, contLen, &req);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 0};
SEpSet epset = {0};
dmGetMnodeEpSet(pMgmt->pData, &epset);
rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
}
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId;
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
@ -55,10 +78,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
}
if (pMgmt->ipWhiteVer != statusRsp.ipWhiteVer) {
//
}
dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
}
tFreeSStatusRsp(&statusRsp);
}

View File

@ -55,6 +55,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
pMgmt->ipWhiteVer = -1;
if (dmStartWorker(pMgmt) != 0) {
return -1;
}

View File

@ -161,6 +161,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -55,6 +55,7 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter);
SHashObj *mndFetchAllIpWhite(SMnode *pMnode);
static int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq);
typedef struct {
SHashObj *pIpWhiteList;
@ -124,8 +125,34 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) {
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return ver;
}
int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteList);
pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite));
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, NULL);
int32_t i = 0;
while (pIter) {
SUpdateUserIpWhite *pUser = &pUpdate->pUserIpWhite[i];
SIpWhiteList *list = *(SIpWhiteList **)pIter;
// int64_t ipWhiteMgt
size_t klen;
char *key = taosHashGetKey(pIter, &klen);
if (list->num != 0) {
taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter);
memcpy(pUser->user, key, klen);
pUser->numOfRange = list->num;
pUser->pIpRanges = taosMemoryCalloc(1, list->num * sizeof(SIpV4Range));
memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range));
i++;
}
taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter);
}
pUpdate->numOfUser = i;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return 0;
}
void destroyIpWhiteTab(SHashObj *pIpWhiteTab) {
if (pIpWhiteTab == NULL) return;
@ -174,6 +201,7 @@ int32_t mndInitUser(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_USER, mndProcessAlterUserReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_USER, mndProcessDropUserReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_AUTH, mndProcessGetUserAuthReq);
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_IP_WHITE, mndProcesSRetrieveIpWhiteReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser);
@ -970,6 +998,34 @@ _OVER:
tFreeSCreateUserReq(&createReq);
return code;
}
int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq) {
// impl later
SRetrieveIpWhiteReq req = {0};
if (tDeserializeRetrieveIpWhite(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
SUpdateIpWhite ipWhite;
int32_t len = tSerializeSUpdateIpWhite(NULL, 0, &ipWhite);
void *pRsp = rpcMallocCont(len);
tSerializeSUpdateIpWhite(pRsp, len, &ipWhite);
if (req.ipWhiteVer == 0) {
pReq->info.rsp = pRsp;
pReq->info.rspLen = len;
} else {
pReq->info.rsp = pRsp;
pReq->info.rspLen = len;
}
tFreeSUpdateIpWhiteReq(&ipWhite);
return 0;
_OVER:
return -1;
}
static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "alter-user");