feat: support uniq grant
This commit is contained in:
parent
1a6b491405
commit
3a08926319
|
@ -110,7 +110,7 @@ enum {
|
|||
HEARTBEAT_KEY_TMQ,
|
||||
HEARTBEAT_KEY_DYN_VIEW,
|
||||
HEARTBEAT_KEY_VIEWINFO,
|
||||
HEARTBEAT_KEY_GRANT,
|
||||
HEARTBEAT_KEY_GRANTINFO,
|
||||
};
|
||||
|
||||
typedef enum _mgmt_table {
|
||||
|
|
|
@ -146,7 +146,6 @@ typedef struct SSTableVersion {
|
|||
} SSTableVersion;
|
||||
|
||||
typedef struct SGrantVersion {
|
||||
int64_t grantId;
|
||||
int32_t version;
|
||||
} SGrantVersion;
|
||||
|
||||
|
@ -343,6 +342,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion** stables, u
|
|||
|
||||
int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* num, SDynViewVersion** dynViewVersion);
|
||||
|
||||
int32_t catalogGetExpiredGrants(SCatalog* pCtg, SGrantVersion** grants, uint32_t* num);
|
||||
|
||||
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbCacheInfo** dbs, uint32_t* num);
|
||||
|
||||
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num);
|
||||
|
|
|
@ -342,7 +342,8 @@ static int32_t hbProcessGrantInfoRsp(void *value, int32_t valueLen, struct SCata
|
|||
return -1;
|
||||
}
|
||||
|
||||
tscInfo("hb to update grant info:%u", hbRsp->flags);
|
||||
tscDebug("hb to update grant info, version:%d, flags:%u", hbRsp->version, hbRsp->flags);
|
||||
|
||||
catalogUpdateGrantInfo(pCatalog, hbRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -398,11 +399,12 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *p
|
|||
hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_GRANT: {
|
||||
case HEARTBEAT_KEY_GRANTINFO: {
|
||||
if (kv->valueLen <= 0 || NULL == kv->value) {
|
||||
tscError("invalid grant info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
assert(0);
|
||||
|
||||
hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
|
@ -872,7 +874,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
|||
view->version = htonl(view->version);
|
||||
}
|
||||
|
||||
tscDebug("hb got %d expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
|
||||
tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
|
||||
|
||||
if (NULL == req->info) {
|
||||
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
@ -895,6 +897,47 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t hbGetExpiredGrantInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
||||
SGrantVersion *grants = NULL;
|
||||
uint32_t grantNum = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
code = catalogGetExpiredGrants(pCatalog, &grants, &grantNum);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (grantNum <= 0) {
|
||||
taosMemoryFree(grants);
|
||||
return code;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < grantNum; ++i) {
|
||||
SGrantVersion *gv = &grants[i];
|
||||
gv->version = htonl(gv->version);
|
||||
}
|
||||
|
||||
tscDebug("hb got %d expired grant, valueLen:%u", grantNum, (int32_t)sizeof(SGrantVersion) * grantNum);
|
||||
|
||||
if (!req->info) {
|
||||
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
if (!req->info) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(grants);
|
||||
return code;
|
||||
taosMemoryFree(grants);
|
||||
}
|
||||
}
|
||||
|
||||
SKv kv = {
|
||||
.key = HEARTBEAT_KEY_GRANTINFO,
|
||||
.valueLen = sizeof(SGrantVersion) * grantNum,
|
||||
.value = grants,
|
||||
};
|
||||
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
|
||||
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
|
||||
|
@ -958,6 +1001,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = hbGetExpiredGrantInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
req->app.appId = 0;
|
||||
|
|
|
@ -606,12 +606,12 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_GRANT: {
|
||||
case HEARTBEAT_KEY_GRANTINFO: {
|
||||
void *rspMsg = NULL;
|
||||
int32_t rspLen = 0;
|
||||
mndValidateGrant(pMnode, kv->value, &rspMsg, &rspLen);
|
||||
if (rspMsg && rspLen > 0) {
|
||||
SKv kv1 = {.key = HEARTBEAT_KEY_GRANT, .valueLen = rspLen, .value = rspMsg};
|
||||
SKv kv1 = {.key = HEARTBEAT_KEY_GRANTINFO, .valueLen = rspLen, .value = rspMsg};
|
||||
taosArrayPush(hbRsp.info, &kv1);
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -337,7 +337,6 @@ typedef struct SCatalog {
|
|||
SCtgRentMgmt dbRent;
|
||||
SCtgRentMgmt stbRent;
|
||||
SCtgRentMgmt viewRent;
|
||||
SCtgRentMgmt grantRent;
|
||||
SCtgCacheStat cacheStat;
|
||||
} SCatalog;
|
||||
|
||||
|
@ -569,7 +568,6 @@ typedef struct SCtgUpdateGrantInfoMsg {
|
|||
|
||||
typedef struct SCtgDropGrantInfoMsg {
|
||||
SCatalog* pCtg;
|
||||
int64_t grantId;
|
||||
} SCtgDropGrantInfoMsg;
|
||||
|
||||
typedef struct SCtgCacheOperation {
|
||||
|
@ -970,7 +968,7 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uin
|
|||
SCtgTbCache *pCache);
|
||||
int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, uint64_t dbId, uint64_t viewId,
|
||||
SCtgViewCache *pCache);
|
||||
int32_t ctgUpdateRentGrantVersion(SCatalog* pCtg, int32_t grantId, SGrantHbRsp* pGrant);
|
||||
// int32_t ctgUpdateRentGrantVersion(SCatalog* pCtg, int32_t grantId, SGrantHbRsp* pGrant);
|
||||
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
|
||||
int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq);
|
||||
int32_t ctgUpdateGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncReq);
|
||||
|
@ -1043,12 +1041,12 @@ void ctgResetTbMetaTask(SCtgTask* pTask);
|
|||
void ctgFreeDbCache(SCtgDBCache* dbCache);
|
||||
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
|
||||
int32_t ctgViewVersionSortCompare(const void* key1, const void* key2);
|
||||
int32_t ctgGrantVersionSortCompare(const void* key1, const void* key2);
|
||||
// int32_t ctgGrantVersionSortCompare(const void* key1, const void* key2);
|
||||
int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2);
|
||||
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
|
||||
int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2);
|
||||
int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2);
|
||||
int32_t ctgGrantVersionSearchCompare(const void* key1, const void* key2);
|
||||
// int32_t ctgGrantVersionSearchCompare(const void* key1, const void* key2);
|
||||
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
||||
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
||||
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
|
||||
|
|
|
@ -1513,6 +1513,23 @@ int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* n
|
|||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->viewRent, (void**)views, num, sizeof(SViewVersion)));
|
||||
}
|
||||
|
||||
int32_t catalogGetExpiredGrants(SCatalog* pCtg, SGrantVersion** grants, uint32_t* num) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == grants || NULL == num) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
*num = 1;
|
||||
*grants = taosMemoryCalloc(*num, sizeof(SGrantVersion));
|
||||
if (!(*grants)) {
|
||||
ctgError("calloc %d grantVersion failed", *num);
|
||||
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
(*grants)[0].version = pCtg->grantCache.grantInfo.version;
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbCacheInfo** dbs, uint32_t* num) {
|
||||
CTG_API_ENTER();
|
||||
|
|
|
@ -1742,8 +1742,6 @@ int32_t ctgWriteGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp) {
|
|||
|
||||
ctgDebug("grant info updated to cache, flags:%u, version:%d", pRsp->flags, pRsp->version);
|
||||
|
||||
CTG_ERR_RET(ctgUpdateRentGrantVersion(pCtg, CGT_GRANT_ID, pRsp));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -2541,11 +2539,6 @@ int32_t ctgOpDropGrantInfo(SCtgCacheOperation *operation) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(
|
||||
ctgMetaRentRemove(&pCtg->grantRent, msg->grantId, ctgGrantVersionSortCompare, ctgGrantVersionSearchCompare));
|
||||
|
||||
printf("prop:grant:0x%" PRIx64 "removed from rent", msg->grantId);
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
|
|
@ -301,13 +301,3 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgUpdateRentGrantVersion(SCatalog *pCtg, int32_t grantId, SGrantHbRsp *pGrant) {
|
||||
SGrantVersion metaRent = {.grantId = grantId, .version = pGrant->version};
|
||||
|
||||
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->grantRent, &metaRent, metaRent.grantId, sizeof(SGrantVersion),
|
||||
ctgGrantVersionSortCompare, ctgGrantVersionSearchCompare));
|
||||
|
||||
ctgDebug("grant %d version %d updated to grantRent", grantId, metaRent.version);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -1328,16 +1328,6 @@ int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ctgGrantVersionSearchCompare(const void* key1, const void* key2) {
|
||||
if (*(int32_t*)key1 == ((SGrantVersion*)key2)->grantId) {
|
||||
return 0;
|
||||
} else if (*(int32_t*)key1 < ((SGrantVersion*)key2)->grantId) {
|
||||
return -1;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
|
||||
if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
|
||||
|
@ -1369,15 +1359,6 @@ int32_t ctgViewVersionSortCompare(const void* key1, const void* key2) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ctgGrantVersionSortCompare(const void* key1, const void* key2) {
|
||||
if (((SGrantVersion*)key1)->grantId == ((SGrantVersion*)key2)->grantId) {
|
||||
return 0;
|
||||
} else if (((SGrantVersion*)key1)->grantId < ((SGrantVersion*)key2)->grantId) {
|
||||
return -1;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
|
||||
if (NULL == dbInfo) {
|
||||
|
|
Loading…
Reference in New Issue