add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-10 14:09:36 +08:00
parent 56706193eb
commit 25bc0b226b
9 changed files with 82 additions and 13 deletions

View File

@ -177,7 +177,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve_ip_white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -37,6 +37,7 @@ typedef struct {
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcHandleInfo {
@ -60,6 +61,8 @@ typedef struct SRpcHandleInfo {
STraceId traceId;
SRpcConnInfo conn;
int8_t forbiddenIp;
} SRpcHandleInfo;
typedef struct SRpcMsg {

View File

@ -123,6 +123,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132)
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201)

View File

@ -221,7 +221,7 @@ static const SSysDbTableSchema userUsersSchema[] = {
{.name = "enable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
{.name = "sysinfo", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "host", .bytes = TSDB_PRIVILEDGE_HOST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "allowed_host", .bytes = TSDB_PRIVILEDGE_HOST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
GRANTS_SCHEMA;

View File

@ -91,6 +91,18 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER;
}
if (pRpc->info.forbiddenIp == 1) {
struct in_addr addr;
addr.s_addr = pRpc->info.conn.clientIp;
char tbuf[40] = {0};
uv_inet_ntop(AF_INET, &addr, tbuf, 40);
dError("User %s host:%s not in ip white list", pRpc->info.conn.user, tbuf);
terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
goto _OVER;
}
switch (pRpc->msgType) {
case TDMT_DND_NET_TEST:
dmProcessNetTestReq(pDnode, pRpc);

View File

@ -139,7 +139,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
mndUpdateIpWhite("_dnd", dnodeObj.fqdn, IP_WHITE_ADD, 1);
mndUpdateIpWhite(TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER:
mndTransDrop(pTrans);
@ -705,7 +705,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
mndUpdateIpWhite("_dnd", dnodeObj.fqdn, IP_WHITE_ADD, 1);
mndUpdateIpWhite(TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
@ -1087,7 +1087,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndUpdateIpWhite("_dnd", pDnode->fqdn, IP_WHITE_DROP, 1);
mndUpdateIpWhite(TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP, 1);
code = 0;
_OVER:

View File

@ -271,7 +271,9 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
ver = ipWhiteMgt.ver;
int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteTab);
pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite));
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, NULL);
int32_t i = 0;
while (pIter) {
@ -291,6 +293,7 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, pIter);
}
pUpdate->numOfUser = i;
pUpdate->ver = ver;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return 0;
@ -328,7 +331,7 @@ SHashObj *mndFetchAllIpWhite(SMnode *pMnode) {
for (int i = 0; i < taosArrayGetSize(fqdns); i++) {
char *fqdn = taosArrayGetP(fqdns, i);
mndUpdateIpWhiteImpl(pIpWhiteTab, "_dnd", fqdn, IP_WHITE_ADD);
mndUpdateIpWhiteImpl(pIpWhiteTab, TSDB_DEFAULT_USER, fqdn, IP_WHITE_ADD);
taosMemoryFree(fqdn);
}

View File

@ -290,7 +290,7 @@ SWhiteList* uvWhiteListCreate() {
SWhiteList* pWhiteList = taosMemoryCalloc(1, sizeof(SWhiteList));
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
pWhiteList->ver = 0;
pWhiteList->ver = -1;
return pWhiteList;
}
void uvWhiteListDestroy(SWhiteList* pWhite) {
@ -307,6 +307,51 @@ void uvWhiteListDestroy(SWhiteList* pWhite) {
taosMemoryFree(pWhite);
}
void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
int32_t len = 0;
char* pBuf = taosMemoryCalloc(1, plist->pList->num * 36);
len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {", user, plist->ver);
for (int i = 0; i < plist->pList->num; i++) {
SIpV4Range* pRange = &plist->pList->pIpRange[i];
{
char tbuf[32] = {0};
struct in_addr addr;
addr.s_addr = pRange->ip;
uv_inet_ntop(AF_INET, &addr, tbuf, 32);
len += sprintf(pBuf + len, "%s", tbuf);
if (pRange->mask != 0) {
len += sprintf(pBuf + len, "%d", pRange->mask);
}
}
if (i == plist->pList->num - 1) {
len += sprintf(pBuf + len, "}");
} else {
len += sprintf(pBuf + len, ",");
}
}
pBuf[len] = 0;
*ppBuf = pBuf;
}
void uvWhiteListDebug(SWhiteList* pWrite) {
SHashObj* pWhiteList = pWrite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
size_t klen = 0;
char user[TSDB_USER_LEN + 1] = {0};
char* pUser = taosHashGetKey(pIter, &klen);
memcpy(user, pUser, klen);
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
char* buf = NULL;
uvWhiteListToStr(pUserList, user, &buf);
tDebug("white %s", buf);
taosMemoryFree(buf);
pIter = taosHashIterate(pWhiteList, pIter);
}
}
void uvWhiteListAdd(SWhiteList* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
SHashObj* pWhiteList = pWhite->pList;
@ -325,6 +370,7 @@ void uvWhiteListAdd(SWhiteList* pWhite, char* user, SIpWhiteList* plist, int64_t
pUserList->ver = ver;
pUserList->pList = plist;
}
uvWhiteListDebug(pWhite);
}
void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable) {
@ -338,7 +384,7 @@ bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver)
bool valid = false;
SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
return true;
return false;
}
SWhiteUserList* pList = *ppList;
if (pList->ver == ver) return true;
@ -354,7 +400,9 @@ bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver)
return valid;
}
bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) {
if (pWhite->ver == pConn->whiteListVer || strncmp(pConn->user, "_dnd", strlen("_dnd")) == 0) return true;
if (pConn->inType == TDMT_MND_STATUS || pConn->inType == TDMT_MND_RETRIEVE_IP_WHITE ||
pWhite->ver == pConn->whiteListVer /*|| strncmp(pConn->user, "_dnd", strlen("_dnd")) == 0*/)
return true;
return uvWhiteListFilte(pWhite, pConn->user, pConn->clientIp, pConn->whiteListVer);
}
@ -382,11 +430,11 @@ static bool uvHandleReq(SSvrConn* pConn) {
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
pConn->inType = pHead->msgType;
memcpy(pConn->user, pHead->user, strlen(pHead->user));
if (uvWhiteListCheckConn(pThrd->pWhiteList, pConn) == false) {
return false;
} else {
int8_t forbiddenIp = uvWhiteListCheckConn(pThrd->pWhiteList, pConn) == false ? 1 : 0;
if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
@ -408,7 +456,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
@ -451,6 +498,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp;
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId);

View File

@ -100,6 +100,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "User ip not in ip white list")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")