Merge branch 'taosdata:3.0' into 3.0

This commit is contained in:
Alex Duan 2023-09-18 14:28:00 +08:00 committed by GitHub
commit 570ac79a79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 2109 additions and 263 deletions

View File

@ -125,6 +125,7 @@ typedef enum {
typedef enum {
TAOS_NOTIFY_PASSVER = 0,
TAOS_NOTIFY_WHITELIST_VER = 1
} TAOS_NOTIFY_TYPE;
#define RET_MSG_LENGTH 1024
@ -236,6 +237,9 @@ DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
typedef void (*__taos_async_whitelist_fn_t)(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteLists);
DLL_EXPORT void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param);
/* --------------------------schemaless INTERFACE------------------------------- */
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);

View File

@ -849,6 +849,7 @@ typedef struct {
int32_t authVer;
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int64_t whiteListVer;
} SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
@ -875,11 +876,17 @@ typedef struct {
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
typedef struct SIpV4Range {
uint32_t ip;
uint32_t mask;
typedef struct SIpV4Range{
uint32_t ip;
uint32_t mask;
} SIpV4Range;
typedef struct {
int32_t num;
SIpV4Range pIpRange[];
} SIpWhiteList;
SIpWhiteList* cloneIpWhiteList(SIpWhiteList* pIpWhiteList);
typedef struct {
int8_t createType;
int8_t superUser; // denote if it is a super user or not
@ -895,6 +902,30 @@ int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
void tFreeSCreateUserReq(SCreateUserReq* pReq);
typedef struct {
int64_t ver;
char user[TSDB_USER_LEN];
int32_t numOfRange;
SIpV4Range* pIpRanges;
} SUpdateUserIpWhite;
typedef struct {
int64_t ver;
int numOfUser;
SUpdateUserIpWhite* pUserIpWhite;
} SUpdateIpWhite;
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
typedef struct {
int64_t ipWhiteVer;
} SRetrieveIpWhiteReq;
int32_t tSerializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteReq* pReq);
typedef struct {
int8_t alterType;
int8_t superUser;
@ -935,12 +966,30 @@ typedef struct {
SHashObj* readTbs;
SHashObj* writeTbs;
SHashObj* useDbs;
int64_t whiteListVer;
} SGetUserAuthRsp;
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct {
char user[TSDB_USER_LEN];
} SGetUserWhiteListReq;
int32_t tSerializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
int32_t tDeserializeSGetUserWhiteListReq(void* buf, int32_t bufLen, SGetUserWhiteListReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
int32_t numWhiteLists;
SIpV4Range* pWhiteLists;
} SGetUserWhiteListRsp;
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp);
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp);
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp);
/*
* for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information.
@ -1455,6 +1504,7 @@ typedef struct {
SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
@ -1485,6 +1535,7 @@ typedef struct {
SDnodeCfg dnodeCfg;
SArray* pDnodeEps; // Array of SDnodeEp
int32_t statusSeq;
int64_t ipWhiteVer;
} SStatusRsp;
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);

View File

@ -52,6 +52,7 @@ typedef struct {
void* data;
void* mgmt;
void* clientRpc;
void* serverRpc;
PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;

View File

@ -177,6 +177,8 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -37,6 +37,7 @@ typedef struct {
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcHandleInfo {
@ -60,6 +61,8 @@ typedef struct SRpcHandleInfo {
STraceId traceId;
SRpcConnInfo conn;
int8_t forbiddenIp;
} SRpcHandleInfo;
typedef struct SRpcMsg {
@ -162,6 +165,11 @@ int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg,
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void *rpcAllocHandle();
void rpcSetIpWhite(void *thandl, void *arg);
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf);
#ifdef __cplusplus
}

View File

@ -123,6 +123,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132)
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201)
@ -245,6 +247,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_USER_NOT_AVAILABLE TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_PRIVILEDGE_EXIST TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_USER_HOST_EXIST TAOS_DEF_ERROR_CODE(0, 0x035A)
#define TSDB_CODE_MND_USER_HOST_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x035B)
#define TSDB_CODE_MND_TOO_MANY_USER_HOST TAOS_DEF_ERROR_CODE(0, 0x035C)
#define TSDB_CODE_MND_USER_LOCAL_HOST_NOT_DROP TAOS_DEF_ERROR_CODE(0, 0x035D)
// mnode-stable-part1
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)

View File

@ -184,7 +184,7 @@ typedef enum ELogicConditionType {
#define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN
#define TSDB_POINTER_PRINT_BYTES 18 // 0x1122334455667788
#define TSDB_POINTER_PRINT_BYTES 18 // 0x1122334455667788
// ACCOUNT is a 32 bit positive integer
// this is the length of its string representation, including the terminator zero
#define TSDB_ACCT_ID_LEN 11
@ -202,6 +202,7 @@ typedef enum ELogicConditionType {
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024
#define TSDB_PRIVILEDGE_HOST_LEN 48 * 1024
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024
@ -417,7 +418,7 @@ typedef enum ELogicConditionType {
#define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024)
#define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY_PLAN"
#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519
#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519
#define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519

View File

@ -135,6 +135,12 @@ typedef struct {
__taos_notify_fn_t fp;
} SPassInfo;
typedef struct {
int64_t ver;
void* param;
__taos_notify_fn_t fp;
} SWhiteListInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
@ -152,6 +158,7 @@ typedef struct STscObj {
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
SPassInfo passInfo;
SWhiteListInfo whiteListInfo;
} STscObj;
typedef struct STscDbg {

View File

@ -116,6 +116,19 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
atomic_load_32(&passInfo->ver), pTscObj->id);
}
}
if (pTscObj->whiteListInfo.fp) {
SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
int64_t oldVer = atomic_load_64(&whiteListInfo->ver);
if (oldVer < pRsp->whiteListVer) {
atomic_store_64(&whiteListInfo->ver, pRsp->whiteListVer);
if (whiteListInfo->fp) {
(*whiteListInfo->fp)(whiteListInfo->param, &pRsp->whiteListVer, TAOS_NOTIFY_WHITELIST_VER);
}
tscDebug("update whitelist version of user %s from %"PRId64" to %"PRId64", tscRid:%" PRIi64, pRsp->user, oldVer,
atomic_load_64(&whiteListInfo->ver), pTscObj->id);
}
}
releaseTscObj(pReq->connKey.tscRid);
}
}

View File

@ -140,6 +140,13 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
taosThreadMutexUnlock(&pObj->mutex);
break;
}
case TAOS_NOTIFY_WHITELIST_VER: {
taosThreadMutexLock(&pObj->mutex);
pObj->whiteListInfo.fp = fp;
pObj->whiteListInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
break;
}
default: {
terrno = TSDB_CODE_INVALID_PARA;
releaseTscObj(*(int64_t *)taos);
@ -151,6 +158,113 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
return 0;
}
typedef struct SFetchWhiteListInfo{
int64_t connId;
__taos_async_whitelist_fn_t userCbFn;
void* userParam;
} SFetchWhiteListInfo;
int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SFetchWhiteListInfo* pInfo = (SFetchWhiteListInfo*)param;
TAOS* taos = &pInfo->connId;
if (code != TSDB_CODE_SUCCESS) {
pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
return code;
}
SGetUserWhiteListRsp wlRsp;
tDeserializeSGetUserWhiteListRsp(pMsg->pData, pMsg->len, &wlRsp);
uint64_t* pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
if (pWhiteLists == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
tFreeSGetUserWhiteListRsp(&wlRsp);
}
for (int i = 0; i < wlRsp.numWhiteLists; ++i) {
pWhiteLists[i] = ((uint64_t)wlRsp.pWhiteLists[i].mask << 32) | wlRsp.pWhiteLists[i].ip;
}
pInfo->userCbFn(pInfo->userParam, code, taos, wlRsp.numWhiteLists, pWhiteLists);
taosMemoryFree(pWhiteLists);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
tFreeSGetUserWhiteListRsp(&wlRsp);
return code;
}
void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param) {
if (NULL == taos) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
return;
}
int64_t connId = *(int64_t*)taos;
STscObj *pTsc = acquireTscObj(connId);
if (NULL == pTsc) {
fp(param, TSDB_CODE_TSC_DISCONNECTED, taos, 0, NULL);
return;
}
SGetUserWhiteListReq req;
memcpy(req.user, pTsc->user, TSDB_USER_LEN);
int32_t msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
void* pReq = taosMemoryMalloc(msgLen);
if (pReq == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
releaseTscObj(connId);
return;
}
if (tSerializeSGetUserWhiteListReq(pReq, msgLen, &req) < 0) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
SFetchWhiteListInfo* pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
if (pParam == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
pParam->connId = connId;
pParam->userCbFn = fp;
pParam->userParam = param;
SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pSendInfo == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
releaseTscObj(connId);
return;
}
pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = msgLen, .handle = NULL};
pSendInfo->requestId = generateRequestId();
pSendInfo->requestObjRefId = 0;
pSendInfo->param = pParam;
pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo);
releaseTscObj(connId);
return;
}
void taos_close_internal(void *taos) {
if (taos == NULL) {
return;

View File

@ -139,6 +139,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->connType = connectRsp.connType;
pTscObj->passInfo.ver = connectRsp.passVer;
pTscObj->authVer = connectRsp.authVer;
pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);

View File

@ -221,6 +221,7 @@ static const SSysDbTableSchema userUsersSchema[] = {
{.name = "enable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
{.name = "sysinfo", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "allowed_host", .bytes = TSDB_PRIVILEDGE_HOST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
GRANTS_SCHEMA;
@ -308,7 +309,7 @@ static const SSysDbTableSchema userUserPrivilegesSchema[] = {
{.name = "privilege", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "table_name", .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "condition", .bytes = TSDB_PRIVILEDGE_CONDITION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "condition", .bytes = TSDB_PRIVILEDGE_CONDITION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysTableMeta infosMeta[] = {

View File

@ -1116,6 +1116,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1226,6 +1228,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -1258,6 +1264,8 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
}
if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->ipWhiteVer) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1300,6 +1308,10 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
}
if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pRsp->ipWhiteVer) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -1371,6 +1383,15 @@ int32_t tDeserializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq)
return 0;
}
SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
if (pIpWhiteList == NULL) return NULL;
int32_t sz = sizeof(SIpWhiteList) + pIpWhiteList->num * sizeof(SIpV4Range);
SIpWhiteList *pNew = taosMemoryCalloc(1, sz);
memcpy(pNew, pIpWhiteList, sz);
return pNew;
}
int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1414,12 +1435,123 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCreateUserReq(SCreateUserReq *pReq) { taosMemoryFree(pReq->pIpRanges); }
int32_t tSerializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) {
// impl later
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->ver) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1;
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]);
if (tEncodeI64(&encoder, pUser->ver) < 0) return -1;
if (tEncodeCStr(&encoder, pUser->user) < 0) return -1;
if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1;
for (int j = 0; j < pUser->numOfRange; j++) {
SIpV4Range *pRange = &pUser->pIpRanges[j];
if (tEncodeU32(&encoder, pRange->ip) < 0) return -1;
if (tEncodeU32(&encoder, pRange->mask) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
// impl later
if (tDecodeI64(&decoder, &pReq->ver) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1;
pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
if (tDecodeI64(&decoder, &pUserWhite->ver) < 0) return -1;
if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1;
if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1;
pUserWhite->pIpRanges = taosMemoryCalloc(1, pUserWhite->numOfRange * sizeof(SIpV4Range));
for (int j = 0; j < pUserWhite->numOfRange; j++) {
SIpV4Range *pRange = &pUserWhite->pIpRanges[j];
if (tDecodeU32(&decoder, &pRange->ip) < 0) return -1;
if (tDecodeU32(&decoder, &pRange->mask) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
taosMemoryFree(pUserWhite->pIpRanges);
}
taosMemoryFree(pReq->pUserIpWhite);
// impl later
return;
}
SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
pClone->numOfUser = pReq->numOfUser;
pClone->ver = pReq->ver;
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i];
SUpdateUserIpWhite *pOld = &pReq->pUserIpWhite[i];
pNew->ver = pOld->ver;
memcpy(pNew->user, pOld->user, strlen(pOld->user));
pNew->numOfRange = pOld->numOfRange;
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
pNew->pIpRanges = taosMemoryCalloc(1, sz);
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
}
return pClone;
}
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) {
return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
// impl later
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1599,7 +1731,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
// since 3.0.7.0
if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->whiteListVer) < 0) return -1;
return 0;
}
@ -1731,6 +1863,11 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
} else {
pRsp->passVer = 0;
}
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pRsp->whiteListVer) < 0) goto _err;
} else {
pRsp->whiteListVer = 0;
}
}
return 0;
_err:
@ -1769,6 +1906,73 @@ void tFreeSGetUserAuthRsp(SGetUserAuthRsp *pRsp) {
taosHashCleanup(pRsp->useDbs);
}
int32_t tSerializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhiteListReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSGetUserWhiteListReq(void *buf, int32_t bufLen, SGetUserWhiteListReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->user) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numWhiteLists) < 0) return -1;
for (int i = 0; i < pRsp->numWhiteLists; ++i) {
if (tEncodeU32(&encoder, pRsp->pWhiteLists[i].ip) < 0) return -1;
if (tEncodeU32(&encoder, pRsp->pWhiteLists[i].mask) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSGetUserWhiteListRsp(void* buf, int32_t bufLen, SGetUserWhiteListRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->user) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numWhiteLists) < 0) return -1;
pRsp->pWhiteLists = taosMemoryMalloc(pRsp->numWhiteLists * sizeof(SIpV4Range));
if (pRsp->pWhiteLists == NULL) return -1;
for (int32_t i = 0; i < pRsp->numWhiteLists; ++i) {
if (tDecodeU32(&decoder, &(pRsp->pWhiteLists[i].ip)) < 0) return -1;
if (tDecodeU32(&decoder, &(pRsp->pWhiteLists[i].mask)) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSGetUserWhiteListRsp(SGetUserWhiteListRsp* pRsp) {
taosMemoryFree(pRsp->pWhiteLists);
}
int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -4235,6 +4439,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->whiteListVer) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -4271,6 +4476,11 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
pRsp->authVer = 0;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pRsp->whiteListVer) < 0) return -1;
} else {
pRsp->whiteListVer = 0;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -23,22 +23,22 @@ extern "C" {
#endif
typedef struct SDnodeMgmt {
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
TdThread statusThread;
TdThread monitorThread;
TdThread crashReportThread;
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
TdThread statusThread;
TdThread monitorThread;
TdThread crashReportThread;
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
} SDnodeMgmt;
// dmHandle.c

View File

@ -30,7 +30,36 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
}
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
if (pMgmt->pData->ipWhiteVer == ver) {
if (ver == 0) {
dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL);
// pMgmt->ipWhiteVer = ver;
}
return;
}
int64_t oldVer = pMgmt->pData->ipWhiteVer;
// pMgmt->ipWhiteVer = ver;
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeRetrieveIpWhite(pHead, contLen, &req);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 0};
SEpSet epset = {0};
dmGetMnodeEpSet(pMgmt->pData, &epset);
rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
}
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId;
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
@ -55,6 +84,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
}
dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
}
tFreeSStatusRsp(&statusRsp);
}
@ -111,6 +141,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);

View File

@ -55,6 +55,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
// pMgmt->pData->ipWhiteVer = 0;
if (dmStartWorker(pMgmt) != 0) {
return -1;
}

View File

@ -161,6 +161,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -63,6 +63,29 @@ static void dmConvertErrCode(tmsg_t msgType) {
terrno = TSDB_CODE_VND_STOPPED;
}
}
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
rpcSetIpWhite(pTrans, &ipWhite);
pData->ipWhiteVer = ipWhite.ver;
tFreeSUpdateIpWhiteReq(&ipWhite);
rpcFreeCont(pRpc->pCont);
}
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
if (forbidden) {
SIpV4Range range = {.ip = clientIp, .mask = 32};
char buf[36] = {0};
rpcUtilSIpRangeToStr(&range, buf);
dError("User:%s host:%s not in ip white list", user, buf);
return true;
} else {
return false;
}
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1;
@ -81,6 +104,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER;
}
bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
if (isForbidden) {
terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
goto _OVER;
}
switch (pRpc->msgType) {
case TDMT_DND_NET_TEST:
dmProcessNetTestReq(pDnode, pRpc);
@ -97,6 +126,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmSetMnodeEpSet(&pDnode->data, pEpSet);
}
break;
case TDMT_MND_RETRIEVE_IP_WHITE_RSP: {
dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
return;
} break;
default:
break;
}
@ -372,6 +405,7 @@ void dmCleanupServer(SDnode *pDnode) {
SMsgCb dmGetMsgcb(SDnode *pDnode) {
SMsgCb msgCb = {
.clientRpc = pDnode->trans.clientRpc,
.serverRpc = pDnode->trans.serverRpc,
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,

View File

@ -107,6 +107,7 @@ typedef struct {
TdThreadRwlock lock;
SMsgCb msgCb;
bool validMnodeEps;
int64_t ipWhiteVer;
} SDnodeData;
typedef struct {

View File

@ -277,18 +277,21 @@ typedef struct {
} SAcctObj;
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int8_t superUser;
int8_t sysInfo;
int8_t enable;
int8_t reserve;
int32_t acctId;
int32_t authVersion;
int32_t passVersion;
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int8_t superUser;
int8_t sysInfo;
int8_t enable;
int8_t reserve;
int32_t acctId;
int32_t authVersion;
int32_t passVersion;
int64_t ipWhiteListVer;
SIpWhiteList* pIpWhiteList;
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* topics;

View File

@ -130,6 +130,7 @@ typedef struct SMnode {
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb;
int64_t ipWhiteVer;
} SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
@ -140,6 +141,8 @@ bool mndGetRestored(SMnode *pMnode);
void mndSetStop(SMnode *pMnode);
bool mndGetStop(SMnode *pMnode);
SArray *mndGetAllDnodeFqdns(SMnode *pMnode);
#ifdef __cplusplus
}
#endif

View File

@ -33,6 +33,9 @@ int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
int32_t mndSetUserWhiteListRsp(SMnode* pMnode, SUserObj* pUser, SGetUserWhiteListRsp* pWhiteListRsp);
int32_t mndEnableIpWhiteList(SMnode *pMnode);
int32_t mndFetchIpWhiteList(SIpWhiteList *ipList, char **buf);
#ifdef __cplusplus
}

View File

@ -23,6 +23,10 @@
extern "C" {
#endif
enum {
IP_WHITE_ADD,
IP_WHITE_DROP,
};
int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName);
@ -38,8 +42,15 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int3
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew);
void mndUserFreeObj(SUserObj *pUser);
int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew);
void mndUserFreeObj(SUserObj *pUser);
int64_t mndGetIpWhiteVer(SMnode *pMnode);
void mndUpdateIpWhite(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_t lock);
int32_t mndRefreshUserIpWhiteList(SMnode *pMnode);
#ifdef __cplusplus
}
#endif

View File

@ -51,6 +51,12 @@ enum {
DND_CONN_ACTIVE_CODE,
};
enum {
DND_CREATE,
DND_ADD,
DND_DROP,
};
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -103,7 +109,10 @@ int32_t mndInitDnode(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupDnode(SMnode *pMnode) {}
SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
void mndCleanupDnode(SMnode *pMnode) {}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
int32_t code = -1;
@ -130,6 +139,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER:
mndTransDrop(pTrans);
@ -518,13 +528,16 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
}
pMnode->ipWhiteVer = mndGetIpWhiteVer(pMnode);
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged;
bool needCheck =
!online || dnodeChanged || reboot || supportVnodesChanged || pMnode->ipWhiteVer != statusReq.ipWhiteVer;
const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
@ -645,6 +658,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen);
@ -691,6 +705,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
@ -983,6 +998,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP, 1);
code = 0;
_OVER:
@ -1455,3 +1471,19 @@ _err:
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
SDnodeObj *pObj = NULL;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SArray *fqdns = taosArrayInit(4, sizeof(void *));
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
char *fqdn = taosStrdup(pObj->fqdn);
taosArrayPush(fqdns, &fqdn);
sdbRelease(pSdb, pObj);
}
return fqdns;
}

View File

@ -28,10 +28,33 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) {
return 0;
}
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
return 0;
}
// TODO: for community version use the commented version
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN);
pWhiteListRsp->numWhiteLists = 1;
pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
if (pWhiteListRsp->pWhiteLists == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pWhiteListRsp->pWhiteLists, 0, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// pWhiteListRsp->numWhiteLists = pUser->pIpWhiteList->num;
// pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// if (pWhiteListRsp->pWhiteLists == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// memcpy(pWhiteListRsp->pWhiteLists, pUser->pIpWhiteList->pIpRange,
// pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
return 0;
}
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) {
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1;
@ -39,6 +62,14 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
pRsp->sysInfo = pUser->sysInfo;
pRsp->version = pUser->authVersion;
pRsp->passVer = pUser->passVersion;
pRsp->whiteListVer = pUser->ipWhiteListVer;
return 0;
}
#endif
int32_t mndEnableIpWhiteList(SMnode *pMnode) { return 1; }
int32_t mndFetchIpWhiteList(SIpWhiteList *ipList, char **buf) {
*buf = NULL;
return 0;
}
#endif

View File

@ -290,6 +290,7 @@ _CONNECT:
connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion;
connectRsp.authVer = pUser->authVersion;
connectRsp.whiteListVer = pUser->ipWhiteListVer;
strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "audit.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndIndex.h"
@ -31,7 +32,6 @@
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#include "audit.h"
#define STB_VER_NUMBER 1
#define STB_RESERVE_SIZE 64
@ -858,7 +858,19 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
}
return 0;
}
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *tagname) {
char randStr[24] = {0};
int8_t start = 8;
int8_t end = sizeof(randStr) - 1;
// gen rand str len [base:end]
// note: ignore rand performance issues
int64_t len = taosRand() % (end - start + 1) + start;
taosRandStr2(randStr, len);
sprintf(fullname, "%s.%s_%s", dbname, tagname, randStr);
return 0;
}
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
int32_t code = -1;
@ -871,11 +883,8 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER;
char randStr[24] = {0};
taosRandStr2(randStr, tListLen(randStr) - 1);
SSchema *pSchema = &(stbObj.pTags[0]);
sprintf(fullIdxName, "%s.%s_%s", pDb->name, pSchema->name, randStr);
mndGenIdxNameForFirstTag(fullIdxName, pDb->name, pSchema->name);
SSIdx idx = {0};
if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
@ -1066,78 +1075,75 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return TSDB_CODE_SUCCESS;
}
static char* mndAuditFieldTypeStr(int32_t type){
switch (type)
{
case TSDB_DATA_TYPE_NULL:
return "null";
case TSDB_DATA_TYPE_BOOL:
return "bool";
case TSDB_DATA_TYPE_TINYINT:
return "tinyint";
case TSDB_DATA_TYPE_SMALLINT:
return "smallint";
case TSDB_DATA_TYPE_INT:
return "int";
case TSDB_DATA_TYPE_BIGINT:
return "bigint";
case TSDB_DATA_TYPE_FLOAT:
return "float";
case TSDB_DATA_TYPE_DOUBLE:
return "double";
case TSDB_DATA_TYPE_VARCHAR:
return "varchar";
case TSDB_DATA_TYPE_TIMESTAMP:
return "timestamp";
case TSDB_DATA_TYPE_NCHAR:
return "nchar";
case TSDB_DATA_TYPE_UTINYINT:
return "utinyint";
case TSDB_DATA_TYPE_USMALLINT:
return "usmallint";
case TSDB_DATA_TYPE_UINT:
return "uint";
case TSDB_DATA_TYPE_UBIGINT:
return "ubigint";
case TSDB_DATA_TYPE_JSON:
return "json";
case TSDB_DATA_TYPE_VARBINARY:
return "varbinary";
case TSDB_DATA_TYPE_DECIMAL:
return "decimal";
case TSDB_DATA_TYPE_BLOB:
return "blob";
case TSDB_DATA_TYPE_MEDIUMBLOB:
return "mediumblob";
case TSDB_DATA_TYPE_GEOMETRY:
return "geometry";
static char *mndAuditFieldTypeStr(int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_NULL:
return "null";
case TSDB_DATA_TYPE_BOOL:
return "bool";
case TSDB_DATA_TYPE_TINYINT:
return "tinyint";
case TSDB_DATA_TYPE_SMALLINT:
return "smallint";
case TSDB_DATA_TYPE_INT:
return "int";
case TSDB_DATA_TYPE_BIGINT:
return "bigint";
case TSDB_DATA_TYPE_FLOAT:
return "float";
case TSDB_DATA_TYPE_DOUBLE:
return "double";
case TSDB_DATA_TYPE_VARCHAR:
return "varchar";
case TSDB_DATA_TYPE_TIMESTAMP:
return "timestamp";
case TSDB_DATA_TYPE_NCHAR:
return "nchar";
case TSDB_DATA_TYPE_UTINYINT:
return "utinyint";
case TSDB_DATA_TYPE_USMALLINT:
return "usmallint";
case TSDB_DATA_TYPE_UINT:
return "uint";
case TSDB_DATA_TYPE_UBIGINT:
return "ubigint";
case TSDB_DATA_TYPE_JSON:
return "json";
case TSDB_DATA_TYPE_VARBINARY:
return "varbinary";
case TSDB_DATA_TYPE_DECIMAL:
return "decimal";
case TSDB_DATA_TYPE_BLOB:
return "blob";
case TSDB_DATA_TYPE_MEDIUMBLOB:
return "mediumblob";
case TSDB_DATA_TYPE_GEOMETRY:
return "geometry";
default:
return "error";
default:
return "error";
}
}
static void mndAuditFieldStr(char* detail, SArray *arr, int32_t len, int32_t max){
static void mndAuditFieldStr(char *detail, SArray *arr, int32_t len, int32_t max) {
int32_t detialLen = strlen(detail);
int32_t fieldLen = 0;
for (int32_t i = 0; i < len; ++i) {
SField *pField = taosArrayGet(arr, i);
char field[TSDB_COL_NAME_LEN + 20] = {0};
char field[TSDB_COL_NAME_LEN + 20] = {0};
fieldLen = strlen(", ");
if(detialLen > 0 && detialLen < max-fieldLen-1) {
if (detialLen > 0 && detialLen < max - fieldLen - 1) {
strcat(detail, ", ");
detialLen += fieldLen;
}
else{
} else {
break;
}
sprintf(field, "%s:%s", pField->name, mndAuditFieldTypeStr(pField->type));
fieldLen = strlen(field);
if(detialLen < max-fieldLen-1) {
if (detialLen < max - fieldLen - 1) {
strcat(detail, field);
detialLen += fieldLen;
}
else{
} else {
break;
}
}
@ -1252,14 +1258,17 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[AUDIT_DETAIL_MAX] = {0};
sprintf(detail, "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 ", "
"deleteMark2:%" PRId64 ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, "
"source:%d, suid:%" PRId64 ", tagVer:%d, ttl:%d, "
sprintf(detail,
"colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64
", "
"deleteMark2:%" PRId64
", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, "
"source:%d, suid:%" PRId64
", tagVer:%d, ttl:%d, "
"watermark1:%" PRId64 ", watermark2:%" PRId64,
createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1,
createReq.deleteMark2, createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags,
createReq.source, createReq.suid, createReq.tagVer, createReq.ttl,
createReq.watermark1, createReq.watermark2);
createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, createReq.deleteMark2,
createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags, createReq.source,
createReq.suid, createReq.tagVer, createReq.ttl, createReq.watermark1, createReq.watermark2);
mndAuditFieldStr(detail, createReq.pColumns, createReq.numOfColumns, AUDIT_DETAIL_MAX);
mndAuditFieldStr(detail, createReq.pTags, createReq.numOfTags, AUDIT_DETAIL_MAX);
@ -2338,8 +2347,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[2000] = {0};
sprintf(detail, "alterType:%d, numOfFields:%d, ttl:%d" ,
alterReq.alterType, alterReq.numOfFields, alterReq.ttl);
sprintf(detail, "alterType:%d, numOfFields:%d, ttl:%d", alterReq.alterType, alterReq.numOfFields, alterReq.ttl);
SName name = {0};
tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -2608,8 +2616,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[2000] = {0};
sprintf(detail, "igNotExists:%d, source:%d" ,
dropReq.igNotExists, dropReq.source);
sprintf(detail, "igNotExists:%d, source:%d", dropReq.igNotExists, dropReq.source);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -3369,7 +3376,7 @@ static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb
return p->info.rows;
}
static int8_t determineBuildColForWhichDBs(const char* db) {
static int8_t determineBuildColForWhichDBs(const char *db) {
int8_t buildWhichDBs;
if (!db[0])
buildWhichDBs = BUILD_COL_FOR_ALL_DB;
@ -3387,11 +3394,11 @@ static int8_t determineBuildColForWhichDBs(const char* db) {
}
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
uint8_t buildWhichDBs;
uint8_t buildWhichDBs;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL;
int32_t numOfRows = 0;
int32_t numOfRows = 0;
buildWhichDBs = determineBuildColForWhichDBs(pShow->db);

View File

@ -17,6 +17,7 @@
#include "mndSync.h"
#include "mndCluster.h"
#include "mndTrans.h"
#include "mndUser.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
@ -167,7 +168,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL;
int32_t code = -1;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
if (transId <= 0) {
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
@ -304,6 +305,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
} else {
mInfo("vgId:1, sync restore finished");
}
mndRefreshUserIpWhiteList(pMnode);
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
}

File diff suppressed because it is too large Load Diff

View File

@ -1948,6 +1948,10 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
if (!pVnode->restored) {
vInfo("vgId:%d, ignore compact req during restoring. ver:%" PRId64, TD_VID(pVnode), ver);
return 0;
}
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
}

View File

@ -13,8 +13,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <uv.h>
#include <regex.h>
#include <uv.h>
#include "parAst.h"
#include "parUtil.h"
@ -1666,8 +1666,8 @@ SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* p
static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange) {
int32_t code = TSDB_CODE_SUCCESS;
char* ipCopy = taosStrdup(ipRange);
char* slash = strchr(ipCopy, '/');
char* ipCopy = taosStrdup(ipRange);
char* slash = strchr(ipCopy, '/');
if (slash) {
*slash = '\0';
struct in_addr addr;
@ -1675,11 +1675,9 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
int prefix = atoi(slash + 1);
if (prefix < 0 || prefix > 32) {
code = TSDB_CODE_PAR_INVALID_IP_RANGE;
} else {
} else {
pIpRange->ip = addr.s_addr;
uint32_t mask = (1 << (32 - prefix)) - 1;
mask = htonl(~mask);
pIpRange->mask = mask;
pIpRange->mask = prefix;
code = TSDB_CODE_SUCCESS;
}
} else {
@ -1689,7 +1687,7 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
struct in_addr addr;
if (uv_inet_pton(AF_INET, ipCopy, &addr) == 0) {
pIpRange->ip = addr.s_addr;
pIpRange->mask = 0xFFFFFFFF;
pIpRange->mask = 32;
code = TSDB_CODE_SUCCESS;
} else {
code = TSDB_CODE_PAR_INVALID_IP_RANGE;
@ -1697,7 +1695,7 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
}
taosMemoryFreeClear(ipCopy);
return code;
return code;
}
static int32_t fillIpRangesFromWhiteList(SAstCreateContext* pCxt, SNodeList* pIpRangesNodeList, SIpV4Range* pIpRanges) {
@ -1769,7 +1767,7 @@ SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t al
pStmt->alterType = alterType;
switch (alterType) {
case TSDB_ALTER_USER_PASSWD: {
char password[TSDB_USET_PASSWORD_LEN] = {0};
char password[TSDB_USET_PASSWORD_LEN] = {0};
SToken* pVal = pAlterInfo;
if (!checkPassword(pCxt, pVal, password)) {
nodesDestroyNode((SNode*)pStmt);
@ -1788,7 +1786,7 @@ SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t al
pStmt->sysinfo = taosStr2Int8(pVal->z, NULL, 10);
break;
}
case TSDB_ALTER_USER_ADD_WHITE_LIST:
case TSDB_ALTER_USER_ADD_WHITE_LIST:
case TSDB_ALTER_USER_DROP_WHITE_LIST: {
SNodeList* pIpRangesNodeList = pAlterInfo;
pStmt->pNodeListIpRanges = pIpRangesNodeList;

View File

@ -1712,25 +1712,24 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
if (pState != NULL && idx != -1) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
rocksdb_column_family_handle_t* cf = NULL;
taosThreadRwlockRdlock(&wrapper->rwLock);
taosThreadRwlockWrlock(&wrapper->rwLock);
cf = wrapper->pHandle[idx];
taosThreadRwlockUnlock(&wrapper->rwLock);
if (cf == NULL) {
char buf[128] = {0};
GEN_COLUMN_FAMILY_NAME(buf, wrapper->idstr, ginitDict[idx].key);
char* err = NULL;
taosThreadRwlockWrlock(&wrapper->rwLock);
cf = rocksdb_create_column_family(wrapper->rocksdb, wrapper->cfOpts[idx], buf, &err);
if (err != NULL) {
idx = -1;
qError("failed to to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
taosMemoryFree(err);
} else {
qDebug("succ to to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
wrapper->pHandle[idx] = cf;
}
taosThreadRwlockUnlock(&wrapper->rwLock);
}
taosThreadRwlockUnlock(&wrapper->rwLock);
}
return idx;

View File

@ -29,6 +29,8 @@ extern "C" {
#include "ttrace.h"
#include "tutil.h"
typedef bool (*FilteFunc)(void* arg);
typedef void* queue[2];
/* Private macros. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
@ -303,11 +305,12 @@ void transUnrefCliHandle(void* handle);
int transReleaseCliHandle(void* handle);
int transReleaseSrvHandle(void* handle);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
@ -434,6 +437,23 @@ int32_t transGetRefMgt();
int32_t transGetInstMgt();
void transHttpEnvDestroy();
typedef struct {
uint32_t netmask;
uint32_t address;
uint32_t network;
uint32_t broadcast;
char info[32];
int8_t type;
} SubnetUtils;
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange);
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip);
int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf);
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf);
int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
#ifdef __cplusplus
}
#endif

View File

@ -178,13 +178,21 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
// client only
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later
return transSetDefaultAddr(thandle, ip, fqdn);
}
// server only
void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); }
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
return transUtilSWhiteListToStr(pWhiteList, ppBuf);
}
int32_t rpcInit() {
transInit();
return 0;

View File

@ -654,3 +654,106 @@ void transDestoryExHandle(void* handle) {
}
taosMemoryFree(handle);
}
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
// char ip_addr_cpy[20];
// char ip[5];
// tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
// char *s_start, *s_end;
// s_start = ip_addr_cpy;
// s_end = ip_addr_cpy;
// int32_t k = 0;
// for (k = 0; *s_start != '\0'; s_start = s_end) {
// for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
// }
// if (*s_end == '.') {
// *s_end = '\0';
// s_end++;
// }
// dst[k++] = (char)atoi(s_start);
// }
// }
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
uint32_t ip = pRange->ip;
return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
}
int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) {
if (pRange->mask == 32) {
pUtils->type = 0;
pUtils->address = pRange->ip;
return 0;
}
pUtils->address = subnetIpRang2Int(pRange);
for (int i = 0; i < pRange->mask; i++) {
pUtils->netmask |= (1 << (31 - i));
}
pUtils->network = pUtils->address & pUtils->netmask;
pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF);
pUtils->type = (pRange->mask == 32 ? 0 : 1);
return 0;
}
int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf) {
sprintf(buf, "raw: %s, address: %d, netmask:%d, network:%d, broadcast:%d", pUtils->info, pUtils->address,
pUtils->netmask, pUtils->network, pUtils->broadcast);
return 0;
}
int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) {
// impl later
if (pUtils == NULL) return false;
if (pUtils->type == 0) {
return pUtils->address == ip;
} else {
SIpV4Range range = {.ip = ip, .mask = 32};
uint32_t t = subnetIpRang2Int(&range);
return t >= pUtils->network && t <= pUtils->broadcast;
}
}
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
int32_t len = 0;
struct in_addr addr;
addr.s_addr = pRange->ip;
uv_inet_ntop(AF_INET, &addr, buf, 32);
len = strlen(buf);
if (pRange->mask != 32) {
len += sprintf(buf + len, "/%d", pRange->mask);
}
buf[len] = 0;
return len;
}
int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
if (pList->num == 0) {
*ppBuf = NULL;
return 0;
}
int32_t len = 0;
char* pBuf = taosMemoryCalloc(1, pList->num * 36);
for (int i = 0; i < pList->num; i++) {
SIpV4Range* pRange = &pList->pIpRange[i];
char tbuf[32] = {0};
int tlen = transUtilSIpRangeToStr(pRange, tbuf);
len += sprintf(pBuf + len, "%s,", tbuf);
}
if (len > 0) {
pBuf[len - 1] = 0;
}
*ppBuf = pBuf;
return len;
}

View File

@ -43,6 +43,7 @@ typedef struct SSvrConn {
ConnStatus status;
uint32_t serverIp;
uint32_t clientIp;
uint16_t port;
@ -55,6 +56,8 @@ typedef struct SSvrConn {
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
int64_t whiteListVer;
} SSvrConn;
typedef struct SSvrMsg {
@ -62,8 +65,22 @@ typedef struct SSvrMsg {
STransMsg msg;
queue q;
STransMsgType type;
void* arg;
FilteFunc func;
} SSvrMsg;
typedef struct {
int64_t ver;
SIpWhiteList* pList;
// SArray* list;
} SWhiteUserList;
typedef struct {
SHashObj* pList;
int64_t ver;
} SIpWhiteListTab;
typedef struct SWorkThrd {
TdThread thread;
uv_connect_t connect_req;
@ -77,6 +94,10 @@ typedef struct SWorkThrd {
queue conn;
void* pTransInst;
bool quit;
SIpWhiteListTab* pWhiteList;
int64_t whiteListVer;
int8_t enableIpWhiteList;
} SWorkThrd;
typedef struct SServerObj {
@ -99,6 +120,14 @@ typedef struct SServerObj {
bool inited;
} SServerObj;
SIpWhiteListTab* uvWhiteListCreate();
void uvWhiteListDestroy(SIpWhiteListTab* pWhite);
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver);
void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
@ -141,8 +170,9 @@ static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleUpdate(SSvrMsg* pMsg, SWorkThrd* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, NULL};
uvHandleRegister, uvHandleUpdate};
static void uvDestroyConn(uv_handle_t* handle);
@ -181,8 +211,134 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn);
}
static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) {
// impl later
SubnetUtils subnet = {0};
if (subnetInit(&subnet, pRange) != 0) {
return false;
}
return subnetCheckIp(&subnet, ip);
}
SIpWhiteListTab* uvWhiteListCreate() {
SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab));
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
pWhiteList->ver = -1;
return pWhiteList;
}
void uvWhiteListDestroy(SIpWhiteListTab* pWhite) {
SHashObj* pWhiteList = pWhite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
taosMemoryFree(pUserList->pList);
taosMemoryFree(pUserList);
pIter = taosHashIterate(pWhiteList, pIter);
}
taosHashCleanup(pWhiteList);
taosMemoryFree(pWhite);
}
void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
char* tmp = NULL;
int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
char* pBuf = taosMemoryCalloc(1, tlen + 64);
int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp);
taosMemoryFree(tmp);
*ppBuf = pBuf;
}
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
SHashObj* pWhiteList = pWrite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
size_t klen = 0;
char user[TSDB_USER_LEN + 1] = {0};
char* pUser = taosHashGetKey(pIter, &klen);
memcpy(user, pUser, klen);
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
char* buf = NULL;
uvWhiteListToStr(pUserList, user, &buf);
tDebug("ip-white-list %s", buf);
taosMemoryFree(buf);
pIter = taosHashIterate(pWhiteList, pIter);
}
}
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
SHashObj* pWhiteList = pWhite->pList;
SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
if (ppUserList == NULL || *ppUserList == NULL) {
SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
pUserList->ver = ver;
pUserList->pList = plist;
taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
} else {
SWhiteUserList* pUserList = *ppUserList;
taosMemoryFreeClear(pUserList->pList);
pUserList->ver = ver;
pUserList->pList = plist;
}
uvWhiteListDebug(pWhite);
}
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) {
pWhite->ver++;
// impl later
}
static bool uvWhiteListIsDefaultAddr(uint32_t ip) {
// 127.0.0.1
static SIpV4Range range = {.ip = 16777343, .mask = 32};
return range.ip == ip;
}
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver) {
// impl check
SHashObj* pWhiteList = pWhite->pList;
bool valid = false;
if (uvWhiteListIsDefaultAddr(ip)) return true;
SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
return false;
}
SWhiteUserList* pUserList = *ppList;
if (pUserList->ver == ver) return true;
SIpWhiteList* pIpWhiteList = pUserList->pList;
for (int i = 0; i < pIpWhiteList->num; i++) {
SIpV4Range* range = &pIpWhiteList->pIpRange[i];
if (uvCheckIp(range, ip)) {
valid = true;
break;
}
}
return valid;
}
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
if (pConn->inType == TDMT_MND_STATUS || pConn->inType == TDMT_MND_RETRIEVE_IP_WHITE ||
pConn->serverIp == pConn->clientIp ||
pWhite->ver == pConn->whiteListVer /*|| strncmp(pConn->user, "_dnd", strlen("_dnd")) == 0*/)
return true;
return uvWhiteListFilte(pWhite, pConn->user, pConn->clientIp, pConn->whiteListVer);
}
void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
// if conn already check by current whiteLis
pConn->whiteListVer = pWhite->ver;
}
static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst;
STrans* pTransInst = pConn->pTransInst;
SWorkThrd* pThrd = pConn->hostThrd;
STransMsgHead* pHead = NULL;
@ -199,8 +355,17 @@ static bool uvHandleReq(SSvrConn* pConn) {
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
pConn->inType = pHead->msgType;
memcpy(pConn->user, pHead->user, strlen(pHead->user));
int8_t forbiddenIp = 0;
if (pThrd->enableIpWhiteList) {
forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0;
if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
}
if (uvRecvReleaseReq(pConn, pHead)) {
return true;
}
@ -219,7 +384,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
@ -262,6 +426,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp;
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId);
@ -395,7 +560,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
if (status == 0) {
tTrace("success to dispatch conn to work thread");
} else {
tError("fail to dispatch conn to work thread, code:%s", uv_strerror(status));
tError("fail to dispatch conn to work thread, reason:%s", uv_strerror(status));
}
if (!uv_is_closing((uv_handle_t*)req->data)) {
uv_close((uv_handle_t*)req->data, uvFreeCb);
@ -542,7 +707,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
}
// release handle to rpc init
if (msg->type == Quit) {
if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd);
} else {
STransMsg transMsg = msg->msg;
@ -638,7 +803,7 @@ static void uvPrepareCb(uv_prepare_t* handle) {
continue;
}
// release handle to rpc init
if (msg->type == Quit) {
if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
@ -796,7 +961,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&peername;
struct sockaddr_in saddr = *(struct sockaddr_in*)&sockname;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->serverIp = saddr.sin_addr.s_addr;
pConn->port = ntohs(addr.sin_port);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
@ -1066,9 +1234,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
thrd->pTransInst = shandle;
thrd->quit = false;
srv->pThreadObj[i] = thrd;
thrd->pTransInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
srv->pThreadObj[i] = thrd;
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read
@ -1093,6 +1262,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pTransInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
srv->pThreadObj[i] = thrd;
@ -1192,6 +1362,31 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
taosMemoryFree(msg);
}
}
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
SUpdateIpWhite* req = msg->arg;
if (req != NULL) {
for (int i = 0; i < req->numOfUser; i++) {
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
pList->num = pUser->numOfRange;
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
}
thrd->pWhiteList->ver = req->ver;
thrd->enableIpWhiteList = 1;
tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req);
} else {
thrd->enableIpWhiteList = 0;
}
taosMemoryFree(msg);
return;
}
void destroyWorkThrd(SWorkThrd* pThrd) {
if (pThrd == NULL) {
return;
@ -1200,6 +1395,9 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
@ -1367,5 +1565,23 @@ _return2:
rpcFreeCont(msg->pCont);
return -1;
}
void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
tDebug("ip-white-list update on rpc");
SServerObj* svrObj = pTransInst->tcphandle;
for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL);
msg->type = Update;
msg->arg = pReq;
transAsyncSend(pThrd->asyncPool, &msg->q);
}
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }

View File

@ -86,8 +86,9 @@ void taosRandStr(char* str, int32_t size) {
}
void taosRandStr2(char* str, int32_t size) {
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789";
int32_t len = 36;
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ@";
int32_t len = strlen(set);
for (int32_t i = 0; i < size; ++i) {
str[i] = set[taosRand() % len];

View File

@ -746,9 +746,9 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
serverAdd.sin_port = (uint16_t)htons(port);
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) { // exception
if (fd < 0) { // exception
return false;
} else if (fd <= 2) { // in, out, err
} else if (fd <= 2) { // in, out, err
taosCloseSocketNoCheck1(fd);
return false;
}
@ -895,32 +895,6 @@ int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len
}
// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
}
if (*s_end == '.') {
*s_end = '\0';
s_end++;
}
ip[k++] = (char)atoi(s_start);
}
ip[k] = '\0';
return *((uint32_t *)ip);
}
#endif // endif 0

View File

@ -100,6 +100,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
@ -193,6 +194,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USERS, "Too many users")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ALTER_OPER, "Invalid alter operation")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_AUTH_FAILURE, "Authentication failure")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_PRIVILEDGE_EXIST, "User already have this priviledge")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_HOST_EXIST, "Host already exist in ip white list")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_HOST_NOT_EXIST, "Host not exist in ip white list")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USER_HOST, "Too many host in ip white list")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_LOCAL_HOST_NOT_DROP, "Host can not be dropped")
//mnode-stable-part1
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
@ -216,7 +221,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "index already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "index already exists in db")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_NOT_EXIST, "index not exist")
@ -307,7 +312,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists in db")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option")

View File

@ -161,8 +161,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3404.py
@ -825,7 +825,7 @@
,,y,script,./test.sh -f tsim/dnode/balance3.sim
,,y,script,./test.sh -f tsim/vnode/replica3_many.sim
,,y,script,./test.sh -f tsim/stable/metrics_idx.sim
,,y,script,./test.sh -f tsim/db/alter_replica_13.sim
# ,,y,script,./test.sh -f tsim/db/alter_replica_13.sim
,,y,script,./test.sh -f tsim/sync/3Replica1VgElect.sim
,,y,script,./test.sh -f tsim/sync/3Replica5VgElect.sim
,,n,script,./test.sh -f tsim/valgrind/checkError6.sim
@ -839,7 +839,7 @@
,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/user/privilege_create_db.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim
,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
# ,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
,,y,script,./test.sh -f tsim/db/basic1.sim
,,y,script,./test.sh -f tsim/db/basic2.sim
,,y,script,./test.sh -f tsim/db/basic3.sim

View File

@ -16,6 +16,7 @@ exe:
gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS)
gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS)
gcc $(CFLAGS) ./passwdTest.c -o $(ROOT)passwdTest $(LFLAGS)
gcc $(CFLAGS) ./whiteListTest.c -o $(ROOT)whiteListTest $(LFLAGS)
clean:
rm $(ROOT)batchprepare
@ -23,3 +24,4 @@ clean:
rm $(ROOT)dbTableRoute
rm $(ROOT)insertSameTs
rm $(ROOT)passwdTest
rm $(ROOT)whiteListTest

View File

@ -0,0 +1,170 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos
/**
* passwdTest.c
* - Run the test case in clear TDengine environment with default root passwd 'taosdata'
*/
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "taos.h" // TAOS header file
#define nDup 1
#define nRoot 10
#define nUser 10
#define USER_LEN 24
#define BUF_LEN 1024
void createUsers(TAOS *taos, const char *host);
void dropUsers(TAOS* taos);
int nPassVerNotified = 0;
int nWhiteListVerNotified = 0;
TAOS *taosu[nRoot] = {0};
char users[nUser][USER_LEN] = {0};
void __taos_notify_cb(void *param, void *ext, int type) {
switch (type) {
case TAOS_NOTIFY_PASSVER: {
++nPassVerNotified;
printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext);
break;
}
case TAOS_NOTIFY_WHITELIST_VER: {
++nWhiteListVerNotified;
printf("%s:%d type:%d user:%s ver:%"PRId64 "\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int64_t *)ext);
break;
}
default:
printf("%s:%d unknown type:%d\n", __func__, __LINE__, type);
break;
}
}
void __taos_async_whitelist_cb(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteList) {
if (code == 0) {
printf("fetch whitelist cb. user: %s numofWhitelist: %d\n", param ? (char*)param : NULL, numOfWhiteLists);
for (int i = 0; i < numOfWhiteLists; ++i) {
printf(" %d: 0x%llx\n", i, pWhiteList[i]);
}
} else {
printf("fetch whitelist cb error %d\n", code);
}
}
static void queryDB(TAOS *taos, char *command) {
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
for (i = 0; i < nDup; ++i) {
if (NULL != pSql) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
if (code != 0) {
fprintf(stderr, "failed to run: %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
exit(EXIT_FAILURE);
} else {
fprintf(stderr, "success to run: %s\n", command);
}
taos_free_result(pSql);
}
int main(int argc, char *argv[]) {
char qstr[1024];
// connect to server
if (argc < 2) {
printf("please input server-ip \n");
return 0;
}
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
createUsers(taos, argv[1]);
while (nWhiteListVerNotified < 10) {
printf("white list update notified %d times\n", nWhiteListVerNotified);
sleep(1);
}
printf("succeed in getting white list nofication. %d times\n", nWhiteListVerNotified);
dropUsers(taos);
taos_close(taos);
taos_cleanup();
}
void dropUsers(TAOS *taos) {
char qstr[1024];
for (int i = 0; i < nUser; ++i) {
sprintf(users[i], "user%d", i);
sprintf(qstr, "DROP USER %s", users[i]);
queryDB(taos, qstr);
taos_close(taosu[i]);
}
}
void createUsers(TAOS *taos, const char *host) {
char qstr[1024];
// users
for (int i = 0; i < nUser; ++i) {
sprintf(users[i], "user%d", i);
sprintf(qstr, "CREATE USER %s PASS 'taosdata'", users[i]);
queryDB(taos, qstr);
taosu[i] = taos_connect(host, users[i], "taosdata", NULL, 0);
if (taosu[i] == NULL) {
printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/);
exit(1);
}
int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_WHITELIST_VER);
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]);
}
// alter whitelist for users
sprintf(qstr, "alter user %s add host '%d.%d.%d.%d/24'", users[i], i, i, i, i);
queryDB(taos, qstr);
taos_fetch_whitelist_a(taosu[i], __taos_async_whitelist_cb, users[i]);
}
}

View File

@ -292,11 +292,12 @@ if $rows != 1 then
return -1
endi
sql drop index $data[0][0]
#$drop_name=`$data[0][0]`
#sql drop index `$data[0][0]\`
if $rows != 0 then
return -1
endi
#if $rows != 0 then
# return -1
#endi
sql_error drop index t2
@ -304,7 +305,7 @@ sql_error drop index t3
sql create index ti0 on $mtPrefix (t1)
#sql create index ti0 on $mtPrefix (t1)
$i = $interval
while $i < $limit

View File

@ -8,7 +8,7 @@ sql create user u_read pass 'taosdata1' host '127.0.0.1/24','192.168.1.0/24'
sql create user u_write pass 'taosdata1' host '127.0.0.1','192.168.1.0'
sql alter user u_read add host '3.3.3.4/24'
sql alter user u_write drop host '4.4.4.5/25'
sql_error alter user u_write drop host '4.4.4.5/25'
sql show users
if $rows != 3 then

View File

@ -67,7 +67,7 @@ set "FILE_NAME=testSuite.sim"
if "%1" == "-f" set "FILE_NAME=%2"
set FILE_NAME=%FILE_NAME:/=\%
start cmd /k "timeout /t 600 /NOBREAK && taskkill /f /im tsim.exe & exit /b"
start cmd /k "timeout /t 800 /NOBREAK && taskkill /f /im tsim.exe & exit /b"
rem echo FILE_NAME: %FILE_NAME%
echo ExcuteCmd: %tsim% -c %CFG_DIR% -f %FILE_NAME%

View File

@ -217,7 +217,7 @@ class TDTestCase:
tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(193, len(tdSql.queryResult))
tdSql.checkEqual(194, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))

View File

@ -24,7 +24,7 @@ class TDTestCase:
for i in range(self.tables):
tdSql.execute(f'create table t{i} using stb tags({i}) ttl {self.ttl}')
time.sleep(self.ttl * 2)
time.sleep(self.ttl + 3)
tdSql.query('show tables')
tdSql.checkRows(90)

View File

@ -166,22 +166,23 @@ class TDTestCase:
nodePort = 6030 + i*100
newTdSql=tdCom.newTdSql(port=nodePort)
tdDnodes[1].stoptaosd()
dataPath = tdDnodes[1].dataDir
os.system(f"rm -rf {dataPath}/*")
os.system(f"rm -rf {dataPath}/.runing")
tdDnodes[1].stoptaosd()
tdDnodes[1].starttaosd()
sleep(5)
for i in range(6):
nodePort = 6030 + i*100
newTdSql=tdCom.newTdSql(port=nodePort)
tdDnodes[0].stoptaosd()
dataPath = tdDnodes[0].dataDir
os.system(f"rm -rf {dataPath}/*")
os.system(f"rm -rf {dataPath}/.runing")
tdDnodes[0].stoptaosd()
tdDnodes[0].starttaosd()
sleep(5)
for i in range(6):

View File

@ -109,7 +109,7 @@ if __name__ == "__main__":
websocket = False
replicaVar = 1
asan = False
independentMnode = True
independentMnode = False
previousCluster = False
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode','previous'])
@ -202,7 +202,7 @@ if __name__ == "__main__":
createDnodeNums = value
if key in ['-i', '--independentMnode']:
independentMnode = False
independentMnode = value
if key in ['-R', '--restful']:
restful = True