enhance: update notification and get user whitelist api

This commit is contained in:
shenglian zhou 2023-09-11 17:04:35 +08:00
parent 569283f806
commit 9da204e5ff
6 changed files with 252 additions and 3 deletions

View File

@ -237,6 +237,9 @@ DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
typedef void (*__taos_async_whitelist_fn_t)(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteLists);
DLL_EXPORT void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param);
/* --------------------------schemaless INTERFACE------------------------------- */
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);

View File

@ -876,9 +876,14 @@ typedef struct {
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
typedef struct SIpV4Range {
uint32_t ip;
uint32_t mask;
typedef union {
struct {
uint64_t ip_mask;
};
struct {
uint32_t ip;
uint32_t mask;
};
} SIpV4Range;
typedef struct {
@ -973,6 +978,23 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct {
char user[TSDB_USER_LEN];
} SGetUserWhiteListReq;
int32_t tSerializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
int32_t tDeserializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
int32_t numWhiteLists;
SIpV4Range* pWhiteLists;
} SGetUserWhiteListRsp;
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp);
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp);
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp);
/*
* for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information.

View File

@ -178,6 +178,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -140,6 +140,13 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
taosThreadMutexUnlock(&pObj->mutex);
break;
}
case TAOS_NOTIFY_WHITELIST_VER: {
taosThreadMutexLock(&pObj->mutex);
pObj->whiteListInfo.fp = fp;
pObj->whiteListInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
break;
}
default: {
terrno = TSDB_CODE_INVALID_PARA;
releaseTscObj(*(int64_t *)taos);
@ -151,6 +158,97 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
return 0;
}
typedef struct SFetchWhiteListInfo{
int64_t connId;
__taos_async_whitelist_fn_t userCbFn;
void* userParam;
} SFetchWhiteListInfo;
int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SFetchWhiteListInfo* pInfo = (SFetchWhiteListInfo*)param;
TAOS* taos = &pInfo->connId;
if (code != TSDB_CODE_SUCCESS) {
pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
return code;
}
SGetUserWhiteListRsp *pRsp = pMsg->pData;
pInfo->userCbFn(pInfo->userParam, code, taos, pRsp->numWhiteLists, &pRsp->pWhiteLists->ip_mask);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
return code;
}
void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param) {
if (NULL == taos) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
return;
}
int64_t connId = *(int64_t*)taos;
STscObj *pTsc = acquireTscObj(connId);
if (NULL == pTsc) {
fp(param, TSDB_CODE_TSC_DISCONNECTED, taos, 0, NULL);
return;
}
SGetUserWhiteListReq req;
memcpy(req.user, pTsc->user, TSDB_USER_LEN);
int32_t msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
void* pReq = taosMemoryMalloc(msgLen);
if (pReq == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
releaseTscObj(connId);
return;
}
if (tSerializeSGetUserWhiteListReq(pReq, msgLen, &req) < 0) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
SFetchWhiteListInfo* pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
if (pParam == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
pParam->connId = connId;
pParam->userCbFn = fp;
pParam->userParam = param;
SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pSendInfo == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = msgLen, .handle = NULL};
pSendInfo->requestId = generateRequestId();
pSendInfo->requestObjRefId = 0;
pSendInfo->param = pParam;
pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo);
releaseTscObj(connId);
return;
}
void taos_close_internal(void *taos) {
if (taos == NULL) {
return;

View File

@ -1426,6 +1426,7 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
@ -1896,6 +1897,73 @@ void tFreeSGetUserAuthRsp(SGetUserAuthRsp *pRsp) {
taosHashCleanup(pRsp->useDbs);
}
int32_t tSerializeSGetWhiteListReq(void *buf, int32_t bufLen, SGetUserWhiteListReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhiteListReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->user) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numWhiteLists) < 0) return -1;
for (int i = 0; i < pRsp->numWhiteLists; ++i) {
if (tEncodeU32(&encoder, pRsp->pWhiteLists[i].ip) < 0) return -1;
if (tEncodeU32(&encoder, pRsp->pWhiteLists[i].mask) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->user) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numWhiteLists) < 0) return -1;
pRsp->pWhiteLists = taosMemoryMalloc(pRsp->numWhiteLists * sizeof(SIpV4Range));
if (pRsp->pWhiteLists == NULL) return -1;
for (int32_t i = 0; i < pRsp->numWhiteLists; ++i) {
if (tDecodeU32(&decoder, &(pRsp->pWhiteLists[i].ip)) < 0) return -1;
if (tDecodeU32(&decoder, &(pRsp->pWhiteLists[i].mask)) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp) {
taosMemoryFree(pRsp->pWhiteLists);
}
int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -49,6 +49,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq);
static int32_t mndProcessAlterUserReq(SRpcMsg *pReq);
static int32_t mndProcessDropUserReq(SRpcMsg *pReq);
static int32_t mndProcessGetUserAuthReq(SRpcMsg *pReq);
static int32_t mndProcessGetUserWhiteListReq(SRpcMsg *pReq);
static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -387,6 +388,8 @@ int32_t mndInitUser(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_USER, mndProcessAlterUserReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_USER, mndProcessDropUserReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_AUTH, mndProcessGetUserAuthReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_WHITELIST, mndProcessGetUserWhiteListReq);
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_IP_WHITE, mndProcesSRetrieveIpWhiteReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
@ -1178,6 +1181,60 @@ _OVER:
tFreeSCreateUserReq(&createReq);
return code;
}
//TODO: this is enterpise version, or shared version between enterprise and community?
static int32_t mndSetUserWhiteListRsp(SMnode* pMnode, SUserObj* pUser, SGetUserWhiteListRsp* pWhiteListRsp) {
memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN);
pWhiteListRsp->numWhiteLists = pUser->pIpWhiteList->num;
pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
if (pWhiteListRsp->pWhiteLists == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pUser->pIpWhiteList->pIpRange, pUser->pIpWhiteList->pIpRange, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
return 0;
}
int32_t mndProcessGetUserWhiteListReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SUserObj *pUser = NULL;
SGetUserWhiteListReq wlReq = {0};
SGetUserWhiteListRsp wlRsp = {0};
if (tDeserializeSGetUserWhiteListReq(pReq->pCont, pReq->contLen, &wlReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mTrace("user: %s, start to get whitelist", wlReq.user);
pUser = mndAcquireUser(pMnode, wlReq.user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_USER_NOT_EXIST;
goto _OVER;
}
code = mndSetUserWhiteListRsp(pMnode, pUser, &wlRsp);
if (code) {
goto _OVER;
}
int32_t contLen = tSerializeSGetUserWhiteListRsp(NULL, 0, &wlRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSGetUserWhiteListRsp(pRsp, contLen, &wlRsp);
pReq->info.rsp = pRsp;
pReq->info.rspLen = contLen;
code = 0;
_OVER:
mndReleaseUser(pMnode, pUser);
tFreeSGetUserWhiteListRsp(&wlRsp);
return code;
}
int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq) {
// impl later
SRetrieveIpWhiteReq req = {0};