From b5d40d01bb96c766909e5604618271850d7ba41c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 6 Sep 2023 16:24:29 +0800 Subject: [PATCH] add rpc update interface --- include/common/tmsg.h | 14 +++ source/common/src/tmsg.c | 60 +++++++++++ source/dnode/mnode/impl/src/mndUser.c | 137 ++++++++++++++++++++++++-- source/libs/transport/src/transSvr.c | 2 +- 4 files changed, 206 insertions(+), 7 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb843ced91..e5ec7f48c3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -895,6 +895,20 @@ int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq); void tFreeSCreateUserReq(SCreateUserReq* pReq); +typedef struct { + char user[TSDB_USER_LEN]; + int32_t numOfRange; + SIpV4Range* pIpRanges; +} SUpdateUserIpWhite; +typedef struct { + int numOfUser; + SUpdateUserIpWhite* pUserIpWhite; +} SUpdateIpWhite; + +int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); + typedef struct { int8_t alterType; int8_t superUser; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0553f73bb3..e581a2d5df 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1420,6 +1420,66 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR void tFreeSCreateUserReq(SCreateUserReq *pReq) { taosMemoryFree(pReq->pIpRanges); } +int32_t tSerializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) { + // impl later + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1; + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]); + if (tEncodeCStr(&encoder, pUser->user) < 0) return -1; + if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1; + for (int j = 0; j < pUser->numOfRange; j++) { + SIpV4Range *pRange = &pUser->pIpRanges[j]; + if (tEncodeU32(&encoder, pRange->ip) < 0) return -1; + if (tEncodeU32(&encoder, pRange->mask) < 0) return -1; + } + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} +int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + // impl later + if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1; + + pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; + if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1; + if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1; + + pUserWhite->pIpRanges = taosMemoryCalloc(1, pUserWhite->numOfRange * sizeof(SIpV4Range)); + for (int j = 0; j < pUserWhite->numOfRange; j++) { + SIpV4Range *pRange = &pUserWhite->pIpRanges[j]; + if (tDecodeU32(&decoder, &pRange->ip) < 0) return -1; + if (tDecodeU32(&decoder, &pRange->mask) < 0) return -1; + } + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} +void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; + taosMemoryFree(pUserWhite->pIpRanges); + } + taosMemoryFree(pReq->pUserIpWhite); + // impl later + return; +} + int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 5a40fb1e30..1a0134ea46 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -31,6 +31,15 @@ #define USER_VER_NUMBER 5 #define USER_RESERVE_SIZE 64 +static SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList); +static SIpWhiteList *createDefaultIpWhiteList(); +SIpWhiteList *createIpWhiteList(void *buf, int32_t len); +static bool updateIpWhiteList(SIpWhiteList *pOld, SIpWhiteList *pNew); +static bool isIpWhiteListEqual(SIpWhiteList *a, SIpWhiteList *b); +static bool isIpRangeEqual(SIpV4Range *a, SIpV4Range *b); + +void destroyIpWhiteTab(SHashObj *pIpWhiteTab); + static int32_t mndCreateDefaultUsers(SMnode *pMnode); static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw); static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser); @@ -45,8 +54,111 @@ static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter); +SHashObj *mndFetchAllIpWhite(SMnode *pMnode); + +typedef struct { + SHashObj *pIpWhiteList; + int64_t ver; + TdThreadRwlock rw; +} SIpWhiteMgt; + +static SIpWhiteMgt ipWhiteMgt; + +void ipWhiteMgtInit() { + ipWhiteMgt.pIpWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK); + ipWhiteMgt.ver = 0; + taosThreadRwlockInit(&ipWhiteMgt.rw, NULL); +} +void ipWhiteMgtCleanup() { + destroyIpWhiteTab(ipWhiteMgt.pIpWhiteList); + taosThreadRwlockDestroy(&ipWhiteMgt.rw); +} + +int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) { + bool update = true; + taosThreadRwlockWrlock(&ipWhiteMgt.rw); + SIpWhiteList **ppList = taosHashGet(ipWhiteMgt.pIpWhiteList, user, strlen(user)); + if (ppList == NULL || *ppList == NULL) { + SIpWhiteList *p = cloneIpWhiteList(pNew); + taosHashPut(ipWhiteMgt.pIpWhiteList, user, strlen(user), &p, sizeof(void *)); + } else { + SIpWhiteList *pOld = *ppList; + if (isIpWhiteListEqual(pOld, pNew)) { + update = false; + } else { + taosMemoryFree(pOld); + SIpWhiteList *p = cloneIpWhiteList(pNew); + taosHashPut(ipWhiteMgt.pIpWhiteList, user, strlen(user), &p, sizeof(void *)); + } + } + if (update) ipWhiteMgt.ver++; + + taosThreadRwlockUnlock(&ipWhiteMgt.rw); + return 0; +} + +void ipWhiteMgtUpdateAll(SMnode *pMnode) { + ipWhiteMgt.ver++; + SHashObj *pNew = mndFetchAllIpWhite(pMnode); + SHashObj *pOld = ipWhiteMgt.pIpWhiteList; + + ipWhiteMgt.pIpWhiteList = pNew; + destroyIpWhiteTab(pOld); +} +void ipWhiteMgtUpdate2(SMnode *pMnode) { + taosThreadRwlockWrlock(&ipWhiteMgt.rw); + + ipWhiteMgtUpdateAll(pMnode); + + taosThreadRwlockUnlock(&ipWhiteMgt.rw); +} + +int64_t ipWhiteMgtGetVer(SMnode *pMnode) { + taosThreadRwlockWrlock(&ipWhiteMgt.rw); + int64_t ver = ipWhiteMgt.ver; + if (ver == 0) { + ipWhiteMgtUpdateAll(pMnode); + } + + ver = ipWhiteMgt.ver; + taosThreadRwlockUnlock(&ipWhiteMgt.rw); + return ver; +} + +// int64_t ipWhiteMgt + +void destroyIpWhiteTab(SHashObj *pIpWhiteTab) { + if (pIpWhiteTab == NULL) return; + + void *pIter = taosHashIterate(pIpWhiteTab, NULL); + while (pIter) { + SIpWhiteList *list = *(SIpWhiteList **)pIter; + taosMemoryFree(list); + pIter = taosHashIterate(pIpWhiteTab, NULL); + } + + taosHashCleanup(pIpWhiteTab); +} +SHashObj *mndFetchAllIpWhite(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SHashObj *pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK); + while (1) { + SUserObj *pUser = NULL; + pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser); + if (pIter == NULL) break; + + SIpWhiteList *pWhiteList = cloneIpWhiteList(pUser->pIpWhiteList); + taosHashPut(pIpWhiteTab, pUser->user, strlen(pUser->user), &pWhiteList, sizeof(void *)); + + sdbRelease(pSdb, pUser); + } + return pIpWhiteTab; +} int32_t mndInitUser(SMnode *pMnode) { + ipWhiteMgtInit(); + SSdbTable table = { .sdbType = SDB_USER, .keyType = SDB_KEY_BINARY, @@ -70,7 +182,7 @@ int32_t mndInitUser(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupUser(SMnode *pMnode) {} +void mndCleanupUser(SMnode *pMnode) { ipWhiteMgtCleanup(); } static void ipRangeToStr(SIpV4Range *range, char *buf) { struct in_addr addr; @@ -91,21 +203,34 @@ static void ipRangeListToStr(SIpV4Range *range, int32_t num, char *buf) { } if (len > 0) buf[len - 1] = 0; } + +static bool isIpRangeEqual(SIpV4Range *a, SIpV4Range *b) { + // equal or not + return a->ip == b->ip && a->mask == b->mask; +} static bool isRangeInIpWhiteList(SIpWhiteList *pList, SIpV4Range *tgt) { for (int i = 0; i < pList->num; i++) { - SIpV4Range *el = &pList->pIpRange[i]; - if (tgt->ip == el->ip && tgt->mask == el->mask) { - return true; - } + if (isIpRangeEqual(&pList->pIpRange[i], tgt)) return true; } return false; } +static bool isIpWhiteListEqual(SIpWhiteList *a, SIpWhiteList *b) { + if (a->num != b->num) { + return false; + } + for (int i = 0; i < a->num; i++) { + if (!isIpRangeEqual(&a->pIpRange[i], &b->pIpRange[i])) { + return false; + } + } + return true; +} int32_t convertIpWhiteListToStr(SIpWhiteList *pList, char **buf) { if (pList->num == 0) { *buf = NULL; return 0; } - *buf = taosMemoryCalloc(1, pList->num * 36 + 4); + *buf = taosMemoryCalloc(1, pList->num * (sizeof(SIpWhiteList) + 4) + 4); ipRangeListToStr(pList->pIpRange, pList->num, *buf); return strlen(*buf); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 6caeb98d4c..baaca9d35d 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -580,7 +580,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { tTrace("success to dispatch conn to work thread"); } else { - tError("fail to dispatch conn to work thread"); + tError("fail to dispatch conn to work thread, reason:%s", uv_strerror(status)); } if (!uv_is_closing((uv_handle_t*)req->data)) { uv_close((uv_handle_t*)req->data, uvFreeCb);