add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-06 16:24:29 +08:00
parent 393ef21348
commit b5d40d01bb
4 changed files with 206 additions and 7 deletions

View File

@ -895,6 +895,20 @@ int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
void tFreeSCreateUserReq(SCreateUserReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
int32_t numOfRange;
SIpV4Range* pIpRanges;
} SUpdateUserIpWhite;
typedef struct {
int numOfUser;
SUpdateUserIpWhite* pUserIpWhite;
} SUpdateIpWhite;
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
typedef struct {
int8_t alterType;
int8_t superUser;

View File

@ -1420,6 +1420,66 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR
void tFreeSCreateUserReq(SCreateUserReq *pReq) { taosMemoryFree(pReq->pIpRanges); }
int32_t tSerializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) {
// impl later
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1;
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]);
if (tEncodeCStr(&encoder, pUser->user) < 0) return -1;
if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1;
for (int j = 0; j < pUser->numOfRange; j++) {
SIpV4Range *pRange = &pUser->pIpRanges[j];
if (tEncodeU32(&encoder, pRange->ip) < 0) return -1;
if (tEncodeU32(&encoder, pRange->mask) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
// impl later
if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1;
pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1;
if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1;
pUserWhite->pIpRanges = taosMemoryCalloc(1, pUserWhite->numOfRange * sizeof(SIpV4Range));
for (int j = 0; j < pUserWhite->numOfRange; j++) {
SIpV4Range *pRange = &pUserWhite->pIpRanges[j];
if (tDecodeU32(&decoder, &pRange->ip) < 0) return -1;
if (tDecodeU32(&decoder, &pRange->mask) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
taosMemoryFree(pUserWhite->pIpRanges);
}
taosMemoryFree(pReq->pUserIpWhite);
// impl later
return;
}
int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -31,6 +31,15 @@
#define USER_VER_NUMBER 5
#define USER_RESERVE_SIZE 64
static SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList);
static SIpWhiteList *createDefaultIpWhiteList();
SIpWhiteList *createIpWhiteList(void *buf, int32_t len);
static bool updateIpWhiteList(SIpWhiteList *pOld, SIpWhiteList *pNew);
static bool isIpWhiteListEqual(SIpWhiteList *a, SIpWhiteList *b);
static bool isIpRangeEqual(SIpV4Range *a, SIpV4Range *b);
void destroyIpWhiteTab(SHashObj *pIpWhiteTab);
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw);
static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser);
@ -45,8 +54,111 @@ static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter);
SHashObj *mndFetchAllIpWhite(SMnode *pMnode);
typedef struct {
SHashObj *pIpWhiteList;
int64_t ver;
TdThreadRwlock rw;
} SIpWhiteMgt;
static SIpWhiteMgt ipWhiteMgt;
void ipWhiteMgtInit() {
ipWhiteMgt.pIpWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
ipWhiteMgt.ver = 0;
taosThreadRwlockInit(&ipWhiteMgt.rw, NULL);
}
void ipWhiteMgtCleanup() {
destroyIpWhiteTab(ipWhiteMgt.pIpWhiteList);
taosThreadRwlockDestroy(&ipWhiteMgt.rw);
}
int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) {
bool update = true;
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
SIpWhiteList **ppList = taosHashGet(ipWhiteMgt.pIpWhiteList, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
SIpWhiteList *p = cloneIpWhiteList(pNew);
taosHashPut(ipWhiteMgt.pIpWhiteList, user, strlen(user), &p, sizeof(void *));
} else {
SIpWhiteList *pOld = *ppList;
if (isIpWhiteListEqual(pOld, pNew)) {
update = false;
} else {
taosMemoryFree(pOld);
SIpWhiteList *p = cloneIpWhiteList(pNew);
taosHashPut(ipWhiteMgt.pIpWhiteList, user, strlen(user), &p, sizeof(void *));
}
}
if (update) ipWhiteMgt.ver++;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return 0;
}
void ipWhiteMgtUpdateAll(SMnode *pMnode) {
ipWhiteMgt.ver++;
SHashObj *pNew = mndFetchAllIpWhite(pMnode);
SHashObj *pOld = ipWhiteMgt.pIpWhiteList;
ipWhiteMgt.pIpWhiteList = pNew;
destroyIpWhiteTab(pOld);
}
void ipWhiteMgtUpdate2(SMnode *pMnode) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
ipWhiteMgtUpdateAll(pMnode);
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
}
int64_t ipWhiteMgtGetVer(SMnode *pMnode) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
int64_t ver = ipWhiteMgt.ver;
if (ver == 0) {
ipWhiteMgtUpdateAll(pMnode);
}
ver = ipWhiteMgt.ver;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return ver;
}
// int64_t ipWhiteMgt
void destroyIpWhiteTab(SHashObj *pIpWhiteTab) {
if (pIpWhiteTab == NULL) return;
void *pIter = taosHashIterate(pIpWhiteTab, NULL);
while (pIter) {
SIpWhiteList *list = *(SIpWhiteList **)pIter;
taosMemoryFree(list);
pIter = taosHashIterate(pIpWhiteTab, NULL);
}
taosHashCleanup(pIpWhiteTab);
}
SHashObj *mndFetchAllIpWhite(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SHashObj *pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
while (1) {
SUserObj *pUser = NULL;
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
if (pIter == NULL) break;
SIpWhiteList *pWhiteList = cloneIpWhiteList(pUser->pIpWhiteList);
taosHashPut(pIpWhiteTab, pUser->user, strlen(pUser->user), &pWhiteList, sizeof(void *));
sdbRelease(pSdb, pUser);
}
return pIpWhiteTab;
}
int32_t mndInitUser(SMnode *pMnode) {
ipWhiteMgtInit();
SSdbTable table = {
.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
@ -70,7 +182,7 @@ int32_t mndInitUser(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupUser(SMnode *pMnode) {}
void mndCleanupUser(SMnode *pMnode) { ipWhiteMgtCleanup(); }
static void ipRangeToStr(SIpV4Range *range, char *buf) {
struct in_addr addr;
@ -91,21 +203,34 @@ static void ipRangeListToStr(SIpV4Range *range, int32_t num, char *buf) {
}
if (len > 0) buf[len - 1] = 0;
}
static bool isIpRangeEqual(SIpV4Range *a, SIpV4Range *b) {
// equal or not
return a->ip == b->ip && a->mask == b->mask;
}
static bool isRangeInIpWhiteList(SIpWhiteList *pList, SIpV4Range *tgt) {
for (int i = 0; i < pList->num; i++) {
SIpV4Range *el = &pList->pIpRange[i];
if (tgt->ip == el->ip && tgt->mask == el->mask) {
return true;
}
if (isIpRangeEqual(&pList->pIpRange[i], tgt)) return true;
}
return false;
}
static bool isIpWhiteListEqual(SIpWhiteList *a, SIpWhiteList *b) {
if (a->num != b->num) {
return false;
}
for (int i = 0; i < a->num; i++) {
if (!isIpRangeEqual(&a->pIpRange[i], &b->pIpRange[i])) {
return false;
}
}
return true;
}
int32_t convertIpWhiteListToStr(SIpWhiteList *pList, char **buf) {
if (pList->num == 0) {
*buf = NULL;
return 0;
}
*buf = taosMemoryCalloc(1, pList->num * 36 + 4);
*buf = taosMemoryCalloc(1, pList->num * (sizeof(SIpWhiteList) + 4) + 4);
ipRangeListToStr(pList->pIpRange, pList->num, *buf);
return strlen(*buf);
}

View File

@ -580,7 +580,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
if (status == 0) {
tTrace("success to dispatch conn to work thread");
} else {
tError("fail to dispatch conn to work thread");
tError("fail to dispatch conn to work thread, reason:%s", uv_strerror(status));
}
if (!uv_is_closing((uv_handle_t*)req->data)) {
uv_close((uv_handle_t*)req->data, uvFreeCb);