From aefebfca7fcb7ddee7d4301cb5f34efbaa2e616e Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 28 Jun 2023 20:49:29 +0800 Subject: [PATCH] enh: update of user auth version --- include/common/tmsg.h | 1 + source/client/inc/clientInt.h | 1 + source/client/src/clientHb.c | 156 ++++++++++++++++++--- source/client/src/clientMsgHandler.c | 1 + source/common/src/tmsg.c | 7 + source/dnode/mnode/impl/src/mndPrivilege.c | 1 + source/dnode/mnode/impl/src/mndProfile.c | 1 + source/dnode/mnode/impl/src/mndUser.c | 2 + source/libs/parser/src/parAuthenticator.c | 21 +++ 9 files changed, 174 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9b392c0240..0757496e54 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -636,6 +636,7 @@ typedef struct { SEpSet epSet; int32_t svrTimestamp; int32_t passVer; + int32_t authVer; char sVer[TSDB_VERSION_LEN]; char sDetailVer[128]; } SConnectRsp; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 18891bb932..4140574bb6 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -146,6 +146,7 @@ typedef struct STscObj { int64_t id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection + int32_t authVer; SAppInstInfo* pAppInfo; SHashObj* pRequests; SPassInfo passInfo; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 2dddfec2bd..53326396d7 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -25,6 +25,7 @@ typedef struct { int64_t clusterId; int32_t passKeyCnt; int32_t passVer; + int32_t authVer; int32_t reqCnt; }; }; @@ -39,7 +40,8 @@ static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } -static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { +static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog, + SAppHbMgr *pAppHbMgr) { int32_t code = 0; SUserAuthBatchRsp batchRsp = {0}; @@ -48,14 +50,39 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC return -1; } - int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); + int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i); tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version); catalogUpdateUserAuthInfo(pCatalog, rsp); } +#if 1 + SClientHbReq *pReq = NULL; + while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid); + if (!pTscObj) { + continue; + } + for (int32_t i = 0; i < numOfBatchs; ++i) { + SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i); + pTscObj->authVer = rsp->version; + if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { + if (pTscObj->sysInfo != rsp->sysInfo) { + printf("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64 "\n", rsp->user, + pTscObj->sysInfo, rsp->sysInfo, pTscObj->id); + pTscObj->sysInfo = rsp->sysInfo; + } else { + printf("not update sysInfo of user %s since not change: %" PRIi8 ", tscRid:%" PRIi64 "\n", rsp->user, + pTscObj->sysInfo, pTscObj->id); + } + break; + } + } + releaseTscObj(pReq->connKey.tscRid); + } +#endif taosArrayDestroy(batchRsp.pArray); return TSDB_CODE_SUCCESS; } @@ -78,10 +105,10 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb continue; } SPassInfo *passInfo = &pTscObj->passInfo; - if (!passInfo->fp) { - releaseTscObj(pReq->connKey.tscRid); - continue; - } + // if (!passInfo->fp) { + // releaseTscObj(pReq->connKey.tscRid); + // continue; + // } for (int32_t i = 0; i < numOfBatchs; ++i) { SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); @@ -92,7 +119,7 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb if (passInfo->fp) { (*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); } - tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer, + printf("update passVer of user %s from %d to %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer, atomic_load_32(&passInfo->ver), pTscObj->id); } break; @@ -316,7 +343,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { break; } - hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog); + hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr); break; } case HEARTBEAT_KEY_DBINFO: { @@ -556,6 +583,17 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien goto _return; } + SKv kv = {.key = HEARTBEAT_KEY_USER_PASSINFO}; + SKv *pKv = NULL; + if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) { + SUserPassVersion *pPassVer = (SUserPassVersion *)pKv->value; + tscDebug("hb got user basic info, already exists:%s, update passVer from %d to %d", pTscObj->user, + pPassVer->version, pTscObj->passInfo.ver); + pPassVer->version = pTscObj->passInfo.ver; + if (param) param->passVer = pPassVer->version; + goto _return; + } + SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); if (!user) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -564,11 +602,8 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien strncpy(user->user, pTscObj->user, TSDB_USER_LEN); user->version = htonl(pTscObj->passInfo.ver); - SKv kv = { - .key = HEARTBEAT_KEY_USER_PASSINFO, - .valueLen = sizeof(SUserPassVersion), - .value = user, - }; + kv.valueLen = sizeof(SUserPassVersion); + kv.value = user; tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, pTscObj->passInfo.ver, connKey->tscRid); @@ -578,6 +613,7 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien } if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) { + taosMemoryFreeClear(user); code = terrno ? terrno : TSDB_CODE_APP_ERROR; goto _return; } @@ -596,6 +632,89 @@ _return: return code; } +static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + if (!pTscObj) { + tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); + return TSDB_CODE_APP_ERROR; + } + + int32_t code = 0; + + if (param && (param->authVer != INT32_MIN) && (param->authVer <= pTscObj->authVer)) { + tscDebug("hb got user auth info, no need since authVer %d <= %d", param->authVer, pTscObj->authVer); + goto _return; + } + + SKv kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO}; + SKv *pKv = NULL; + if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) { + int32_t userNum = pKv->valueLen / sizeof(SUserAuthVersion); + SUserAuthVersion *pUserAuths = (SUserAuthVersion *)pKv->value; + for (int32_t i = 0; i < userNum; ++i) { + SUserAuthVersion *pUserAuth = pUserAuths + i; + // user exist + if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) { + if (htonl(pUserAuth->version) > pTscObj->authVer) { + pUserAuth->version = htonl(pTscObj->authVer); + } + if (param) param->authVer = htonl(pUserAuth->version); + goto _return; + } + } + // key exists, but user not exist + SUserAuthVersion *qUserAuth = + (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion)); + if (qUserAuth) { + strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN); + (qUserAuth + userNum)->version = htonl(pTscObj->authVer); + pKv->value = qUserAuth; + pKv->valueLen += sizeof(SUserAuthVersion); + if (param) param->authVer = pTscObj->authVer; + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + goto _return; + } + + SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion)); + if (!user) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; + } + strncpy(user->user, pTscObj->user, TSDB_USER_LEN); + user->version = htonl(pTscObj->authVer); + kv.valueLen = sizeof(SUserAuthVersion); + kv.value = user; + + tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, + pTscObj->authVer, connKey->tscRid); + + if (!req->info) { + req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + } + + if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) { + taosMemoryFreeClear(user); + code = terrno ? terrno : TSDB_CODE_APP_ERROR; + goto _return; + } + + if (param) { + param->authVer = pTscObj->authVer; + } + +_return: + releaseTscObj(connKey->tscRid); + if (code) { + tscError("hb got user auth info failed since %s", terrstr(code)); + } + + return code; +} + + + int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SUserAuthVersion *users = NULL; uint32_t userNum = 0; @@ -748,11 +867,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); - if (hbParam->passKeyCnt > 0) { + // if (hbParam->passKeyCnt > 0) { hbGetUserBasicInfo(connKey, hbParam, req); - } + // } - if (hbParam->reqCnt == 0) { + if (hbParam->reqCnt == 0) { code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -768,9 +887,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return code; } } - ++hbParam->reqCnt; // success to get catalog info + // N.B. put after hbGetExpiredUserInfo + hbGetUserAuthInfo(connKey, hbParam, req); + return TSDB_CODE_SUCCESS; } @@ -815,6 +936,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { // init param.clusterId = pOneReq->clusterId; param.passVer = INT32_MIN; + param.authVer = INT32_MIN; } param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); break; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 6d53f2b4c5..ae751f430c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -130,6 +130,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { pTscObj->connType = connectRsp.connType; pTscObj->passInfo.ver = connectRsp.passVer; + pTscObj->authVer = connectRsp.authVer; hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ac035e0a2b..a07f3ca0f4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4154,6 +4154,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -4180,8 +4181,14 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (!tDecodeIsEnd(&decoder)) { if (tDecodeI32(&decoder, &pRsp->passVer) < 0) return -1; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pRsp->authVer) < 0) return -1; + } else { + pRsp->authVer = 0; + } } else { pRsp->passVer = 0; + pRsp->authVer = 0; } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/src/mndPrivilege.c b/source/dnode/mnode/impl/src/mndPrivilege.c index de0374c6e8..0ddd8dcf77 100644 --- a/source/dnode/mnode/impl/src/mndPrivilege.c +++ b/source/dnode/mnode/impl/src/mndPrivilege.c @@ -37,6 +37,7 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp pRsp->superAuth = 1; pRsp->enable = pUser->enable; pRsp->version = pUser->authVersion; + pRsp->sysInfo = pUser->sysInfo; return 0; } #endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 0bfab227c4..4655ebe27f 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -287,6 +287,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.passVer = pUser->passVersion; + connectRsp.authVer = pUser->authVersion; strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 3da594109a..29fcb804a5 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1383,6 +1383,8 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_ pUsers[i].version = ntohl(pUsers[i].version); if (pUser->authVersion <= pUsers[i].version) { + printf("%s:%d pUser->authVersion:%d <= pUsers[i].version:%d\n", __func__, __LINE__, pUser->authVersion, + pUsers[i].version); mndReleaseUser(pMnode, pUser); continue; } diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index 9b2ac662c8..136a8d516f 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -164,6 +164,25 @@ static int32_t authDropUser(SAuthCxt* pCxt, SDropUserStmt* pStmt) { } return TSDB_CODE_SUCCESS; } +#if 0 +static int32_t authAlterUser(SAuthCxt* pCxt, SAlterUserStmt* pStmt) { + SParseContext* pParseCxt = pCxt->pParseCxt; + + SUserAuthInfo authInfo = {0}; + snprintf(authInfo.user, sizeof(authInfo.user), "%s", pStmt->userName); + authInfo.type = AUTH_TYPE_OTHER; + + int32_t code = TSDB_CODE_SUCCESS; + SUserAuthRes authRes = {0}; + SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter, + .requestId = pParseCxt->requestId, + .requestObjRefId = pParseCxt->requestRid, + .mgmtEps = pParseCxt->mgmtEpSet}; + code = catalogChkAuth(pParseCxt->pCatalog, &conn, &authInfo, &authRes); + + return TSDB_CODE_SUCCESS == code ? (authRes.pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code; +} +#endif static int32_t authDelete(SAuthCxt* pCxt, SDeleteStmt* pDelete) { SNode* pTagCond = NULL; @@ -246,6 +265,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { return authSelect(pCxt, (SSelectStmt*)pStmt); case QUERY_NODE_DROP_USER_STMT: return authDropUser(pCxt, (SDropUserStmt*)pStmt); + // case QUERY_NODE_ALTER_USER_STMT: + // return authAlterUser(pCxt, (SAlterUserStmt*)pStmt); case QUERY_NODE_DELETE_STMT: return authDelete(pCxt, (SDeleteStmt*)pStmt); case QUERY_NODE_INSERT_STMT: