diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70fa864853..406e5fcda7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -909,6 +909,13 @@ int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +typedef struct { + int64_t ipWhiteVer; +} SRetrieveIpWhiteReq; + +int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); +int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq); + typedef struct { int8_t alterType; int8_t superUser; @@ -1469,7 +1476,6 @@ typedef struct { SClusterCfg clusterCfg; SArray* pVloads; // array of SVnodeLoad int32_t statusSeq; - int64_t ipWhiteVer; } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fb2c780724..6e3893ce42 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -177,6 +177,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve_ip_white", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6957c70706..8ab9b5624d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1117,7 +1117,6 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, reserved) < 0) return -1; } - if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1229,8 +1228,6 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } } - if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1; - tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1487,6 +1484,31 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { // impl later return; } +int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) { + return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} +int32_t tDeserializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + // impl later + if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1; + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) { SEncoder encoder = {0}; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ef474f0cde..f854fe9cac 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -30,7 +30,30 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { taosThreadRwlockUnlock(&pMgmt->pData->lock); } } +static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { + if (pMgmt->ipWhiteVer == ver) { + return; + } + int64_t oldVer = pMgmt->ipWhiteVer; + pMgmt->ipWhiteVer = ver; + SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer}; + int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req); + void *pHead = rpcMallocCont(contLen); + tSerializeRetrieveIpWhite(pHead, contLen, &req); + + SRpcMsg rpcMsg = {.pCont = pHead, + .contLen = contLen, + .msgType = TDMT_MND_RETRIEVE_IP_WHITE, + .info.ahandle = (void *)0x9527, + .info.refId = 0, + .info.noResp = 0}; + SEpSet epset = {0}; + + dmGetMnodeEpSet(pMgmt->pData, &epset); + + rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL); +} static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { const STraceId *trace = &pRsp->info.traceId; dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); @@ -55,10 +78,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); } - - if (pMgmt->ipWhiteVer != statusRsp.ipWhiteVer) { - // - } + dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer); } tFreeSStatusRsp(&statusRsp); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 09783a5ea9..ab3852cd0c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -55,6 +55,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; + pMgmt->ipWhiteVer = -1; if (dmStartWorker(pMgmt) != 0) { return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 4c43326959..6bbe75c254 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -161,6 +161,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 0c89b12734..0770f4a570 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -55,6 +55,7 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter); SHashObj *mndFetchAllIpWhite(SMnode *pMnode); +static int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq); typedef struct { SHashObj *pIpWhiteList; @@ -124,8 +125,34 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) { taosThreadRwlockUnlock(&ipWhiteMgt.rw); return ver; } +int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { + taosThreadRwlockWrlock(&ipWhiteMgt.rw); + int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteList); + pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite)); + void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, NULL); + int32_t i = 0; + while (pIter) { + SUpdateUserIpWhite *pUser = &pUpdate->pUserIpWhite[i]; + SIpWhiteList *list = *(SIpWhiteList **)pIter; -// int64_t ipWhiteMgt + size_t klen; + char *key = taosHashGetKey(pIter, &klen); + if (list->num != 0) { + taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter); + memcpy(pUser->user, key, klen); + + pUser->numOfRange = list->num; + pUser->pIpRanges = taosMemoryCalloc(1, list->num * sizeof(SIpV4Range)); + memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range)); + i++; + } + taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter); + } + pUpdate->numOfUser = i; + + taosThreadRwlockUnlock(&ipWhiteMgt.rw); + return 0; +} void destroyIpWhiteTab(SHashObj *pIpWhiteTab) { if (pIpWhiteTab == NULL) return; @@ -174,6 +201,7 @@ int32_t mndInitUser(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ALTER_USER, mndProcessAlterUserReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_USER, mndProcessDropUserReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_AUTH, mndProcessGetUserAuthReq); + mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_IP_WHITE, mndProcesSRetrieveIpWhiteReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser); @@ -970,6 +998,34 @@ _OVER: tFreeSCreateUserReq(&createReq); return code; } +int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq) { + // impl later + SRetrieveIpWhiteReq req = {0}; + if (tDeserializeRetrieveIpWhite(pReq->pCont, pReq->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + SUpdateIpWhite ipWhite; + + int32_t len = tSerializeSUpdateIpWhite(NULL, 0, &ipWhite); + + void *pRsp = rpcMallocCont(len); + tSerializeSUpdateIpWhite(pRsp, len, &ipWhite); + + if (req.ipWhiteVer == 0) { + pReq->info.rsp = pRsp; + pReq->info.rspLen = len; + } else { + pReq->info.rsp = pRsp; + pReq->info.rspLen = len; + } + tFreeSUpdateIpWhiteReq(&ipWhite); + + return 0; +_OVER: + return -1; +} static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpcMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "alter-user");