Merge pull request #22828 from taosdata/enh/ipWhiteList

Enh/ip white list
This commit is contained in:
Haojun Liao 2023-09-15 17:49:37 +08:00 committed by GitHub
commit 05e9f40834
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1998 additions and 165 deletions

View File

@ -125,6 +125,7 @@ typedef enum {
typedef enum { typedef enum {
TAOS_NOTIFY_PASSVER = 0, TAOS_NOTIFY_PASSVER = 0,
TAOS_NOTIFY_WHITELIST_VER = 1
} TAOS_NOTIFY_TYPE; } TAOS_NOTIFY_TYPE;
#define RET_MSG_LENGTH 1024 #define RET_MSG_LENGTH 1024
@ -236,6 +237,9 @@ DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type); DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
typedef void (*__taos_async_whitelist_fn_t)(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteLists);
DLL_EXPORT void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param);
/* --------------------------schemaless INTERFACE------------------------------- */ /* --------------------------schemaless INTERFACE------------------------------- */
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);

View File

@ -849,6 +849,7 @@ typedef struct {
int32_t authVer; int32_t authVer;
char sVer[TSDB_VERSION_LEN]; char sVer[TSDB_VERSION_LEN];
char sDetailVer[128]; char sDetailVer[128];
int64_t whiteListVer;
} SConnectRsp; } SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
@ -875,11 +876,17 @@ typedef struct {
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq); int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq); int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
typedef struct SIpV4Range { typedef struct SIpV4Range{
uint32_t ip; uint32_t ip;
uint32_t mask; uint32_t mask;
} SIpV4Range; } SIpV4Range;
typedef struct {
int32_t num;
SIpV4Range pIpRange[];
} SIpWhiteList;
SIpWhiteList* cloneIpWhiteList(SIpWhiteList* pIpWhiteList);
typedef struct { typedef struct {
int8_t createType; int8_t createType;
int8_t superUser; // denote if it is a super user or not int8_t superUser; // denote if it is a super user or not
@ -895,6 +902,30 @@ int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq); int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
void tFreeSCreateUserReq(SCreateUserReq* pReq); void tFreeSCreateUserReq(SCreateUserReq* pReq);
typedef struct {
int64_t ver;
char user[TSDB_USER_LEN];
int32_t numOfRange;
SIpV4Range* pIpRanges;
} SUpdateUserIpWhite;
typedef struct {
int64_t ver;
int numOfUser;
SUpdateUserIpWhite* pUserIpWhite;
} SUpdateIpWhite;
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
typedef struct {
int64_t ipWhiteVer;
} SRetrieveIpWhiteReq;
int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
typedef struct { typedef struct {
int8_t alterType; int8_t alterType;
int8_t superUser; int8_t superUser;
@ -935,12 +966,30 @@ typedef struct {
SHashObj* readTbs; SHashObj* readTbs;
SHashObj* writeTbs; SHashObj* writeTbs;
SHashObj* useDbs; SHashObj* useDbs;
int64_t whiteListVer;
} SGetUserAuthRsp; } SGetUserAuthRsp;
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct {
char user[TSDB_USER_LEN];
} SGetUserWhiteListReq;
int32_t tSerializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
int32_t tDeserializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
int32_t numWhiteLists;
SIpV4Range* pWhiteLists;
} SGetUserWhiteListRsp;
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp);
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp);
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp);
/* /*
* for client side struct, only column id, type, bytes are necessary * for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information. * But for data in vnode side, we need all the following information.
@ -1455,6 +1504,7 @@ typedef struct {
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq; int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusReq; } SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
@ -1485,6 +1535,7 @@ typedef struct {
SDnodeCfg dnodeCfg; SDnodeCfg dnodeCfg;
SArray* pDnodeEps; // Array of SDnodeEp SArray* pDnodeEps; // Array of SDnodeEp
int32_t statusSeq; int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusRsp; } SStatusRsp;
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);

View File

@ -52,6 +52,7 @@ typedef struct {
void* data; void* data;
void* mgmt; void* mgmt;
void* clientRpc; void* clientRpc;
void* serverRpc;
PutToQueueFp putToQueueFp; PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;

View File

@ -177,6 +177,8 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -37,6 +37,7 @@ typedef struct {
int64_t applyIndex; int64_t applyIndex;
uint64_t applyTerm; uint64_t applyTerm;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SRpcConnInfo; } SRpcConnInfo;
typedef struct SRpcHandleInfo { typedef struct SRpcHandleInfo {
@ -60,6 +61,8 @@ typedef struct SRpcHandleInfo {
STraceId traceId; STraceId traceId;
SRpcConnInfo conn; SRpcConnInfo conn;
int8_t forbiddenIp;
} SRpcHandleInfo; } SRpcHandleInfo;
typedef struct SRpcMsg { typedef struct SRpcMsg {
@ -162,6 +165,11 @@ int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg,
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void *rpcAllocHandle(); void *rpcAllocHandle();
void rpcSetIpWhite(void *thandl, void *arg);
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -123,6 +123,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132) #define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132)
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133) #define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201)
@ -245,6 +247,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0357) #define TSDB_CODE_MND_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_USER_NOT_AVAILABLE TAOS_DEF_ERROR_CODE(0, 0x0358) #define TSDB_CODE_MND_USER_NOT_AVAILABLE TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_PRIVILEDGE_EXIST TAOS_DEF_ERROR_CODE(0, 0x0359) #define TSDB_CODE_MND_PRIVILEDGE_EXIST TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_USER_HOST_EXIST TAOS_DEF_ERROR_CODE(0, 0x035A)
#define TSDB_CODE_MND_USER_HOST_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x035B)
#define TSDB_CODE_MND_TOO_MANY_USER_HOST TAOS_DEF_ERROR_CODE(0, 0x035C)
#define TSDB_CODE_MND_USER_LOCAL_HOST_NOT_DROP TAOS_DEF_ERROR_CODE(0, 0x035D)
// mnode-stable-part1 // mnode-stable-part1
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)

View File

@ -184,7 +184,7 @@ typedef enum ELogicConditionType {
#define TSDB_UNI_LEN 24 #define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN #define TSDB_USER_LEN TSDB_UNI_LEN
#define TSDB_POINTER_PRINT_BYTES 18 // 0x1122334455667788 #define TSDB_POINTER_PRINT_BYTES 18 // 0x1122334455667788
// ACCOUNT is a 32 bit positive integer // ACCOUNT is a 32 bit positive integer
// this is the length of its string representation, including the terminator zero // this is the length of its string representation, including the terminator zero
#define TSDB_ACCT_ID_LEN 11 #define TSDB_ACCT_ID_LEN 11
@ -202,6 +202,7 @@ typedef enum ELogicConditionType {
#define TSDB_DB_NAME_LEN 65 #define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024 #define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024
#define TSDB_PRIVILEDGE_HOST_LEN 48 * 1024
#define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024 #define TSDB_FUNC_COMMENT_LEN 1024 * 1024
@ -417,7 +418,7 @@ typedef enum ELogicConditionType {
#define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024) #define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024)
#define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY_PLAN" #define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY_PLAN"
#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519 #define TSDB_MAX_FIELD_LEN 65519 // 16384:65519
#define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 #define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 #define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 #define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519

View File

@ -135,6 +135,12 @@ typedef struct {
__taos_notify_fn_t fp; __taos_notify_fn_t fp;
} SPassInfo; } SPassInfo;
typedef struct {
int64_t ver;
void* param;
__taos_notify_fn_t fp;
} SWhiteListInfo;
typedef struct STscObj { typedef struct STscObj {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
@ -152,6 +158,7 @@ typedef struct STscObj {
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
SHashObj* pRequests; SHashObj* pRequests;
SPassInfo passInfo; SPassInfo passInfo;
SWhiteListInfo whiteListInfo;
} STscObj; } STscObj;
typedef struct STscDbg { typedef struct STscDbg {

View File

@ -116,6 +116,19 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
atomic_load_32(&passInfo->ver), pTscObj->id); atomic_load_32(&passInfo->ver), pTscObj->id);
} }
} }
if (pTscObj->whiteListInfo.fp) {
SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
int64_t oldVer = atomic_load_64(&whiteListInfo->ver);
if (oldVer < pRsp->whiteListVer) {
atomic_store_64(&whiteListInfo->ver, pRsp->whiteListVer);
if (whiteListInfo->fp) {
(*whiteListInfo->fp)(whiteListInfo->param, &pRsp->whiteListVer, TAOS_NOTIFY_WHITELIST_VER);
}
tscDebug("update whitelist version of user %s from %"PRId64" to %"PRId64", tscRid:%" PRIi64, pRsp->user, oldVer,
atomic_load_64(&whiteListInfo->ver), pTscObj->id);
}
}
releaseTscObj(pReq->connKey.tscRid); releaseTscObj(pReq->connKey.tscRid);
} }
} }

View File

@ -140,6 +140,13 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
taosThreadMutexUnlock(&pObj->mutex); taosThreadMutexUnlock(&pObj->mutex);
break; break;
} }
case TAOS_NOTIFY_WHITELIST_VER: {
taosThreadMutexLock(&pObj->mutex);
pObj->whiteListInfo.fp = fp;
pObj->whiteListInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
break;
}
default: { default: {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t *)taos);
@ -151,6 +158,113 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
return 0; return 0;
} }
typedef struct SFetchWhiteListInfo{
int64_t connId;
__taos_async_whitelist_fn_t userCbFn;
void* userParam;
} SFetchWhiteListInfo;
int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SFetchWhiteListInfo* pInfo = (SFetchWhiteListInfo*)param;
TAOS* taos = &pInfo->connId;
if (code != TSDB_CODE_SUCCESS) {
pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
return code;
}
SGetUserWhiteListRsp wlRsp;
tDeserializeSGetUserWhiteListRsp(pMsg->pData, pMsg->len, &wlRsp);
uint64_t* pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
if (pWhiteLists == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
tFreeSGetUserWhiteListRsp(&wlRsp);
}
for (int i = 0; i < wlRsp.numWhiteLists; ++i) {
pWhiteLists[i] = ((uint64_t)wlRsp.pWhiteLists[i].mask << 32) | wlRsp.pWhiteLists[i].ip;
}
pInfo->userCbFn(pInfo->userParam, code, taos, wlRsp.numWhiteLists, pWhiteLists);
taosMemoryFree(pWhiteLists);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
tFreeSGetUserWhiteListRsp(&wlRsp);
return code;
}
void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param) {
if (NULL == taos) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
return;
}
int64_t connId = *(int64_t*)taos;
STscObj *pTsc = acquireTscObj(connId);
if (NULL == pTsc) {
fp(param, TSDB_CODE_TSC_DISCONNECTED, taos, 0, NULL);
return;
}
SGetUserWhiteListReq req;
memcpy(req.user, pTsc->user, TSDB_USER_LEN);
int32_t msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
void* pReq = taosMemoryMalloc(msgLen);
if (pReq == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
releaseTscObj(connId);
return;
}
if (tSerializeSGetUserWhiteListReq(pReq, msgLen, &req) < 0) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
SFetchWhiteListInfo* pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
if (pParam == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
pParam->connId = connId;
pParam->userCbFn = fp;
pParam->userParam = param;
SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pSendInfo == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = msgLen, .handle = NULL};
pSendInfo->requestId = generateRequestId();
pSendInfo->requestObjRefId = 0;
pSendInfo->param = pParam;
pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo);
releaseTscObj(connId);
return;
}
void taos_close_internal(void *taos) { void taos_close_internal(void *taos) {
if (taos == NULL) { if (taos == NULL) {
return; return;

View File

@ -139,6 +139,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->connType = connectRsp.connType; pTscObj->connType = connectRsp.connType;
pTscObj->passInfo.ver = connectRsp.passVer; pTscObj->passInfo.ver = connectRsp.passVer;
pTscObj->authVer = connectRsp.authVer; pTscObj->authVer = connectRsp.authVer;
pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);

View File

@ -221,6 +221,7 @@ static const SSysDbTableSchema userUsersSchema[] = {
{.name = "enable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false}, {.name = "enable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
{.name = "sysinfo", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false}, {.name = "sysinfo", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "allowed_host", .bytes = TSDB_PRIVILEDGE_HOST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
GRANTS_SCHEMA; GRANTS_SCHEMA;
@ -308,7 +309,7 @@ static const SSysDbTableSchema userUserPrivilegesSchema[] = {
{.name = "privilege", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "privilege", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db_name", .bytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "table_name", .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "table_name", .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "condition", .bytes = TSDB_PRIVILEDGE_CONDITION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "condition", .bytes = TSDB_PRIVILEDGE_CONDITION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
static const SSysTableMeta infosMeta[] = { static const SSysTableMeta infosMeta[] = {

View File

@ -1116,6 +1116,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -1226,6 +1228,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1;
} }
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
@ -1258,6 +1264,8 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
} }
if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1; if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -1300,6 +1308,10 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
} }
if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pRsp->ipWhiteVer) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
@ -1371,6 +1383,15 @@ int32_t tDeserializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq)
return 0; return 0;
} }
SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
if (pIpWhiteList == NULL) return NULL;
int32_t sz = sizeof(SIpWhiteList) + pIpWhiteList->num * sizeof(SIpV4Range);
SIpWhiteList *pNew = taosMemoryCalloc(1, sz);
memcpy(pNew, pIpWhiteList, sz);
return pNew;
}
int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq) { int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -1414,12 +1435,123 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
void tFreeSCreateUserReq(SCreateUserReq *pReq) { taosMemoryFree(pReq->pIpRanges); } void tFreeSCreateUserReq(SCreateUserReq *pReq) { taosMemoryFree(pReq->pIpRanges); }
int32_t tSerializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) {
// impl later
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->ver) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1;
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]);
if (tEncodeI64(&encoder, pUser->ver) < 0) return -1;
if (tEncodeCStr(&encoder, pUser->user) < 0) return -1;
if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1;
for (int j = 0; j < pUser->numOfRange; j++) {
SIpV4Range *pRange = &pUser->pIpRanges[j];
if (tEncodeU32(&encoder, pRange->ip) < 0) return -1;
if (tEncodeU32(&encoder, pRange->mask) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
// impl later
if (tDecodeI64(&decoder, &pReq->ver) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1;
pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
if (tDecodeI64(&decoder, &pUserWhite->ver) < 0) return -1;
if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1;
if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1;
pUserWhite->pIpRanges = taosMemoryCalloc(1, pUserWhite->numOfRange * sizeof(SIpV4Range));
for (int j = 0; j < pUserWhite->numOfRange; j++) {
SIpV4Range *pRange = &pUserWhite->pIpRanges[j];
if (tDecodeU32(&decoder, &pRange->ip) < 0) return -1;
if (tDecodeU32(&decoder, &pRange->mask) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
taosMemoryFree(pUserWhite->pIpRanges);
}
taosMemoryFree(pReq->pUserIpWhite);
// impl later
return;
}
SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
pClone->numOfUser = pReq->numOfUser;
pClone->ver = pReq->ver;
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i];
SUpdateUserIpWhite *pOld = &pReq->pUserIpWhite[i];
pNew->ver = pOld->ver;
memcpy(pNew->user, pOld->user, strlen(pOld->user));
pNew->numOfRange = pOld->numOfRange;
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
pNew->pIpRanges = taosMemoryCalloc(1, sz);
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
}
return pClone;
}
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) {
return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
// impl later
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) { int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -1599,7 +1731,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
// since 3.0.7.0 // since 3.0.7.0
if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->whiteListVer) < 0) return -1;
return 0; return 0;
} }
@ -1731,6 +1863,11 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
} else { } else {
pRsp->passVer = 0; pRsp->passVer = 0;
} }
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pRsp->whiteListVer) < 0) goto _err;
} else {
pRsp->whiteListVer = 0;
}
} }
return 0; return 0;
_err: _err:
@ -1769,6 +1906,73 @@ void tFreeSGetUserAuthRsp(SGetUserAuthRsp *pRsp) {
taosHashCleanup(pRsp->useDbs); taosHashCleanup(pRsp->useDbs);
} }
int32_t tSerializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhiteListReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhiteListReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->user) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numWhiteLists) < 0) return -1;
for (int i = 0; i < pRsp->numWhiteLists; ++i) {
if (tEncodeU32(&encoder, pRsp->pWhiteLists[i].ip) < 0) return -1;
if (tEncodeU32(&encoder, pRsp->pWhiteLists[i].mask) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->user) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numWhiteLists) < 0) return -1;
pRsp->pWhiteLists = taosMemoryMalloc(pRsp->numWhiteLists * sizeof(SIpV4Range));
if (pRsp->pWhiteLists == NULL) return -1;
for (int32_t i = 0; i < pRsp->numWhiteLists; ++i) {
if (tDecodeU32(&decoder, &(pRsp->pWhiteLists[i].ip)) < 0) return -1;
if (tDecodeU32(&decoder, &(pRsp->pWhiteLists[i].mask)) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp) {
taosMemoryFree(pRsp->pWhiteLists);
}
int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) { int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -4235,6 +4439,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->whiteListVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -4271,6 +4476,11 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
pRsp->authVer = 0; pRsp->authVer = 0;
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pRsp->whiteListVer) < 0) return -1;
} else {
pRsp->whiteListVer = 0;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -23,22 +23,22 @@ extern "C" {
#endif #endif
typedef struct SDnodeMgmt { typedef struct SDnodeMgmt {
SDnodeData *pData; SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
TdThread statusThread; TdThread statusThread;
TdThread monitorThread; TdThread monitorThread;
TdThread crashReportThread; TdThread crashReportThread;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq; int32_t statusSeq;
} SDnodeMgmt; } SDnodeMgmt;
// dmHandle.c // dmHandle.c

View File

@ -30,7 +30,36 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
taosThreadRwlockUnlock(&pMgmt->pData->lock); taosThreadRwlockUnlock(&pMgmt->pData->lock);
} }
} }
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
if (pMgmt->pData->ipWhiteVer == ver) {
if (ver == 0) {
dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL);
// pMgmt->ipWhiteVer = ver;
}
return;
}
int64_t oldVer = pMgmt->pData->ipWhiteVer;
// pMgmt->ipWhiteVer = ver;
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeRetrieveIpWhite(pHead, contLen, &req);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 0};
SEpSet epset = {0};
dmGetMnodeEpSet(pMgmt->pData, &epset);
rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
}
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId; const STraceId *trace = &pRsp->info.traceId;
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
@ -55,6 +84,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
} }
dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
} }
tFreeSStatusRsp(&statusRsp); tFreeSStatusRsp(&statusRsp);
} }
@ -111,6 +141,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
pMgmt->statusSeq++; pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq; req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);

View File

@ -55,6 +55,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
// pMgmt->pData->ipWhiteVer = 0;
if (dmStartWorker(pMgmt) != 0) { if (dmStartWorker(pMgmt) != 0) {
return -1; return -1;
} }

View File

@ -161,6 +161,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -63,6 +63,29 @@ static void dmConvertErrCode(tmsg_t msgType) {
terrno = TSDB_CODE_VND_STOPPED; terrno = TSDB_CODE_VND_STOPPED;
} }
} }
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
rpcSetIpWhite(pTrans, &ipWhite);
pData->ipWhiteVer = ipWhite.ver;
tFreeSUpdateIpWhiteReq(&ipWhite);
rpcFreeCont(pRpc->pCont);
}
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
if (forbidden) {
SIpV4Range range = {.ip = clientIp, .mask = 32};
char buf[36] = {0};
rpcUtilSIpRangeToStr(&range, buf);
dError("User:%s host:%s not in ip white list", user, buf);
return true;
} else {
return false;
}
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1; int32_t code = -1;
@ -81,6 +104,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER; goto _OVER;
} }
bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
if (isForbidden) {
terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
goto _OVER;
}
switch (pRpc->msgType) { switch (pRpc->msgType) {
case TDMT_DND_NET_TEST: case TDMT_DND_NET_TEST:
dmProcessNetTestReq(pDnode, pRpc); dmProcessNetTestReq(pDnode, pRpc);
@ -97,6 +126,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmSetMnodeEpSet(&pDnode->data, pEpSet); dmSetMnodeEpSet(&pDnode->data, pEpSet);
} }
break; break;
case TDMT_MND_RETRIEVE_IP_WHITE_RSP: {
dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
return;
} break;
default: default:
break; break;
} }
@ -372,6 +405,7 @@ void dmCleanupServer(SDnode *pDnode) {
SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb dmGetMsgcb(SDnode *pDnode) {
SMsgCb msgCb = { SMsgCb msgCb = {
.clientRpc = pDnode->trans.clientRpc, .clientRpc = pDnode->trans.clientRpc,
.serverRpc = pDnode->trans.serverRpc,
.sendReqFp = dmSendReq, .sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp, .sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,

View File

@ -107,6 +107,7 @@ typedef struct {
TdThreadRwlock lock; TdThreadRwlock lock;
SMsgCb msgCb; SMsgCb msgCb;
bool validMnodeEps; bool validMnodeEps;
int64_t ipWhiteVer;
} SDnodeData; } SDnodeData;
typedef struct { typedef struct {

View File

@ -277,18 +277,21 @@ typedef struct {
} SAcctObj; } SAcctObj;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int8_t superUser; int8_t superUser;
int8_t sysInfo; int8_t sysInfo;
int8_t enable; int8_t enable;
int8_t reserve; int8_t reserve;
int32_t acctId; int32_t acctId;
int32_t authVersion; int32_t authVersion;
int32_t passVersion; int32_t passVersion;
int64_t ipWhiteListVer;
SIpWhiteList* pIpWhiteList;
SHashObj* readDbs; SHashObj* readDbs;
SHashObj* writeDbs; SHashObj* writeDbs;
SHashObj* topics; SHashObj* topics;

View File

@ -130,6 +130,7 @@ typedef struct SMnode {
SGrantInfo grant; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb; SMsgCb msgCb;
int64_t ipWhiteVer;
} SMnode; } SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
@ -140,6 +141,8 @@ bool mndGetRestored(SMnode *pMnode);
void mndSetStop(SMnode *pMnode); void mndSetStop(SMnode *pMnode);
bool mndGetStop(SMnode *pMnode); bool mndGetStop(SMnode *pMnode);
SArray *mndGetAllDnodeFqdns(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -33,6 +33,9 @@ int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname); int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp); int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
int32_t mndSetUserWhiteListRsp(SMnode* pMnode, SUserObj* pUser, SGetUserWhiteListRsp* pWhiteListRsp);
int32_t mndEnableIpWhiteList(SMnode *pMnode);
int32_t mndFetchIpWhiteList(SIpWhiteList *ipList, char **buf);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,6 +23,10 @@
extern "C" { extern "C" {
#endif #endif
enum {
IP_WHITE_ADD,
IP_WHITE_DROP,
};
int32_t mndInitUser(SMnode *pMnode); int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode); void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName); SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName);
@ -38,8 +42,15 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int3
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew); int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew);
void mndUserFreeObj(SUserObj *pUser); void mndUserFreeObj(SUserObj *pUser);
int64_t mndGetIpWhiteVer(SMnode *pMnode);
void mndUpdateIpWhite(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_t lock);
int32_t mndRefreshUserIpWhiteList(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -51,6 +51,12 @@ enum {
DND_CONN_ACTIVE_CODE, DND_CONN_ACTIVE_CODE,
}; };
enum {
DND_CREATE,
DND_ADD,
DND_DROP,
};
static int32_t mndCreateDefaultDnode(SMnode *pMnode); static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -103,7 +109,10 @@ int32_t mndInitDnode(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupDnode(SMnode *pMnode) {} SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
void mndCleanupDnode(SMnode *pMnode) {}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) { static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
int32_t code = -1; int32_t code = -1;
@ -130,6 +139,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -518,13 +528,16 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
} }
pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs); bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes; bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged; bool needCheck =
!online || dnodeChanged || reboot || supportVnodesChanged || pMnode->ipWhiteVer != statusReq.ipWhiteVer;
const STraceId *trace = &pReq->info.traceId; const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
@ -645,6 +658,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps); mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp); int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
@ -691,6 +705,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
@ -983,6 +998,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP, 1);
code = 0; code = 0;
_OVER: _OVER:
@ -1455,3 +1471,19 @@ _err:
terrno = TSDB_CODE_INVALID_CFG; terrno = TSDB_CODE_INVALID_CFG;
return -1; return -1;
} }
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
SDnodeObj *pObj = NULL;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SArray *fqdns = taosArrayInit(4, sizeof(void *));
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
char *fqdn = taosStrdup(pObj->fqdn);
taosArrayPush(fqdns, &fqdn);
sdbRelease(pSdb, pObj);
}
return fqdns;
}

View File

@ -28,10 +28,33 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) { int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) {
return 0; return 0;
} }
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; } int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) { int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
return 0; return 0;
} }
// TODO: for community version use the commented version
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN);
pWhiteListRsp->numWhiteLists = 1;
pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
if (pWhiteListRsp->pWhiteLists == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pWhiteListRsp->pWhiteLists, 0, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// pWhiteListRsp->numWhiteLists = pUser->pIpWhiteList->num;
// pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// if (pWhiteListRsp->pWhiteLists == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// memcpy(pWhiteListRsp->pWhiteLists, pUser->pIpWhiteList->pIpRange,
// pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
return 0;
}
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) { int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) {
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1; pRsp->superAuth = 1;
@ -39,6 +62,14 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
pRsp->sysInfo = pUser->sysInfo; pRsp->sysInfo = pUser->sysInfo;
pRsp->version = pUser->authVersion; pRsp->version = pUser->authVersion;
pRsp->passVer = pUser->passVersion; pRsp->passVer = pUser->passVersion;
pRsp->whiteListVer = pUser->ipWhiteListVer;
return 0; return 0;
} }
#endif
int32_t mndEnableIpWhiteList(SMnode *pMnode) { return 1; }
int32_t mndFetchIpWhiteList(SIpWhiteList *ipList, char **buf) {
*buf = NULL;
return 0;
}
#endif

View File

@ -290,6 +290,7 @@ _CONNECT:
connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion; connectRsp.passVer = pUser->passVersion;
connectRsp.authVer = pUser->authVersion; connectRsp.authVer = pUser->authVersion;
connectRsp.whiteListVer = pUser->ipWhiteListVer;
strcpy(connectRsp.sVer, version); strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,

View File

@ -17,6 +17,7 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
@ -167,7 +168,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL; STrans *pTrans = NULL;
int32_t code = -1; int32_t code = -1;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
if (transId <= 0) { if (transId <= 0) {
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
@ -304,6 +305,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
} else { } else {
mInfo("vgId:1, sync restore finished"); mInfo("vgId:1, sync restore finished");
} }
mndRefreshUserIpWhiteList(pMnode);
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
} }

File diff suppressed because it is too large Load Diff

View File

@ -13,8 +13,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <uv.h>
#include <regex.h> #include <regex.h>
#include <uv.h>
#include "parAst.h" #include "parAst.h"
#include "parUtil.h" #include "parUtil.h"
@ -1666,8 +1666,8 @@ SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* p
static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange) { static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char* ipCopy = taosStrdup(ipRange); char* ipCopy = taosStrdup(ipRange);
char* slash = strchr(ipCopy, '/'); char* slash = strchr(ipCopy, '/');
if (slash) { if (slash) {
*slash = '\0'; *slash = '\0';
struct in_addr addr; struct in_addr addr;
@ -1675,11 +1675,9 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
int prefix = atoi(slash + 1); int prefix = atoi(slash + 1);
if (prefix < 0 || prefix > 32) { if (prefix < 0 || prefix > 32) {
code = TSDB_CODE_PAR_INVALID_IP_RANGE; code = TSDB_CODE_PAR_INVALID_IP_RANGE;
} else { } else {
pIpRange->ip = addr.s_addr; pIpRange->ip = addr.s_addr;
uint32_t mask = (1 << (32 - prefix)) - 1; pIpRange->mask = prefix;
mask = htonl(~mask);
pIpRange->mask = mask;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} }
} else { } else {
@ -1689,7 +1687,7 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
struct in_addr addr; struct in_addr addr;
if (uv_inet_pton(AF_INET, ipCopy, &addr) == 0) { if (uv_inet_pton(AF_INET, ipCopy, &addr) == 0) {
pIpRange->ip = addr.s_addr; pIpRange->ip = addr.s_addr;
pIpRange->mask = 0xFFFFFFFF; pIpRange->mask = 32;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} else { } else {
code = TSDB_CODE_PAR_INVALID_IP_RANGE; code = TSDB_CODE_PAR_INVALID_IP_RANGE;
@ -1697,7 +1695,7 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
} }
taosMemoryFreeClear(ipCopy); taosMemoryFreeClear(ipCopy);
return code; return code;
} }
static int32_t fillIpRangesFromWhiteList(SAstCreateContext* pCxt, SNodeList* pIpRangesNodeList, SIpV4Range* pIpRanges) { static int32_t fillIpRangesFromWhiteList(SAstCreateContext* pCxt, SNodeList* pIpRangesNodeList, SIpV4Range* pIpRanges) {
@ -1769,7 +1767,7 @@ SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t al
pStmt->alterType = alterType; pStmt->alterType = alterType;
switch (alterType) { switch (alterType) {
case TSDB_ALTER_USER_PASSWD: { case TSDB_ALTER_USER_PASSWD: {
char password[TSDB_USET_PASSWORD_LEN] = {0}; char password[TSDB_USET_PASSWORD_LEN] = {0};
SToken* pVal = pAlterInfo; SToken* pVal = pAlterInfo;
if (!checkPassword(pCxt, pVal, password)) { if (!checkPassword(pCxt, pVal, password)) {
nodesDestroyNode((SNode*)pStmt); nodesDestroyNode((SNode*)pStmt);
@ -1788,7 +1786,7 @@ SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t al
pStmt->sysinfo = taosStr2Int8(pVal->z, NULL, 10); pStmt->sysinfo = taosStr2Int8(pVal->z, NULL, 10);
break; break;
} }
case TSDB_ALTER_USER_ADD_WHITE_LIST: case TSDB_ALTER_USER_ADD_WHITE_LIST:
case TSDB_ALTER_USER_DROP_WHITE_LIST: { case TSDB_ALTER_USER_DROP_WHITE_LIST: {
SNodeList* pIpRangesNodeList = pAlterInfo; SNodeList* pIpRangesNodeList = pAlterInfo;
pStmt->pNodeListIpRanges = pIpRangesNodeList; pStmt->pNodeListIpRanges = pIpRangesNodeList;

View File

@ -29,6 +29,8 @@ extern "C" {
#include "ttrace.h" #include "ttrace.h"
#include "tutil.h" #include "tutil.h"
typedef bool (*FilteFunc)(void* arg);
typedef void* queue[2]; typedef void* queue[2];
/* Private macros. */ /* Private macros. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0])) #define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
@ -303,11 +305,12 @@ void transUnrefCliHandle(void* handle);
int transReleaseCliHandle(void* handle); int transReleaseCliHandle(void* handle);
int transReleaseSrvHandle(void* handle); int transReleaseSrvHandle(void* handle);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg); int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
int transSockInfo2Str(struct sockaddr* sockname, char* dst); int transSockInfo2Str(struct sockaddr* sockname, char* dst);
@ -434,6 +437,23 @@ int32_t transGetRefMgt();
int32_t transGetInstMgt(); int32_t transGetInstMgt();
void transHttpEnvDestroy(); void transHttpEnvDestroy();
typedef struct {
uint32_t netmask;
uint32_t address;
uint32_t network;
uint32_t broadcast;
char info[32];
int8_t type;
} SubnetUtils;
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange);
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip);
int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf);
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf);
int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -178,13 +178,21 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
// client only
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later // later
return transSetDefaultAddr(thandle, ip, fqdn); return transSetDefaultAddr(thandle, ip, fqdn);
} }
// server only
void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); }
void* rpcAllocHandle() { return (void*)transAllocHandle(); } void* rpcAllocHandle() { return (void*)transAllocHandle(); }
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
return transUtilSWhiteListToStr(pWhiteList, ppBuf);
}
int32_t rpcInit() { int32_t rpcInit() {
transInit(); transInit();
return 0; return 0;

View File

@ -654,3 +654,106 @@ void transDestoryExHandle(void* handle) {
} }
taosMemoryFree(handle); taosMemoryFree(handle);
} }
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
// char ip_addr_cpy[20];
// char ip[5];
// tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
// char *s_start, *s_end;
// s_start = ip_addr_cpy;
// s_end = ip_addr_cpy;
// int32_t k = 0;
// for (k = 0; *s_start != '\0'; s_start = s_end) {
// for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
// }
// if (*s_end == '.') {
// *s_end = '\0';
// s_end++;
// }
// dst[k++] = (char)atoi(s_start);
// }
// }
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
uint32_t ip = pRange->ip;
return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
}
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) {
if (pRange->mask == 32) {
pUtils->type = 0;
pUtils->address = pRange->ip;
return 0;
}
pUtils->address = subnetIpRang2Int(pRange);
for (int i = 0; i < pRange->mask; i++) {
pUtils->netmask |= (1 << (31 - i));
}
pUtils->network = pUtils->address & pUtils->netmask;
pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF);
pUtils->type = (pRange->mask == 32 ? 0 : 1);
return 0;
}
int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf) {
sprintf(buf, "raw: %s, address: %d, netmask:%d, network:%d, broadcast:%d", pUtils->info, pUtils->address,
pUtils->netmask, pUtils->network, pUtils->broadcast);
return 0;
}
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) {
// impl later
if (pUtils == NULL) return false;
if (pUtils->type == 0) {
return pUtils->address == ip;
} else {
SIpV4Range range = {.ip = ip, .mask = 32};
uint32_t t = subnetIpRang2Int(&range);
return t >= pUtils->network && t <= pUtils->broadcast;
}
}
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
int32_t len = 0;
struct in_addr addr;
addr.s_addr = pRange->ip;
uv_inet_ntop(AF_INET, &addr, buf, 32);
len = strlen(buf);
if (pRange->mask != 32) {
len += sprintf(buf + len, "/%d", pRange->mask);
}
buf[len] = 0;
return len;
}
int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
if (pList->num == 0) {
*ppBuf = NULL;
return 0;
}
int32_t len = 0;
char* pBuf = taosMemoryCalloc(1, pList->num * 36);
for (int i = 0; i < pList->num; i++) {
SIpV4Range* pRange = &pList->pIpRange[i];
char tbuf[32] = {0};
int tlen = transUtilSIpRangeToStr(pRange, tbuf);
len += sprintf(pBuf + len, "%s,", tbuf);
}
if (len > 0) {
pBuf[len - 1] = 0;
}
*ppBuf = pBuf;
return len;
}

View File

@ -43,6 +43,7 @@ typedef struct SSvrConn {
ConnStatus status; ConnStatus status;
uint32_t serverIp;
uint32_t clientIp; uint32_t clientIp;
uint16_t port; uint16_t port;
@ -55,6 +56,8 @@ typedef struct SSvrConn {
char user[TSDB_UNI_LEN]; // user ID for the link char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN]; char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key char ckey[TSDB_PASSWORD_LEN]; // ciphering key
int64_t whiteListVer;
} SSvrConn; } SSvrConn;
typedef struct SSvrMsg { typedef struct SSvrMsg {
@ -62,8 +65,22 @@ typedef struct SSvrMsg {
STransMsg msg; STransMsg msg;
queue q; queue q;
STransMsgType type; STransMsgType type;
void* arg;
FilteFunc func;
} SSvrMsg; } SSvrMsg;
typedef struct {
int64_t ver;
SIpWhiteList* pList;
// SArray* list;
} SWhiteUserList;
typedef struct {
SHashObj* pList;
int64_t ver;
} SIpWhiteListTab;
typedef struct SWorkThrd { typedef struct SWorkThrd {
TdThread thread; TdThread thread;
uv_connect_t connect_req; uv_connect_t connect_req;
@ -77,6 +94,10 @@ typedef struct SWorkThrd {
queue conn; queue conn;
void* pTransInst; void* pTransInst;
bool quit; bool quit;
SIpWhiteListTab* pWhiteList;
int64_t whiteListVer;
int8_t enableIpWhiteList;
} SWorkThrd; } SWorkThrd;
typedef struct SServerObj { typedef struct SServerObj {
@ -99,6 +120,14 @@ typedef struct SServerObj {
bool inited; bool inited;
} SServerObj; } SServerObj;
SIpWhiteListTab* uvWhiteListCreate();
void uvWhiteListDestroy(SIpWhiteListTab* pWhite);
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver);
void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
@ -141,8 +170,9 @@ static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleUpdate(SSvrMsg* pMsg, SWorkThrd* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, NULL}; uvHandleRegister, uvHandleUpdate};
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
@ -181,8 +211,134 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn); tDebug("%p timeout since no activity", conn);
} }
static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) {
// impl later
SubnetUtils subnet = {0};
if (subnetInit(&subnet, pRange) != 0) {
return false;
}
return subnetCheckIp(&subnet, ip);
}
SIpWhiteListTab* uvWhiteListCreate() {
SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab));
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
pWhiteList->ver = -1;
return pWhiteList;
}
void uvWhiteListDestroy(SIpWhiteListTab* pWhite) {
SHashObj* pWhiteList = pWhite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
taosMemoryFree(pUserList->pList);
taosMemoryFree(pUserList);
pIter = taosHashIterate(pWhiteList, pIter);
}
taosHashCleanup(pWhiteList);
taosMemoryFree(pWhite);
}
void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
char* tmp = NULL;
int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
char* pBuf = taosMemoryCalloc(1, tlen + 64);
int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp);
taosMemoryFree(tmp);
*ppBuf = pBuf;
}
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
SHashObj* pWhiteList = pWrite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
size_t klen = 0;
char user[TSDB_USER_LEN + 1] = {0};
char* pUser = taosHashGetKey(pIter, &klen);
memcpy(user, pUser, klen);
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
char* buf = NULL;
uvWhiteListToStr(pUserList, user, &buf);
tDebug("ip-white-list %s", buf);
taosMemoryFree(buf);
pIter = taosHashIterate(pWhiteList, pIter);
}
}
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
SHashObj* pWhiteList = pWhite->pList;
SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
if (ppUserList == NULL || *ppUserList == NULL) {
SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
pUserList->ver = ver;
pUserList->pList = plist;
taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
} else {
SWhiteUserList* pUserList = *ppUserList;
taosMemoryFreeClear(pUserList->pList);
pUserList->ver = ver;
pUserList->pList = plist;
}
uvWhiteListDebug(pWhite);
}
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) {
pWhite->ver++;
// impl later
}
static bool uvWhiteListIsDefaultAddr(uint32_t ip) {
// 127.0.0.1
static SIpV4Range range = {.ip = 16777343, .mask = 32};
return range.ip == ip;
}
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver) {
// impl check
SHashObj* pWhiteList = pWhite->pList;
bool valid = false;
if (uvWhiteListIsDefaultAddr(ip)) return true;
SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
return false;
}
SWhiteUserList* pUserList = *ppList;
if (pUserList->ver == ver) return true;
SIpWhiteList* pIpWhiteList = pUserList->pList;
for (int i = 0; i < pIpWhiteList->num; i++) {
SIpV4Range* range = &pIpWhiteList->pIpRange[i];
if (uvCheckIp(range, ip)) {
valid = true;
break;
}
}
return valid;
}
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
if (pConn->inType == TDMT_MND_STATUS || pConn->inType == TDMT_MND_RETRIEVE_IP_WHITE ||
pConn->serverIp == pConn->clientIp ||
pWhite->ver == pConn->whiteListVer /*|| strncmp(pConn->user, "_dnd", strlen("_dnd")) == 0*/)
return true;
return uvWhiteListFilte(pWhite, pConn->user, pConn->clientIp, pConn->whiteListVer);
}
void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
// if conn already check by current whiteLis
pConn->whiteListVer = pWhite->ver;
}
static bool uvHandleReq(SSvrConn* pConn) { static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
SWorkThrd* pThrd = pConn->hostThrd;
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
@ -199,8 +355,17 @@ static bool uvHandleReq(SSvrConn* pConn) {
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
pConn->inType = pHead->msgType;
memcpy(pConn->user, pHead->user, strlen(pHead->user)); memcpy(pConn->user, pHead->user, strlen(pHead->user));
int8_t forbiddenIp = 0;
if (pThrd->enableIpWhiteList) {
forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0;
if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
}
if (uvRecvReleaseReq(pConn, pHead)) { if (uvRecvReleaseReq(pConn, pHead)) {
return true; return true;
} }
@ -219,7 +384,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
if (pHead->persist == 1) { if (pHead->persist == 1) {
pConn->status = ConnAcquire; pConn->status = ConnAcquire;
@ -262,6 +426,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp;
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId); pConn->refId);
@ -395,7 +560,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
if (status == 0) { if (status == 0) {
tTrace("success to dispatch conn to work thread"); tTrace("success to dispatch conn to work thread");
} else { } else {
tError("fail to dispatch conn to work thread, code:%s", uv_strerror(status)); tError("fail to dispatch conn to work thread, reason:%s", uv_strerror(status));
} }
if (!uv_is_closing((uv_handle_t*)req->data)) { if (!uv_is_closing((uv_handle_t*)req->data)) {
uv_close((uv_handle_t*)req->data, uvFreeCb); uv_close((uv_handle_t*)req->data, uvFreeCb);
@ -542,7 +707,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
} }
// release handle to rpc init // release handle to rpc init
if (msg->type == Quit) { if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} else { } else {
STransMsg transMsg = msg->msg; STransMsg transMsg = msg->msg;
@ -638,7 +803,7 @@ static void uvPrepareCb(uv_prepare_t* handle) {
continue; continue;
} }
// release handle to rpc init // release handle to rpc init
if (msg->type == Quit) { if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
continue; continue;
} else { } else {
@ -796,7 +961,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
transSockInfo2Str(&sockname, pConn->src); transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&peername; struct sockaddr_in addr = *(struct sockaddr_in*)&peername;
struct sockaddr_in saddr = *(struct sockaddr_in*)&sockname;
pConn->clientIp = addr.sin_addr.s_addr; pConn->clientIp = addr.sin_addr.s_addr;
pConn->serverIp = saddr.sin_addr.s_addr;
pConn->port = ntohs(addr.sin_port); pConn->port = ntohs(addr.sin_port);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
@ -1066,9 +1234,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->quit = false; thrd->quit = false;
srv->pThreadObj[i] = thrd;
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
srv->pThreadObj[i] = thrd;
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read thrd->pipe = &(srv->pipe[i][1]); // init read
@ -1093,6 +1262,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->quit = false; thrd->quit = false;
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
srv->pThreadObj[i] = thrd; srv->pThreadObj[i] = thrd;
@ -1192,6 +1362,31 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
taosMemoryFree(msg); taosMemoryFree(msg);
} }
} }
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
SUpdateIpWhite* req = msg->arg;
if (req != NULL) {
for (int i = 0; i < req->numOfUser; i++) {
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
pList->num = pUser->numOfRange;
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
}
thrd->pWhiteList->ver = req->ver;
thrd->enableIpWhiteList = 1;
tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req);
} else {
thrd->enableIpWhiteList = 0;
}
taosMemoryFree(msg);
return;
}
void destroyWorkThrd(SWorkThrd* pThrd) { void destroyWorkThrd(SWorkThrd* pThrd) {
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
@ -1200,6 +1395,9 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);
taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
@ -1367,5 +1565,23 @@ _return2:
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
return -1; return -1;
} }
void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
tDebug("ip-white-list update on rpc");
SServerObj* svrObj = pTransInst->tcphandle;
for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL);
msg->type = Update;
msg->arg = pReq;
transAsyncSend(pThrd->asyncPool, &msg->q);
}
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }

View File

@ -746,9 +746,9 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
serverAdd.sin_port = (uint16_t)htons(port); serverAdd.sin_port = (uint16_t)htons(port);
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) { // exception if (fd < 0) { // exception
return false; return false;
} else if (fd <= 2) { // in, out, err } else if (fd <= 2) { // in, out, err
taosCloseSocketNoCheck1(fd); taosCloseSocketNoCheck1(fd);
return false; return false;
} }
@ -895,32 +895,6 @@ int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len
} }
// Function converting an IP address string to an uint32_t. // Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
}
if (*s_end == '.') {
*s_end = '\0';
s_end++;
}
ip[k++] = (char)atoi(s_start);
}
ip[k] = '\0';
return *((uint32_t *)ip);
}
#endif // endif 0 #endif // endif 0

View File

@ -100,6 +100,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
@ -193,6 +194,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USERS, "Too many users")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ALTER_OPER, "Invalid alter operation") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ALTER_OPER, "Invalid alter operation")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_AUTH_FAILURE, "Authentication failure") TAOS_DEFINE_ERROR(TSDB_CODE_MND_AUTH_FAILURE, "Authentication failure")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_PRIVILEDGE_EXIST, "User already have this priviledge") TAOS_DEFINE_ERROR(TSDB_CODE_MND_PRIVILEDGE_EXIST, "User already have this priviledge")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_HOST_EXIST, "Host already exist in ip white list")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_HOST_NOT_EXIST, "Host not exist in ip white list")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USER_HOST, "Too many host in ip white list")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_LOCAL_HOST_NOT_DROP, "Host can not be dropped")
//mnode-stable-part1 //mnode-stable-part1
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")

View File

@ -16,6 +16,7 @@ exe:
gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS) gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS)
gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS) gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS)
gcc $(CFLAGS) ./passwdTest.c -o $(ROOT)passwdTest $(LFLAGS) gcc $(CFLAGS) ./passwdTest.c -o $(ROOT)passwdTest $(LFLAGS)
gcc $(CFLAGS) ./whiteListTest.c -o $(ROOT)whiteListTest $(LFLAGS)
clean: clean:
rm $(ROOT)batchprepare rm $(ROOT)batchprepare
@ -23,3 +24,4 @@ clean:
rm $(ROOT)dbTableRoute rm $(ROOT)dbTableRoute
rm $(ROOT)insertSameTs rm $(ROOT)insertSameTs
rm $(ROOT)passwdTest rm $(ROOT)passwdTest
rm $(ROOT)whiteListTest

View File

@ -0,0 +1,170 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos
/**
* passwdTest.c
* - Run the test case in clear TDengine environment with default root passwd 'taosdata'
*/
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "taos.h" // TAOS header file
#define nDup 1
#define nRoot 10
#define nUser 10
#define USER_LEN 24
#define BUF_LEN 1024
void createUsers(TAOS *taos, const char *host);
void dropUsers(TAOS* taos);
int nPassVerNotified = 0;
int nWhiteListVerNotified = 0;
TAOS *taosu[nRoot] = {0};
char users[nUser][USER_LEN] = {0};
void __taos_notify_cb(void *param, void *ext, int type) {
switch (type) {
case TAOS_NOTIFY_PASSVER: {
++nPassVerNotified;
printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext);
break;
}
case TAOS_NOTIFY_WHITELIST_VER: {
++nWhiteListVerNotified;
printf("%s:%d type:%d user:%s ver:%"PRId64 "\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int64_t *)ext);
break;
}
default:
printf("%s:%d unknown type:%d\n", __func__, __LINE__, type);
break;
}
}
void __taos_async_whitelist_cb(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteList) {
if (code == 0) {
printf("fetch whitelist cb. user: %s numofWhitelist: %d\n", param ? (char*)param : NULL, numOfWhiteLists);
for (int i = 0; i < numOfWhiteLists; ++i) {
printf(" %d: 0x%llx\n", i, pWhiteList[i]);
}
} else {
printf("fetch whitelist cb error %d\n", code);
}
}
static void queryDB(TAOS *taos, char *command) {
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
for (i = 0; i < nDup; ++i) {
if (NULL != pSql) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
if (code != 0) {
fprintf(stderr, "failed to run: %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
exit(EXIT_FAILURE);
} else {
fprintf(stderr, "success to run: %s\n", command);
}
taos_free_result(pSql);
}
int main(int argc, char *argv[]) {
char qstr[1024];
// connect to server
if (argc < 2) {
printf("please input server-ip \n");
return 0;
}
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
createUsers(taos, argv[1]);
while (nWhiteListVerNotified < 10) {
printf("white list update notified %d times\n", nWhiteListVerNotified);
sleep(1);
}
printf("succeed in getting white list nofication. %d times\n", nWhiteListVerNotified);
dropUsers(taos);
taos_close(taos);
taos_cleanup();
}
void dropUsers(TAOS *taos) {
char qstr[1024];
for (int i = 0; i < nUser; ++i) {
sprintf(users[i], "user%d", i);
sprintf(qstr, "DROP USER %s", users[i]);
queryDB(taos, qstr);
taos_close(taosu[i]);
}
}
void createUsers(TAOS *taos, const char *host) {
char qstr[1024];
// users
for (int i = 0; i < nUser; ++i) {
sprintf(users[i], "user%d", i);
sprintf(qstr, "CREATE USER %s PASS 'taosdata'", users[i]);
queryDB(taos, qstr);
taosu[i] = taos_connect(host, users[i], "taosdata", NULL, 0);
if (taosu[i] == NULL) {
printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/);
exit(1);
}
int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_WHITELIST_VER);
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]);
}
// alter whitelist for users
sprintf(qstr, "alter user %s add host '%d.%d.%d.%d/24'", users[i], i, i, i, i);
queryDB(taos, qstr);
taos_fetch_whitelist_a(taosu[i], __taos_async_whitelist_cb, users[i]);
}
}

View File

@ -8,7 +8,7 @@ sql create user u_read pass 'taosdata1' host '127.0.0.1/24','192.168.1.0/24'
sql create user u_write pass 'taosdata1' host '127.0.0.1','192.168.1.0' sql create user u_write pass 'taosdata1' host '127.0.0.1','192.168.1.0'
sql alter user u_read add host '3.3.3.4/24' sql alter user u_read add host '3.3.3.4/24'
sql alter user u_write drop host '4.4.4.5/25' sql_error alter user u_write drop host '4.4.4.5/25'
sql show users sql show users
if $rows != 3 then if $rows != 3 then

View File

@ -217,7 +217,7 @@ class TDTestCase:
tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(193, len(tdSql.queryResult)) tdSql.checkEqual(194, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult)) tdSql.checkEqual(54, len(tdSql.queryResult))