Merge pull request #22998 from taosdata/enh/TS-3934-3.0

enh:  notify and remove cache when user dropped
This commit is contained in:
dapan1121 2023-09-21 19:22:21 +08:00 committed by GitHub
commit 982dd6a373
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 160 additions and 32 deletions

View File

@ -125,7 +125,8 @@ typedef enum {
typedef enum {
TAOS_NOTIFY_PASSVER = 0,
TAOS_NOTIFY_WHITELIST_VER = 1
TAOS_NOTIFY_WHITELIST_VER = 1,
TAOS_NOTIFY_USER_DROPPED = 2,
} TAOS_NOTIFY_TYPE;
#define RET_MSG_LENGTH 1024

View File

@ -959,7 +959,7 @@ typedef struct {
int8_t superAuth;
int8_t sysInfo;
int8_t enable;
int8_t reserve;
int8_t dropped;
SHashObj* createdDbs;
SHashObj* readDbs;
SHashObj* writeDbs;

View File

@ -133,33 +133,37 @@ typedef struct {
int32_t ver;
void* param;
__taos_notify_fn_t fp;
} SPassInfo;
} STscNotifyInfo;
typedef STscNotifyInfo SPassInfo;
typedef struct {
int64_t ver;
void* param;
int64_t ver;
void* param;
__taos_notify_fn_t fp;
} SWhiteListInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int8_t sysInfo;
int8_t connType;
int32_t acctId;
uint32_t connId;
int64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
int32_t authVer;
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
SPassInfo passInfo;
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int8_t sysInfo;
int8_t connType;
int8_t dropped;
int8_t biMode;
int32_t acctId;
uint32_t connId;
int64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
int32_t authVer;
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
SPassInfo passInfo;
SWhiteListInfo whiteListInfo;
int8_t biMode;
STscNotifyInfo userDroppedInfo;
} STscObj;
typedef struct STscDbg {

View File

@ -96,6 +96,19 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
}
}
if (pRsp->dropped == 1) {
if (atomic_val_compare_exchange_8(&pTscObj->dropped, 0, 1) == 0) {
if (pTscObj->userDroppedInfo.fp) {
SPassInfo *dropInfo = &pTscObj->userDroppedInfo;
if (dropInfo->fp) {
(*dropInfo->fp)(dropInfo->param, NULL, TAOS_NOTIFY_USER_DROPPED);
}
}
}
releaseTscObj(pReq->connKey.tscRid);
continue;
}
pTscObj->authVer = pRsp->version;
if (pTscObj->sysInfo != pRsp->sysInfo) {
@ -768,6 +781,16 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
SHbParam *hbParam = (SHbParam *)param;
SCatalog *pCatalog = NULL;
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
} else if (atomic_load_8(&pTscObj->dropped) == 1) {
tscDebug("tscObj rid %" PRIx64 " user:%s dropped", connKey->tscRid, pTscObj->user);
releaseTscObj(connKey->tscRid);
return TSDB_CODE_SUCCESS;
}
if (hbParam->reqCnt == 0) {
code = catalogGetHandle(hbParam->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -147,6 +147,13 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
taosThreadMutexUnlock(&pObj->mutex);
break;
}
case TAOS_NOTIFY_USER_DROPPED: {
taosThreadMutexLock(&pObj->mutex);
pObj->userDroppedInfo.fp = fp;
pObj->userDroppedInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
break;
}
default: {
terrno = TSDB_CODE_INVALID_PARA;
releaseTscObj(*(int64_t *)taos);

View File

@ -1652,7 +1652,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
if (tEncodeI8(pEncoder, pRsp->superAuth) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->sysInfo) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->enable) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->reserve) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->dropped) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->version) < 0) return -1;
int32_t numOfCreatedDbs = taosHashGetSize(pRsp->createdDbs);
@ -1767,7 +1767,7 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->enable) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) goto _err;
if (tDecodeI8(pDecoder, &pRsp->dropped) < 0) goto _err;
if (tDecodeI32(pDecoder, &pRsp->version) < 0) goto _err;
int32_t numOfCreatedDbs = 0;

View File

@ -2289,6 +2289,11 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_
for (int32_t i = 0; i < numOfUses; ++i) {
SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user);
if (pUser == NULL) {
if (TSDB_CODE_MND_USER_NOT_EXIST == terrno) {
SGetUserAuthRsp rsp = {.dropped = 1};
memcpy(rsp.user, pUsers[i].user, TSDB_USER_LEN);
taosArrayPush(batchRsp.pArray, &rsp);
}
mError("user:%s, failed to auth user since %s", pUsers[i].user, terrstr());
continue;
}

View File

@ -975,6 +975,7 @@ void ctgFreeQNode(SCtgQNode* node);
void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgRemoveCacheUser(SCatalog* pCtg, const char* user);
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup,
bool* exists);
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);

View File

@ -2243,11 +2243,15 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
if (NULL == pUser) {
if (msg->userAuth.dropped == 1) {
goto _return;
}
SCtgUserAuth userAuth = {0};
memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth));
userAuth.userCacheSize = ctgGetUserCacheSize(&userAuth.userAuth);
if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
@ -2258,6 +2262,11 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
CTG_CACHE_NUM_INC(CTG_CI_USER, 1);
return TSDB_CODE_SUCCESS;
} else if (msg->userAuth.dropped == 1) {
if (ctgRemoveCacheUser(pCtg, msg->userAuth.user) == 0) {
CTG_CACHE_NUM_DEC(CTG_CI_USER, 1);
}
goto _return;
}
CTG_LOCK(CTG_WRITE, &pUser->lock);

View File

@ -311,6 +311,22 @@ void ctgFreeHandleImpl(SCatalog* pCtg) {
taosMemoryFree(pCtg);
}
int32_t ctgRemoveCacheUser(SCatalog* pCtg, const char* user) {
if (!pCtg || !user) {
return -1;
}
SCtgUserAuth* pUser = (SCtgUserAuth*)taosHashGet(pCtg->userCache, user, strlen(user));
if (pUser) {
ctgFreeSCtgUserAuth(pUser);
if (taosHashRemove(pCtg->userCache, user, strlen(user)) == 0) {
return 0; // user found and removed
}
}
return -1;
}
void ctgFreeHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;

View File

@ -47,8 +47,10 @@ typedef uint16_t VarDataLenT;
void createUsers(TAOS *taos, const char *host, char *qstr);
void passVerTestMulti(const char *host, char *qstr);
void sysInfoTest(TAOS *taos, const char *host, char *qstr);
void userDroppedTest(TAOS *taos, const char *host, char *qstr);
int nPassVerNotified = 0;
int nUserDropped = 0;
TAOS *taosu[nRoot] = {0};
char users[nUser][USER_LEN] = {0};
@ -56,11 +58,16 @@ void __taos_notify_cb(void *param, void *ext, int type) {
switch (type) {
case TAOS_NOTIFY_PASSVER: {
++nPassVerNotified;
printf("%s:%d type:%d user:%s ver:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext);
printf("%s:%d type:%d user:%s passVer:%d\n", __func__, __LINE__, type, param ? (char *)param : "NULL", *(int *)ext);
break;
}
case TAOS_NOTIFY_USER_DROPPED: {
++nUserDropped;
printf("%s:%d type:%d user:%s dropped\n", __func__, __LINE__, type, param ? (char *)param : "NULL");
break;
}
default:
printf("%s:%d unknown type:%d\n", __func__, __LINE__, type);
printf("%s:%d unknown notify type:%d\n", __func__, __LINE__, type);
break;
}
}
@ -202,6 +209,7 @@ int main(int argc, char *argv[]) {
createUsers(taos, argv[1], qstr);
passVerTestMulti(argv[1], qstr);
sysInfoTest(taos, argv[1], qstr);
userDroppedTest(taos, argv[1], qstr);
taos_close(taos);
taos_cleanup();
@ -223,9 +231,9 @@ void createUsers(TAOS *taos, const char *host, char *qstr) {
int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_PASSVER);
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb for user:%s since %d\n", users[i], code);
fprintf(stderr, "failed to run: taos_set_notify_cb(TAOS_NOTIFY_PASSVER) for user:%s since %d\n", users[i], code);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb for user:%s\n", users[i]);
fprintf(stderr, "success to run: taos_set_notify_cb(TAOS_NOTIFY_PASSVER) for user:%s\n", users[i]);
}
// alter pass for users
@ -290,17 +298,19 @@ void passVerTestMulti(const char *host, char *qstr) {
}
fprintf(stderr, "######## %s #########\n", __func__);
if (nPassVerNotified >= nConn) {
fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified,
if (nPassVerNotified == nConn) {
fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d == nConn %d\n", nPassVerNotified,
nConn);
} else {
fprintf(stderr, ">>> failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn);
fprintf(stderr, ">>> failed to get passVer notification since nNotify %d != nConn %d\n", nPassVerNotified, nConn);
exit(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
// sleep(300);
}
void sysInfoTest(TAOS *taosRoot, const char *host, char *qstr) {
fprintf(stderr, "######## %s entry #########\n", __func__);
TAOS *taos[nRoot] = {0};
char userName[USER_LEN] = "user0";
@ -376,4 +386,56 @@ _REP:
fprintf(stderr, "######## %s #########\n", __func__);
fprintf(stderr, ">>> succeed to run sysInfoTest\n");
fprintf(stderr, "######## %s #########\n", __func__);
}
void userDroppedTest(TAOS *taos, const char *host, char *qstr) {
// users
int nTestUsers = nUser;
for (int i = 0; i < nTestUsers; ++i) {
// sprintf(users[i], "user%d", i);
taosu[i] = taos_connect(host, users[i], "taos", NULL, 0);
if (taosu[i] == NULL) {
printf("failed to connect to server, user:%s, reason:%s\n", users[i], "null taos" /*taos_errstr(taos)*/);
exit(1);
}
int code = taos_set_notify_cb(taosu[i], __taos_notify_cb, users[i], TAOS_NOTIFY_USER_DROPPED);
if (code != 0) {
fprintf(stderr, "failed to run: taos_set_notify_cb:%d for user:%s since %d\n", TAOS_NOTIFY_USER_DROPPED, users[i],
code);
} else {
fprintf(stderr, "success to run: taos_set_notify_cb:%d for user:%s\n", TAOS_NOTIFY_USER_DROPPED, users[i]);
}
}
for (int i = 0; i < nTestUsers; ++i) {
// drop user0 ... user${nUser}
sprintf(qstr, "drop user %s", users[i]);
queryDB(taos, qstr);
}
// calculate the nUserDropped for users
int nConn = nTestUsers;
for (int i = 0; i < 15; ++i) {
printf("%s:%d [%d] second(s) elasped, user dropped notification received:%d, total:%d\n", __func__, __LINE__, i,
nUserDropped, nConn);
if (nUserDropped >= nConn) break;
sleep(1);
}
for (int i = 0; i < nTestUsers; ++i) {
taos_close(taosu[i]);
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
sleep(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
if (nUserDropped == nConn) {
fprintf(stderr, ">>> succeed to get user dropped notification since nNotify %d == nConn %d\n", nUserDropped, nConn);
} else {
fprintf(stderr, ">>> failed to get user dropped notification since nNotify %d != nConn %d\n", nUserDropped, nConn);
exit(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
// sleep(300);
}