diff --git a/include/common/tmsg.h b/include/common/tmsg.h index af05b52362..3889784f2f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -73,7 +73,8 @@ typedef uint16_t tmsg_t; enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX }; enum { - HEARTBEAT_KEY_DBINFO = 1, + HEARTBEAT_KEY_USER_AUTHINFO = 1, + HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_MQ_TMP, }; @@ -426,7 +427,9 @@ int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* typedef struct { char user[TSDB_USER_LEN]; + int32_t version; int8_t superAuth; + SHashObj* createdDbs; SHashObj* readDbs; SHashObj* writeDbs; } SGetUserAuthRsp; @@ -669,10 +672,20 @@ typedef struct { SArray* pArray; // Array of SUseDbRsp } SUseDbBatchRsp; + int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp); +typedef struct { + SArray* pArray; // Array of SGetUserAuthRsp +} SUserAuthBatchRsp; + +int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); +int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); +void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp); + + typedef struct { char db[TSDB_DB_FNAME_LEN]; } SCompactDbReq; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 30d1bd0a51..04a24c4f32 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -40,6 +40,11 @@ enum { CTG_DBG_STB_RENT_NUM, }; +typedef enum { + AUTH_TYPE_READ = 1, + AUTH_TYPE_WRITE, + AUTH_TYPE_OTHER, +} AUTH_TYPE; typedef struct SCatalogReq { SArray *pTableName; // element is SNAME @@ -57,6 +62,7 @@ typedef struct SMetaData { typedef struct SCatalogCfg { uint32_t maxTblCacheNum; uint32_t maxDBCacheNum; + uint32_t maxUserCacheNum; uint32_t dbRentSec; uint32_t stbRentSec; } SCatalogCfg; @@ -77,6 +83,11 @@ typedef struct SDbVgVersion { int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SDbVgVersion; +typedef struct SUserAuthVersion { + char user[TSDB_USER_LEN]; + int32_t version; +} SUserAuthVersion; + typedef SDbCfgRsp SDbCfgInfo; typedef SUserIndexRsp SIndexInfo; @@ -219,12 +230,18 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); +int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num); + int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo); +int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); + +int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); + /** * Destroy catalog and relase all resources diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index b11a49fa1a..fc39e80c1e 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -28,6 +28,27 @@ static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } +static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t code = 0; + + SUserAuthBatchRsp batchRsp = {0}; + if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); + for (int32_t i = 0; i < numOfBatchs; ++i) { + SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i); + tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version); + + catalogUpdateUserAuthInfo(pCatalog, rsp); + } + + tFreeSUserAuthBatchRsp(&batchRsp); + return TSDB_CODE_SUCCESS; +} + static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -148,6 +169,24 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { + case HEARTBEAT_KEY_USER_AUTHINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + int64_t *clusterId = (int64_t *)info->param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); + break; + } + + hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog); + break; + } case HEARTBEAT_KEY_DBINFO: { if (kv->valueLen <= 0 || NULL == kv->value) { tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); @@ -327,6 +366,39 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } +int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { + SUserAuthVersion *users = NULL; + uint32_t userNum = 0; + int32_t code = 0; + + code = catalogGetExpiredUsers(pCatalog, &users, &userNum); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (userNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < userNum; ++i) { + SUserAuthVersion *user = &users[i]; + user->version = htonl(user->version); + } + + SKv kv = { + .key = HEARTBEAT_KEY_USER_AUTHINFO, + .valueLen = sizeof(SUserAuthVersion) * userNum, + .value = users, + }; + + tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen); + + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + return TSDB_CODE_SUCCESS; +} + + int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; uint32_t dbNum = 0; @@ -407,6 +479,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); + code = hbGetExpiredUserInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f6c68fbeb6..6278b52a04 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1164,31 +1164,47 @@ int32_t tDeserializeSGetUserAuthReq(void *buf, int32_t bufLen, SGetUserAuthReq * return 0; } -int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) { - SCoder encoder = {0}; - tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); - - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->user) < 0) return -1; - if (tEncodeI8(&encoder, pRsp->superAuth) < 0) return -1; +int32_t tSerializeSGetUserAuthRspImpl(SCoder *pEncoder, SGetUserAuthRsp *pRsp) { + if (tEncodeCStr(pEncoder, pRsp->user) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->superAuth) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->version) < 0) return -1; + int32_t numOfCreatedDbs = taosHashGetSize(pRsp->createdDbs); int32_t numOfReadDbs = taosHashGetSize(pRsp->readDbs); int32_t numOfWriteDbs = taosHashGetSize(pRsp->writeDbs); - if (tEncodeI32(&encoder, numOfReadDbs) < 0) return -1; - if (tEncodeI32(&encoder, numOfWriteDbs) < 0) return -1; + if (tEncodeI32(pEncoder, numOfCreatedDbs) < 0) return -1; + if (tEncodeI32(pEncoder, numOfReadDbs) < 0) return -1; + if (tEncodeI32(pEncoder, numOfWriteDbs) < 0) return -1; - char *db = taosHashIterate(pRsp->readDbs, NULL); + char *db = taosHashIterate(pRsp->createdDbs, NULL); while (db != NULL) { - if (tEncodeCStr(&encoder, db) < 0) return -1; + if (tEncodeCStr(pEncoder, db) < 0) return -1; + db = taosHashIterate(pRsp->createdDbs, db); + } + + db = taosHashIterate(pRsp->readDbs, NULL); + while (db != NULL) { + if (tEncodeCStr(pEncoder, db) < 0) return -1; db = taosHashIterate(pRsp->readDbs, db); } db = taosHashIterate(pRsp->writeDbs, NULL); while (db != NULL) { - if (tEncodeCStr(&encoder, db) < 0) return -1; + if (tEncodeCStr(pEncoder, db) < 0) return -1; db = taosHashIterate(pRsp->writeDbs, db); } + return 0; +} + +int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tSerializeSGetUserAuthRspImpl(&encoder, pRsp) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1196,39 +1212,58 @@ int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pR return tlen; } -int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) { - pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); +int32_t tDeserializeSGetUserAuthRspImpl(SCoder *pDecoder, SGetUserAuthRsp *pRsp) { + pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pRsp->readDbs == NULL || pRsp->writeDbs == NULL) { return -1; } - SCoder decoder = {0}; - tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); - - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->user) < 0) return -1; - if (tDecodeI8(&decoder, &pRsp->superAuth) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1; + int32_t numOfCreatedDbs = 0; int32_t numOfReadDbs = 0; int32_t numOfWriteDbs = 0; - if (tDecodeI32(&decoder, &numOfReadDbs) < 0) return -1; - if (tDecodeI32(&decoder, &numOfWriteDbs) < 0) return -1; + if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) return -1; + if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) return -1; + if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) return -1; + + for (int32_t i = 0; i < numOfCreatedDbs; ++i) { + char db[TSDB_DB_FNAME_LEN] = {0}; + if (tDecodeCStrTo(pDecoder, db) < 0) return -1; + int32_t len = strlen(db) + 1; + taosHashPut(pRsp->createdDbs, db, len, db, len); + } for (int32_t i = 0; i < numOfReadDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - if (tDecodeCStrTo(&decoder, db) < 0) return -1; + if (tDecodeCStrTo(pDecoder, db) < 0) return -1; int32_t len = strlen(db) + 1; taosHashPut(pRsp->readDbs, db, len, db, len); } for (int32_t i = 0; i < numOfWriteDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; - if (tDecodeCStrTo(&decoder, db) < 0) return -1; + if (tDecodeCStrTo(pDecoder, db) < 0) return -1; int32_t len = strlen(db) + 1; taosHashPut(pRsp->writeDbs, db, len, db, len); } + return 0; +} + + +int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDeserializeSGetUserAuthRspImpl(&decoder, pRsp) < 0) return -1; + tEndDecode(&decoder); tCoderClear(&decoder); @@ -1236,6 +1271,7 @@ int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp * } void tFreeSGetUserAuthRsp(SGetUserAuthRsp *pRsp) { + taosHashCleanup(pRsp->createdDbs); taosHashCleanup(pRsp->readDbs); taosHashCleanup(pRsp->writeDbs); } @@ -2055,6 +2091,62 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) { taosArrayDestroy(pRsp->pArray); } +int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp){ + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tEncodeI32(&encoder, numOfBatch) < 0) return -1; + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserAuthRsp *pUserAuthRsp = taosArrayGet(pRsp->pArray, i); + if (tSerializeSGetUserAuthRspImpl(&encoder, pUserAuthRsp) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp){ + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; + + pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserAuthRsp)); + if (pRsp->pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserAuthRsp rsp = {0}; + if (tDeserializeSGetUserAuthRspImpl(&decoder, &rsp) < 0) return -1; + taosArrayPush(pRsp->pArray, &rsp); + } + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp){ + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + for (int32_t i = 0; i < numOfBatch; ++i) { + SGetUserAuthRsp *pUserAuthRsp = taosArrayGet(pRsp->pArray, i); + tFreeSGetUserAuthRsp(pUserAuthRsp); + } + + taosArrayDestroy(pRsp->pArray); +} + + int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 81ebcf9082..89638524ae 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -255,6 +255,7 @@ typedef struct { int64_t updateTime; int8_t superUser; int32_t acctId; + int32_t authVersion; SHashObj* readDbs; SHashObj* writeDbs; SRWLatch lock; diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index b3eb7f2f95..2140d0fa67 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -29,6 +29,7 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser); // for trans test SSdbRaw *mndUserActionEncode(SUserObj *pUser); +int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 59d07fd4aa..2de337537f 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -403,6 +403,16 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb SKv *kv = pIter; switch (kv->key) { + case HEARTBEAT_KEY_USER_AUTHINFO: { + void * rspMsg = NULL; + int32_t rspLen = 0; + mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } case HEARTBEAT_KEY_DBINFO: { void * rspMsg = NULL; int32_t rspLen = 0; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 31e78e0ca0..8b5fa06faf 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -457,13 +457,16 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } + newUser.authVersion++; } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) { if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } + newUser.authVersion++; } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { taosHashClear(newUser.readDbs); + newUser.authVersion++; } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) { if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; @@ -473,13 +476,16 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } + newUser.authVersion++; } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } + newUser.authVersion++; } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) { taosHashClear(newUser.writeDbs); + newUser.authVersion++; } else { terrno = TSDB_CODE_MND_INVALID_ALTER_OPER; goto _OVER; @@ -582,6 +588,38 @@ _OVER: return code; } +static int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) { + memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); + pRsp->superAuth = pUser->superUser; + pRsp->version = pUser->authVersion; + taosRLockLatch(&pUser->lock); + pRsp->readDbs = mndDupDbHash(pUser->readDbs); + pRsp->writeDbs = mndDupDbHash(pUser->writeDbs); + taosRUnLockLatch(&pUser->lock); + pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == pRsp->createdDbs) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + + if (strcmp(pDb->createUser, pUser->user) == 0) { + int32_t len = strlen(pDb->name) + 1; + taosHashPut(pRsp->createdDbs, pDb->name, len, pDb->name, len); + } + + sdbRelease(pSdb, pDb); + } + + return 0; +} + static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; int32_t code = -1; @@ -602,28 +640,9 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { goto _OVER; } - memcpy(authRsp.user, pUser->user, TSDB_USER_LEN); - authRsp.superAuth = pUser->superUser; - - taosRLockLatch(&pUser->lock); - authRsp.readDbs = mndDupDbHash(pUser->readDbs); - authRsp.writeDbs = mndDupDbHash(pUser->writeDbs); - taosRUnLockLatch(&pUser->lock); - - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - while (1) { - SDbObj *pDb = NULL; - pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); - if (pIter == NULL) break; - - if (strcmp(pDb->createUser, pUser->user) == 0) { - int32_t len = strlen(pDb->name) + 1; - taosHashPut(authRsp.readDbs, pDb->name, len, pDb->name, len); - taosHashPut(authRsp.writeDbs, pDb->name, len, pDb->name, len); - } - - sdbRelease(pSdb, pDb); + code = mndSetUserAuthRsp(pMnode, pUser, &authRsp); + if (code) { + goto _OVER; } int32_t contLen = tSerializeSGetUserAuthRsp(NULL, 0, &authRsp); @@ -640,6 +659,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { code = 0; _OVER: + mndReleaseUser(pMnode, pUser); tFreeSGetUserAuthRsp(&authRsp); @@ -690,3 +710,72 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen) { + SUserAuthBatchRsp batchRsp = {0}; + batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserAuthRsp)); + if (batchRsp.pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t code = 0; + for (int32_t i = 0; i < numOfUses; ++i) { + SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user); + if (pUser == NULL) { + mError("user:%s, failed to auth user since %s", pUsers[i].user, terrstr()); + continue; + } + + if (pUser->authVersion <= pUsers[i].version) { + mndReleaseUser(pMnode, pUser); + continue; + } + + SGetUserAuthRsp rsp = {0}; + code = mndSetUserAuthRsp(pMnode, pUser, &rsp); + if (code) { + mndReleaseUser(pMnode, pUser); + tFreeSGetUserAuthRsp(&rsp); + goto _OVER; + } + + + taosArrayPush(batchRsp.pArray, &rsp); + mndReleaseUser(pMnode, pUser); + } + + if (taosArrayGetSize(batchRsp.pArray) <= 0) { + *ppRsp = NULL; + *pRspLen = 0; + + tFreeSUserAuthBatchRsp(&batchRsp); + return 0; + } + + int32_t rspLen = tSerializeSUserAuthBatchRsp(NULL, 0, &batchRsp); + void *pRsp = taosMemoryMalloc(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tFreeSUserAuthBatchRsp(&batchRsp); + return -1; + } + tSerializeSUserAuthBatchRsp(pRsp, rspLen, &batchRsp); + + *ppRsp = pRsp; + *pRspLen = rspLen; + + tFreeSUserAuthBatchRsp(&batchRsp); + return 0; + +_OVER: + + *ppRsp = NULL; + *pRspLen = 0; + + tFreeSUserAuthBatchRsp(&batchRsp); + return code; +} + + + diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 918892b786..3e8528e3d9 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -54,6 +54,7 @@ enum { CTG_ACT_REMOVE_DB, CTG_ACT_REMOVE_STB, CTG_ACT_REMOVE_TBL, + CTG_ACT_UPDATE_USER, CTG_ACT_MAX }; @@ -95,8 +96,18 @@ typedef struct SCtgRentMgmt { SCtgRentSlot *slots; } SCtgRentMgmt; +typedef struct SCtgUserAuth { + int32_t version; + SRWLatch lock; + bool superUser; + SHashObj *createdDbs; + SHashObj *readDbs; + SHashObj *writeDbs; +} SCtgUserAuth; + typedef struct SCatalog { uint64_t clusterId; + SHashObj *userCache; //key:user, value:SCtgUserAuth SHashObj *dbCache; //key:dbname, value:SCtgDBCache SCtgRentMgmt dbRent; SCtgRentMgmt stbRent; @@ -124,6 +135,8 @@ typedef struct SCtgCacheStat { uint64_t vgMissNum; uint64_t tblHitNum; uint64_t tblMissNum; + uint64_t userHitNum; + uint64_t userMissNum; } SCtgCacheStat; typedef struct SCatalogStat { @@ -169,6 +182,11 @@ typedef struct SCtgRemoveTblMsg { uint64_t dbId; } SCtgRemoveTblMsg; +typedef struct SCtgUpdateUserMsg { + SCatalog* pCtg; + SGetUserAuthRsp userAuth; +} SCtgUpdateUserMsg; + typedef struct SCtgMetaAction { int32_t act; @@ -234,6 +252,8 @@ typedef struct SCtgAction { #define CTG_FLAG_SYS_DB 0x8 #define CTG_FLAG_FORCE_UPDATE 0x10 +#define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v)) + #define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB) #define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) #define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 6f1f34a57b..f485f85809 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -24,6 +24,7 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action); int32_t ctgActRemoveDB(SCtgMetaAction *action); int32_t ctgActRemoveStb(SCtgMetaAction *action); int32_t ctgActRemoveTbl(SCtgMetaAction *action); +int32_t ctgActUpdateUser(SCtgMetaAction *action); extern SCtgDebug gCTGDebug; SCatalogMgmt gCtgMgmt = {0}; @@ -51,6 +52,11 @@ SCtgAction gCtgAction[CTG_ACT_MAX] = {{ CTG_ACT_REMOVE_TBL, "remove tbMeta", ctgActRemoveTbl + }, + { + CTG_ACT_UPDATE_USER, + "update user", + ctgActUpdateUser } }; @@ -357,6 +363,31 @@ _return: CTG_RET(code); } +int32_t ctgPushUpdateUserMsgInQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_USER, .syncReq = syncReq}; + SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + msg->userAuth = *pAuth; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tFreeSGetUserAuthRsp(pAuth); + taosMemoryFreeClear(msg); + + CTG_RET(code); +} int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgLock); @@ -687,6 +718,43 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEp return TSDB_CODE_SUCCESS; } +int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *user, SGetUserAuthRsp *authRsp) { + char *msg = NULL; + int32_t msgLen = 0; + + ctgDebug("try to get user auth from mnode, user:%s", user); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)]((void *)user, &msg, 0, &msgLen); + if (code) { + ctgError("Build get user auth msg failed, code:%x, db:%s", code, user); + CTG_ERR_RET(code); + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_GET_USER_AUTH, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for get user auth, error:%s, user:%s", tstrerror(rpcRsp.code), user); + CTG_ERR_RET(rpcRsp.code); + } + + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)](authRsp, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process get user auth rsp failed, code:%x, user:%s", code, user); + CTG_ERR_RET(code); + } + + ctgDebug("Got user auth from mnode, user:%s", user); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { if (NULL == pCtg->dbCache) { @@ -859,6 +927,55 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const char* dbFName, const char return TSDB_CODE_SUCCESS; } +int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { + if (NULL == pCtg->userCache) { + ctgDebug("empty user auth cache, user:%s", user); + goto _return; + } + + SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user)); + if (NULL == pUser) { + ctgDebug("user not in cache, user:%s", user); + goto _return; + } + + *inCache = true; + + ctgDebug("Got user from cache, user:%s", user); + CTG_CACHE_STAT_ADD(userHitNum, 1); + + if (pUser->superUser) { + *pass = true; + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_READ, &pUser->lock); + if (pUser->createdDbs && taosHashGet(pUser->createdDbs, dbFName, strlen(dbFName))) { + *pass = true; + CTG_UNLOCK(CTG_READ, &pUser->lock); + return TSDB_CODE_SUCCESS; + } + + if (pUser->readDbs && taosHashGet(pUser->readDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_READ) { + *pass = true; + } + + if (pUser->writeDbs && taosHashGet(pUser->writeDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_WRITE) { + *pass = true; + } + + CTG_UNLOCK(CTG_READ, &pUser->lock); + + return TSDB_CODE_SUCCESS; + +_return: + + *inCache = false; + CTG_CACHE_STAT_ADD(userMissNum, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) { SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; char *msg = NULL; @@ -1952,6 +2069,45 @@ _return: CTG_RET(code); } +int32_t ctgChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) { + bool inCache = false; + int32_t code = 0; + + *pass = false; + + CTG_ERR_RET(ctgChkAuthFromCache(pCtg, user, dbFName, type, &inCache, pass)); + + if (inCache) { + return TSDB_CODE_SUCCESS; + } + + SGetUserAuthRsp authRsp = {0}; + CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pRpc, pMgmtEps, user, &authRsp)); + + if (authRsp.superAuth) { + *pass = true; + goto _return; + } + + if (authRsp.createdDbs && taosHashGet(authRsp.createdDbs, dbFName, strlen(dbFName))) { + *pass = true; + goto _return; + } + + if (authRsp.readDbs && taosHashGet(authRsp.readDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_READ) { + *pass = true; + } + + if (authRsp.writeDbs && taosHashGet(authRsp.writeDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_WRITE) { + *pass = true; + } + +_return: + + ctgPushUpdateUserMsgInQueue(pCtg, &authRsp, false); + + return TSDB_CODE_SUCCESS; +} int32_t ctgActUpdateVg(SCtgMetaAction *action) { @@ -2121,6 +2277,67 @@ _return: CTG_RET(code); } +int32_t ctgActUpdateUser(SCtgMetaAction *action) { + int32_t code = 0; + SCtgUpdateUserMsg *msg = action->data; + SCatalog* pCtg = msg->pCtg; + + if (NULL == pCtg->userCache) { + pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pCtg->userCache) { + ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user)); + if (NULL == pUser) { + SCtgUserAuth userAuth = {0}; + + userAuth.version = msg->userAuth.version; + userAuth.superUser = msg->userAuth.superAuth; + userAuth.createdDbs = msg->userAuth.createdDbs; + userAuth.readDbs = msg->userAuth.readDbs; + userAuth.writeDbs = msg->userAuth.writeDbs; + + if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) { + ctgError("taosHashPut user %s to cache failed", msg->userAuth.user); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + } + + pUser->version = msg->userAuth.version; + + CTG_LOCK(CTG_WRITE, &pUser->lock); + + taosHashCleanup(pUser->createdDbs); + pUser->createdDbs = msg->userAuth.createdDbs; + msg->userAuth.createdDbs = NULL; + + taosHashCleanup(pUser->readDbs); + pUser->readDbs = msg->userAuth.readDbs; + msg->userAuth.readDbs = NULL; + + taosHashCleanup(pUser->writeDbs); + pUser->writeDbs = msg->userAuth.writeDbs; + msg->userAuth.writeDbs = NULL; + + CTG_UNLOCK(CTG_WRITE, &pUser->lock); + +_return: + + + taosHashCleanup(msg->userAuth.createdDbs); + taosHashCleanup(msg->userAuth.readDbs); + taosHashCleanup(msg->userAuth.writeDbs); + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); @@ -2836,6 +3053,35 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); } +int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == users || NULL == num) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + *num = taosHashGetSize(pCtg->userCache); + if (*num > 0) { + *users = taosMemoryCalloc(*num, sizeof(SUserAuthVersion)); + if (NULL == *users) { + ctgError("calloc %d userAuthVersion failed", *num); + CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); + } + } + + uint32_t i = 0; + SCtgUserAuth *pAuth = taosHashIterate(pCtg->userCache, NULL); + while (pAuth != NULL) { + void *key = taosHashGetKey(pAuth, NULL); + strncpy((*users)[i].user, key, sizeof((*users)[i].user)); + (*users)[i].version = pAuth->version; + pAuth = taosHashIterate(pCtg->userCache, pAuth); + } + + CTG_API_LEAVE(TSDB_CODE_SUCCESS); +} + + int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) { CTG_API_ENTER(); @@ -2880,6 +3126,31 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == user || NULL == dbFName || NULL == pass) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgChkAuth(pCtg, pRpc, pMgmtEps, user, dbFName, type, pass)); + +_return: + + CTG_API_LEAVE(code); +} + +int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pAuth) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgPushUpdateUserMsgInQueue(pCtg, pAuth, false)); +} + void catalogDestroy(void) { qInfo("start to destroy catalog"); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 286e7d3d44..822c214fe5 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -181,6 +181,25 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SGetUserAuthReq req = {0}; + strncpy(req.user, input, sizeof(req.user)); + + int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSGetUserAuthReq(pBuf, bufLen, &req); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} + + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; SUseDbRsp usedbRsp = {0}; @@ -419,6 +438,20 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } +int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + if (tDeserializeSGetUserAuthRsp(msg, msgSize, (SGetUserAuthRsp *)output) != 0) { + qError("tDeserializeSGetUserAuthRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + return TSDB_CODE_SUCCESS; +} + + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; @@ -427,6 +460,8 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -435,6 +470,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; } #pragma GCC diagnostic pop