feat: support uniq grant

This commit is contained in:
kailixu 2024-02-02 13:44:33 +08:00
parent a452480249
commit 3f061a8460
5 changed files with 61 additions and 28 deletions

View File

@ -1887,6 +1887,7 @@ int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
void tFreeSViewHbRsp(SViewHbRsp* pRsp); void tFreeSViewHbRsp(SViewHbRsp* pRsp);
typedef struct { typedef struct {
int32_t version;
uint32_t flags; uint32_t flags;
} SGrantHbRsp; } SGrantHbRsp;

View File

@ -146,6 +146,7 @@ typedef struct SSTableVersion {
} SSTableVersion; } SSTableVersion;
typedef struct SGrantVersion { typedef struct SGrantVersion {
int32_t grantId;
int32_t version; int32_t version;
} SGrantVersion; } SGrantVersion;

View File

@ -1520,18 +1520,6 @@ int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetAddGrantCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, dbFName, &dbCache);
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
STableMeta *meta, int32_t metaSize) { STableMeta *meta, int32_t metaSize) {
if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) { if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
@ -1743,6 +1731,30 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgWriteGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp) {
int32_t code = TSDB_CODE_SUCCESS;
SCtgGrantCache *pCache = &pCtg->grantCache;
CTG_LOCK(CTG_WRITE, &pCache->lock);
pCache->grantInfo = *pRsp;
CTG_UNLOCK(CTG_WRITE, &pCache->lock);
ctgDebug("grant info updated to cache, version");
CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, pCache));
pMeta = NULL;
_return:
if (pMeta) {
ctgFreeSViewMeta(pMeta);
taosMemoryFree(pMeta);
}
CTG_RET(code);
}
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) { int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
STableMetaOutput *pOutput = NULL; STableMetaOutput *pOutput = NULL;
int32_t code = 0; int32_t code = 0;
@ -2508,6 +2520,7 @@ int32_t ctgOpUpdateGrantInfo(SCtgCacheOperation *operation) {
SCtgUpdateGrantInfoMsg *msg = operation->data; SCtgUpdateGrantInfoMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg; SCatalog *pCtg = msg->pCtg;
SGrantHbRsp *pRsp = msg->pRsp; SGrantHbRsp *pRsp = msg->pRsp;
SCtgGrantCache *pGrantCache = NULL;
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
@ -2515,29 +2528,14 @@ int32_t ctgOpUpdateGrantInfo(SCtgCacheOperation *operation) {
goto _return; goto _return;
} }
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache)); CTG_ERR_JRET(ctgAcquireGrantCache(pCtg, &pGrantCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
pMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_JRET(dupViewMetaFromRsp(pRsp, pMeta));
code = ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta); code = ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta);
pMeta = NULL; pMeta = NULL;
_return: _return:
tFreeSViewMetaRsp(pRsp);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
ctgFreeSViewMeta(pMeta);
taosMemoryFree(pMeta);
CTG_RET(code); CTG_RET(code);
} }

View File

@ -301,6 +301,20 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgUpdateRentGrantVersion(SCatalog *pCtg, int32_t grantId,
SCtgGrantCache *pCache) {
SGrantVersion metaRent = {.grantId = grantId, .version = pCache->grantInfo.version};
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->grantRent, &metaRent, metaRent.grantId, sizeof(SGrantVersion),
ctgGrantVersionSortCompare, ctgGrantVersionSearchCompare));
ctgDebug("grant %d version %d updated to grantRent", grantId, metaRent.version);
return TSDB_CODE_SUCCESS;
}

View File

@ -1328,6 +1328,16 @@ int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2) {
} }
} }
int32_t ctgGrantVersionSearchCompare(const void* key1, const void* key2) {
if (*(int32_t*)key1 == ((SGrantVersion*)key2)->grantId) {
return 0;
} else if (*(int32_t*)key1 < ((SGrantVersion*)key2)->grantId) {
return -1;
} else {
return 1;
}
}
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) { int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) { if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
@ -1359,6 +1369,15 @@ int32_t ctgViewVersionSortCompare(const void* key1, const void* key2) {
} }
} }
int32_t ctgGrantVersionSortCompare(const void* key1, const void* key2) {
if (((SGrantVersion*)key1)->grantId == ((SGrantVersion*)key2)->grantId) {
return 0;
} else if (((SGrantVersion*)key1)->grantId < ((SGrantVersion*)key2)->grantId) {
return -1;
} else {
return 1;
}
}
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) { int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
if (NULL == dbInfo) { if (NULL == dbInfo) {