feat: support cache db cfg
This commit is contained in:
parent
ba1219303b
commit
7f518ca4ea
|
@ -939,6 +939,8 @@ int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq
|
||||||
int32_t tDeserializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);
|
int32_t tDeserializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
int64_t dbId;
|
||||||
|
int32_t cfgVersion;
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
int32_t numOfStables;
|
int32_t numOfStables;
|
||||||
int32_t buffer;
|
int32_t buffer;
|
||||||
|
@ -971,6 +973,8 @@ typedef struct {
|
||||||
int16_t sstTrigger;
|
int16_t sstTrigger;
|
||||||
} SDbCfgRsp;
|
} SDbCfgRsp;
|
||||||
|
|
||||||
|
typedef SDbCfgRsp SDbCfgInfo;
|
||||||
|
|
||||||
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
|
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
|
||||||
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);
|
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);
|
||||||
|
|
||||||
|
|
|
@ -127,13 +127,14 @@ typedef struct SSTableVersion {
|
||||||
int32_t smaVer;
|
int32_t smaVer;
|
||||||
} SSTableVersion;
|
} SSTableVersion;
|
||||||
|
|
||||||
typedef struct SDbVgVersion {
|
typedef struct SDbCacheInfo {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
int64_t dbId;
|
int64_t dbId;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
|
int32_t cfgVersion;
|
||||||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||||
int64_t stateTs;
|
int64_t stateTs;
|
||||||
} SDbVgVersion;
|
} SDbCacheInfo;
|
||||||
|
|
||||||
typedef struct STbSVersion {
|
typedef struct STbSVersion {
|
||||||
char* tbFName;
|
char* tbFName;
|
||||||
|
@ -146,7 +147,6 @@ typedef struct SUserAuthVersion {
|
||||||
int32_t version;
|
int32_t version;
|
||||||
} SUserAuthVersion;
|
} SUserAuthVersion;
|
||||||
|
|
||||||
typedef SDbCfgRsp SDbCfgInfo;
|
|
||||||
typedef SUserIndexRsp SIndexInfo;
|
typedef SUserIndexRsp SIndexInfo;
|
||||||
|
|
||||||
typedef void (*catalogCallback)(SMetaData* pResult, void* param, int32_t code);
|
typedef void (*catalogCallback)(SMetaData* pResult, void* param, int32_t code);
|
||||||
|
@ -302,7 +302,7 @@ int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray*
|
||||||
|
|
||||||
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion** stables, uint32_t* num);
|
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion** stables, uint32_t* num);
|
||||||
|
|
||||||
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num);
|
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbCacheInfo** dbs, uint32_t* num);
|
||||||
|
|
||||||
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num);
|
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num);
|
||||||
|
|
||||||
|
|
|
@ -259,6 +259,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
|
||||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
||||||
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
|
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
|
||||||
void freeVgInfo(SDBVgInfo* vgInfo);
|
void freeVgInfo(SDBVgInfo* vgInfo);
|
||||||
|
void freeDbCfgInfo(SDbCfgInfo *pInfo);
|
||||||
|
|
||||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
|
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
|
||||||
void* (*mallocFp)(int64_t));
|
void* (*mallocFp)(int64_t));
|
||||||
|
|
|
@ -510,7 +510,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbCacheInfo *dbs = NULL;
|
||||||
uint32_t dbNum = 0;
|
uint32_t dbNum = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -525,7 +525,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < dbNum; ++i) {
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
SDbVgVersion *db = &dbs[i];
|
SDbCacheInfo *db = &dbs[i];
|
||||||
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64,
|
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64,
|
||||||
i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs);
|
i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs);
|
||||||
|
|
||||||
|
@ -537,7 +537,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
||||||
|
|
||||||
SKv kv = {
|
SKv kv = {
|
||||||
.key = HEARTBEAT_KEY_DBINFO,
|
.key = HEARTBEAT_KEY_DBINFO,
|
||||||
.valueLen = sizeof(SDbVgVersion) * dbNum,
|
.valueLen = sizeof(SDbCacheInfo) * dbNum,
|
||||||
.value = dbs,
|
.value = dbs,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -2998,6 +2998,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pRsp->dbId) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->cfgVersion) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
|
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1;
|
if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1;
|
if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1;
|
||||||
|
@ -3045,6 +3047,8 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pRsp->dbId) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->cfgVersion) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
|
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1;
|
if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1;
|
if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1;
|
||||||
|
|
|
@ -26,7 +26,7 @@ int32_t mndInitDb(SMnode *pMnode);
|
||||||
void mndCleanupDb(SMnode *pMnode);
|
void mndCleanupDb(SMnode *pMnode);
|
||||||
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
|
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
|
||||||
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
||||||
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
||||||
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
|
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
|
||||||
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
|
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
|
||||||
|
|
||||||
|
|
|
@ -883,6 +883,8 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfgRsp.dbId = pDb->uid;
|
||||||
|
cfgRsp.cfgVersion = pDb->cfgVersion;
|
||||||
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
||||||
cfgRsp.numOfStables = pDb->cfg.numOfStables;
|
cfgRsp.numOfStables = pDb->cfg.numOfStables;
|
||||||
cfgRsp.buffer = pDb->cfg.buffer;
|
cfgRsp.buffer = pDb->cfg.buffer;
|
||||||
|
@ -1309,7 +1311,7 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen) {
|
int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen) {
|
||||||
SUseDbBatchRsp batchUseRsp = {0};
|
SUseDbBatchRsp batchUseRsp = {0};
|
||||||
batchUseRsp.pArray = taosArrayInit(numOfDbs, sizeof(SUseDbRsp));
|
batchUseRsp.pArray = taosArrayInit(numOfDbs, sizeof(SUseDbRsp));
|
||||||
if (batchUseRsp.pArray == NULL) {
|
if (batchUseRsp.pArray == NULL) {
|
||||||
|
@ -1318,7 +1320,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfDbs; ++i) {
|
for (int32_t i = 0; i < numOfDbs; ++i) {
|
||||||
SDbVgVersion *pDbVgVersion = &pDbs[i];
|
SDbCacheInfo *pDbVgVersion = &pDbs[i];
|
||||||
pDbVgVersion->dbId = be64toh(pDbVgVersion->dbId);
|
pDbVgVersion->dbId = be64toh(pDbVgVersion->dbId);
|
||||||
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
|
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
|
||||||
pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable);
|
pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable);
|
||||||
|
|
|
@ -530,7 +530,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
case HEARTBEAT_KEY_DBINFO: {
|
case HEARTBEAT_KEY_DBINFO: {
|
||||||
void *rspMsg = NULL;
|
void *rspMsg = NULL;
|
||||||
int32_t rspLen = 0;
|
int32_t rspLen = 0;
|
||||||
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
|
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbCacheInfo), &rspMsg, &rspLen);
|
||||||
if (rspMsg && rspLen > 0) {
|
if (rspMsg && rspLen > 0) {
|
||||||
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
|
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
|
||||||
taosArrayPush(hbRsp.info, &kv1);
|
taosArrayPush(hbRsp.info, &kv1);
|
||||||
|
|
|
@ -55,6 +55,7 @@ enum {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
CTG_OP_UPDATE_VGROUP = 0,
|
CTG_OP_UPDATE_VGROUP = 0,
|
||||||
|
CTG_OP_UPDATE_DB_CFG,
|
||||||
CTG_OP_UPDATE_TB_META,
|
CTG_OP_UPDATE_TB_META,
|
||||||
CTG_OP_DROP_DB_CACHE,
|
CTG_OP_DROP_DB_CACHE,
|
||||||
CTG_OP_DROP_DB_VGROUP,
|
CTG_OP_DROP_DB_VGROUP,
|
||||||
|
@ -204,11 +205,17 @@ typedef struct SCtgVgCache {
|
||||||
SDBVgInfo* vgInfo;
|
SDBVgInfo* vgInfo;
|
||||||
} SCtgVgCache;
|
} SCtgVgCache;
|
||||||
|
|
||||||
|
typedef struct SCtgCfgCache {
|
||||||
|
SRWLatch cfgLock;
|
||||||
|
SDbCfgInfo* cfgInfo;
|
||||||
|
} SCtgCfgCache;
|
||||||
|
|
||||||
typedef struct SCtgDBCache {
|
typedef struct SCtgDBCache {
|
||||||
SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
|
SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
|
||||||
uint64_t dbId;
|
uint64_t dbId;
|
||||||
int8_t deleted;
|
int8_t deleted;
|
||||||
SCtgVgCache vgCache;
|
SCtgVgCache vgCache;
|
||||||
|
SCtgCfgCache cfgCache;
|
||||||
SHashObj* tbCache; // key:tbname, value:SCtgTbCache
|
SHashObj* tbCache; // key:tbname, value:SCtgTbCache
|
||||||
SHashObj* stbCache; // key:suid, value:char*
|
SHashObj* stbCache; // key:suid, value:char*
|
||||||
} SCtgDBCache;
|
} SCtgDBCache;
|
||||||
|
@ -216,7 +223,7 @@ typedef struct SCtgDBCache {
|
||||||
typedef struct SCtgRentSlot {
|
typedef struct SCtgRentSlot {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
bool needSort;
|
bool needSort;
|
||||||
SArray* meta; // element is SDbVgVersion or SSTableVersion
|
SArray* meta; // element is SDbCacheInfo or SSTableVersion
|
||||||
} SCtgRentSlot;
|
} SCtgRentSlot;
|
||||||
|
|
||||||
typedef struct SCtgRentMgmt {
|
typedef struct SCtgRentMgmt {
|
||||||
|
@ -367,6 +374,8 @@ typedef struct SCtgCacheStat {
|
||||||
uint64_t numOfUser;
|
uint64_t numOfUser;
|
||||||
uint64_t numOfVgHit;
|
uint64_t numOfVgHit;
|
||||||
uint64_t numOfVgMiss;
|
uint64_t numOfVgMiss;
|
||||||
|
uint64_t numOfCfgHit;
|
||||||
|
uint64_t numOfCfgMiss;
|
||||||
uint64_t numOfMetaHit;
|
uint64_t numOfMetaHit;
|
||||||
uint64_t numOfMetaMiss;
|
uint64_t numOfMetaMiss;
|
||||||
uint64_t numOfIndexHit;
|
uint64_t numOfIndexHit;
|
||||||
|
@ -393,6 +402,13 @@ typedef struct SCtgUpdateVgMsg {
|
||||||
SDBVgInfo* dbInfo;
|
SDBVgInfo* dbInfo;
|
||||||
} SCtgUpdateVgMsg;
|
} SCtgUpdateVgMsg;
|
||||||
|
|
||||||
|
typedef struct SCtgUpdateDbCfgMsg {
|
||||||
|
SCatalog* pCtg;
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
uint64_t dbId;
|
||||||
|
SDbCfgInfo* cfgInfo;
|
||||||
|
} SCtgUpdateDbCfgMsg;
|
||||||
|
|
||||||
typedef struct SCtgUpdateTbMetaMsg {
|
typedef struct SCtgUpdateTbMetaMsg {
|
||||||
SCatalog* pCtg;
|
SCatalog* pCtg;
|
||||||
STableMetaOutput* pMeta;
|
STableMetaOutput* pMeta;
|
||||||
|
@ -697,8 +713,10 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq
|
||||||
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
||||||
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx,
|
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx,
|
||||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||||
|
void* ctgCloneDbCfgInfo(void* pSrc);
|
||||||
|
|
||||||
int32_t ctgOpUpdateVgroup(SCtgCacheOperation* action);
|
int32_t ctgOpUpdateVgroup(SCtgCacheOperation* action);
|
||||||
|
int32_t ctgOpUpdateDbCfg(SCtgCacheOperation *operation);
|
||||||
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation* action);
|
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation* action);
|
||||||
int32_t ctgOpDropDbCache(SCtgCacheOperation* action);
|
int32_t ctgOpDropDbCache(SCtgCacheOperation* action);
|
||||||
int32_t ctgOpDropDbVgroup(SCtgCacheOperation* action);
|
int32_t ctgOpDropDbVgroup(SCtgCacheOperation* action);
|
||||||
|
@ -720,6 +738,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char* dbFName, int64_t dbId,
|
||||||
bool syncReq);
|
bool syncReq);
|
||||||
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char* dbFName, int64_t dbId, const char* tbName, bool syncReq);
|
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char* dbFName, int64_t dbId, const char* tbName, bool syncReq);
|
||||||
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char* dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
|
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char* dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
|
||||||
|
int32_t ctgUpdateDbCfgEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDbCfgInfo *cfgInfo, bool syncOp);
|
||||||
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool syncReq);
|
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool syncReq);
|
||||||
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq);
|
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq);
|
||||||
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet);
|
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet);
|
||||||
|
@ -790,9 +809,9 @@ int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFNam
|
||||||
void ctgResetTbMetaTask(SCtgTask* pTask);
|
void ctgResetTbMetaTask(SCtgTask* pTask);
|
||||||
void ctgFreeDbCache(SCtgDBCache* dbCache);
|
void ctgFreeDbCache(SCtgDBCache* dbCache);
|
||||||
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
|
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
|
||||||
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2);
|
int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2);
|
||||||
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
|
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
|
||||||
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
|
int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2);
|
||||||
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
||||||
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
||||||
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
|
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
|
||||||
|
@ -817,6 +836,7 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt
|
||||||
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
|
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
|
||||||
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
|
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
|
||||||
int32_t ctgChkSetAuthRes(SCatalog *pCtg, SCtgAuthReq *req, SCtgAuthRsp* res);
|
int32_t ctgChkSetAuthRes(SCatalog *pCtg, SCtgAuthReq *req, SCtgAuthRsp* res);
|
||||||
|
int32_t ctgReadDBCfgFromCache(SCatalog *pCtg, const char* dbFName, SDbCfgInfo* pDbCfg);
|
||||||
|
|
||||||
extern SCatalogMgmt gCtgMgmt;
|
extern SCatalogMgmt gCtgMgmt;
|
||||||
extern SCtgDebug gCTGDebug;
|
extern SCtgDebug gCTGDebug;
|
||||||
|
|
|
@ -626,6 +626,23 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg) {
|
||||||
|
CTG_ERR_RET(ctgReadDBCfgFromCache(pCtg, dbFName, pDbCfg));
|
||||||
|
|
||||||
|
if (pDbCfg->cfgVersion < 0) {
|
||||||
|
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, dbFName, pDbCfg, NULL));
|
||||||
|
SDbCfgInfo *pCfg = ctgCloneDbCfgInfo(pDbCfg);
|
||||||
|
if (NULL == pCfg) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgUpdateDbCfgEnqueue(pCtg, dbFName, pDbCfg->dbId, pCfg, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg* cfg) {
|
int32_t catalogInit(SCatalogCfg* cfg) {
|
||||||
if (gCtgMgmt.pCluster) {
|
if (gCtgMgmt.pCluster) {
|
||||||
qError("catalog already initialized");
|
qError("catalog already initialized");
|
||||||
|
@ -1326,14 +1343,14 @@ int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion** stables, uint3
|
||||||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableVersion)));
|
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableVersion)));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion** dbs, uint32_t* num) {
|
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbCacheInfo** dbs, uint32_t* num) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == dbs || NULL == num) {
|
if (NULL == pCtg || NULL == dbs || NULL == num) {
|
||||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void**)dbs, num, sizeof(SDbVgVersion)));
|
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void**)dbs, num, sizeof(SDbCacheInfo)));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num) {
|
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num) {
|
||||||
|
@ -1381,7 +1398,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbF
|
||||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pConn, dbFName, pDbCfg, NULL));
|
CTG_API_LEAVE(ctgGetDBCfg(pCtg, pConn, dbFName, pDbCfg));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* pInfo) {
|
int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* pInfo) {
|
||||||
|
|
|
@ -1446,7 +1446,6 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
||||||
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
|
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
|
||||||
pTask->res = pInfo;
|
pTask->res = pInfo;
|
||||||
|
|
||||||
SCtgTbIndexCtx* ctx = pTask->taskCtx;
|
|
||||||
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
|
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -1476,8 +1475,14 @@ _return:
|
||||||
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgTask* pTask = tReq->pTask;
|
SCtgTask* pTask = tReq->pTask;
|
||||||
|
SCtgDbCfgCtx* ctx = pTask->taskCtx;
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||||
|
|
||||||
|
SDbCfgInfo* pCfg = ctgCloneDbCfgInfo(pTask->msgCtx.out);
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgUpdateDbCfgEnqueue(pTask->pJob->pCtg, ctx->dbFName, pCfg->dbId, pCfg, false));
|
||||||
|
|
||||||
TSWAP(pTask->res, pTask->msgCtx.out);
|
TSWAP(pTask->res, pTask->msgCtx.out);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -1965,11 +1970,24 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask* pTask) {
|
||||||
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
|
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
SDbCfgInfo cfgInfo;
|
||||||
if (NULL == pMsgCtx->pBatchs) {
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
pMsgCtx->pBatchs = pJob->pBatchs;
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgReadDBCfgFromCache(pCtg, pCtx->dbFName, &cfgInfo));
|
||||||
|
|
||||||
|
if (cfgInfo.cfgVersion < 0) {
|
||||||
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));
|
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));
|
||||||
|
} else {
|
||||||
|
pTask->res = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
|
||||||
|
if (NULL == pTask->res) {
|
||||||
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pTask->res, &cfgInfo, sizeof(cfgInfo));
|
||||||
|
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
|
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
|
||||||
|
{CTG_OP_UPDATE_DB_CFG, "update dbCfg", ctgOpUpdateDbCfg},
|
||||||
{CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
|
{CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
|
||||||
{CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
|
{CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
|
||||||
{CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
|
{CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
|
||||||
|
@ -68,10 +69,15 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); }
|
void ctgRLockDbCfgInfo(SCtgDBCache *dbCache) { CTG_LOCK(CTG_READ, &dbCache->cfgCache.cfgLock); }
|
||||||
|
void ctgWLockDbCfgInfo(SCtgDBCache *dbCache) { CTG_LOCK(CTG_WRITE, &dbCache->cfgCache.cfgLock); }
|
||||||
|
|
||||||
|
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); }
|
||||||
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); }
|
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); }
|
||||||
|
|
||||||
|
void ctgRUnlockDbCfgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->cfgCache.cfgLock); }
|
||||||
|
void ctgWUnlockDbCfgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->cfgCache.cfgLock); }
|
||||||
|
|
||||||
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
||||||
CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
|
CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
|
||||||
taosHashRelease(pCtg->dbCache, dbCache);
|
taosHashRelease(pCtg->dbCache, dbCache);
|
||||||
|
@ -679,6 +685,43 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgReadDBCfgFromCache(SCatalog *pCtg, const char* dbFName, SDbCfgInfo* pDbCfg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
|
||||||
|
if (NULL == dbCache) {
|
||||||
|
ctgDebug("db %s not in cache", dbFName);
|
||||||
|
pDbCfg->cfgVersion = -1;
|
||||||
|
CTG_CACHE_STAT_INC(numOfCfgMiss, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_LOCK(CTG_READ, &dbCache->cfgCache.cfgLock);
|
||||||
|
|
||||||
|
if (dbCache->cfgCache.cfgInfo) {
|
||||||
|
SDbCfgInfo *pInfo = ctgCloneDbCfgInfo(dbCache->cfgCache.cfgInfo);
|
||||||
|
if (NULL == pInfo) {
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pDbCfg, pInfo, sizeof(*pInfo));
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
CTG_CACHE_STAT_INC(numOfCfgHit, 1);
|
||||||
|
} else {
|
||||||
|
pDbCfg->cfgVersion = -1;
|
||||||
|
CTG_CACHE_STAT_INC(numOfCfgMiss, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (dbCache) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &dbCache->cfgCache.cfgLock);
|
||||||
|
ctgReleaseDBCache(pCtg, dbCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
|
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
|
||||||
if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
|
if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
|
||||||
*inCache = true;
|
*inCache = true;
|
||||||
|
@ -957,6 +1000,44 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgUpdateDbCfgEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDbCfgInfo *cfgInfo, bool syncOp) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
|
||||||
|
op->opId = CTG_OP_UPDATE_DB_CFG;
|
||||||
|
op->syncOp = syncOp;
|
||||||
|
|
||||||
|
SCtgUpdateDbCfgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateDbCfgMsg));
|
||||||
|
if (NULL == msg) {
|
||||||
|
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateDbCfgMsg));
|
||||||
|
taosMemoryFree(op);
|
||||||
|
freeDbCfgInfo(cfgInfo);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
char *p = strchr(dbFName, '.');
|
||||||
|
if (p && IS_SYS_DBNAME(p + 1)) {
|
||||||
|
dbFName = p + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
|
msg->pCtg = pCtg;
|
||||||
|
msg->dbId = dbId;
|
||||||
|
msg->cfgInfo = cfgInfo;
|
||||||
|
|
||||||
|
op->data = msg;
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
freeDbCfgInfo(cfgInfo);
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
|
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
|
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
|
||||||
|
@ -1370,13 +1451,13 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
||||||
|
|
||||||
CTG_CACHE_STAT_INC(numOfDb, 1);
|
CTG_CACHE_STAT_INC(numOfDb, 1);
|
||||||
|
|
||||||
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
|
SDbCacheInfo vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
|
||||||
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
||||||
|
|
||||||
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
||||||
|
|
||||||
if (!IS_SYS_DBNAME(dbFName)) {
|
if (!IS_SYS_DBNAME(dbFName)) {
|
||||||
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
|
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbCacheInfo)));
|
||||||
|
|
||||||
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
|
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
|
||||||
}
|
}
|
||||||
|
@ -1422,7 +1503,7 @@ int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *d
|
||||||
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);
|
CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);
|
||||||
|
|
||||||
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
|
||||||
ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
||||||
|
|
||||||
if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
|
if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
|
||||||
|
@ -1717,7 +1798,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool newAdded = false;
|
bool newAdded = false;
|
||||||
SDbVgVersion vgVersion = {
|
SDbCacheInfo vgVersion = {
|
||||||
.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
|
.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
|
||||||
|
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
@ -1765,8 +1846,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
// if (!IS_SYS_DBNAME(dbFName)) {
|
// if (!IS_SYS_DBNAME(dbFName)) {
|
||||||
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
||||||
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
|
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbCacheInfo),
|
||||||
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
|
||||||
//}
|
//}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -1777,6 +1858,67 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgOpUpdateDbCfg(SCtgCacheOperation *operation) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCtgUpdateDbCfgMsg *msg = operation->data;
|
||||||
|
SDbCfgInfo *cfgInfo = msg->cfgInfo;
|
||||||
|
char *dbFName = msg->dbFName;
|
||||||
|
SCatalog *pCtg = msg->pCtg;
|
||||||
|
|
||||||
|
if (pCtg->stopUpdate || NULL == cfgInfo) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cfgInfo->cfgVersion < 0) {
|
||||||
|
ctgDebug("invalid db cfgInfo, dbFName:%s, cfgVersion:%d", dbFName, cfgInfo->cfgVersion);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
|
||||||
|
if (NULL == dbCache) {
|
||||||
|
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
SDbCacheInfo cacheInfo = {0};
|
||||||
|
cacheInfo.dbId = dbCache->dbId;
|
||||||
|
tstrncpy(cacheInfo.dbFName, dbFName, sizeof(cacheInfo.dbFName));
|
||||||
|
cacheInfo.cfgVersion = cfgInfo->cfgVersion;
|
||||||
|
|
||||||
|
SCtgVgCache *vgCache = &dbCache->vgCache;
|
||||||
|
if (vgCache->vgInfo) {
|
||||||
|
cacheInfo.vgVersion = vgCache->vgInfo->vgVersion;
|
||||||
|
cacheInfo.numOfTable = vgCache->vgInfo->numOfTable;
|
||||||
|
cacheInfo.stateTs = vgCache->vgInfo->stateTs;
|
||||||
|
} else {
|
||||||
|
cacheInfo.vgVersion = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctgWLockDbCfgInfo(dbCache);
|
||||||
|
|
||||||
|
freeDbCfgInfo(dbCache->cfgCache.cfgInfo);
|
||||||
|
dbCache->cfgCache.cfgInfo = cfgInfo;
|
||||||
|
cfgInfo = NULL;
|
||||||
|
|
||||||
|
ctgWUnlockDbCfgInfo(dbCache);
|
||||||
|
|
||||||
|
ctgDebug("db cfgInfo updated, dbFName:%s, cfgVer:%d", dbFName, dbCache->cfgCache.cfgInfo->cfgVersion);
|
||||||
|
|
||||||
|
// if (!IS_SYS_DBNAME(dbFName)) {
|
||||||
|
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo),
|
||||||
|
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
|
||||||
|
//}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
freeDbCfgInfo(cfgInfo);
|
||||||
|
taosMemoryFreeClear(msg);
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
|
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgDropDBMsg *msg = operation->data;
|
SCtgDropDBMsg *msg = operation->data;
|
||||||
|
|
|
@ -1039,10 +1039,10 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
|
int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2) {
|
||||||
if (*(int64_t*)key1 < ((SDbVgVersion*)key2)->dbId) {
|
if (*(int64_t*)key1 < ((SDbCacheInfo*)key2)->dbId) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (*(int64_t*)key1 > ((SDbVgVersion*)key2)->dbId) {
|
} else if (*(int64_t*)key1 > ((SDbCacheInfo*)key2)->dbId) {
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1059,10 +1059,10 @@ int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
|
int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2) {
|
||||||
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
|
if (((SDbCacheInfo*)key1)->dbId < ((SDbCacheInfo*)key2)->dbId) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
|
} else if (((SDbCacheInfo*)key1)->dbId > ((SDbCacheInfo*)key2)->dbId) {
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1233,16 +1233,20 @@ static void* ctgCloneDbVgroup(void* pSrc) { return taosArrayDup((const SArray*)p
|
||||||
|
|
||||||
static void ctgFreeDbVgroup(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
|
static void ctgFreeDbVgroup(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
|
||||||
|
|
||||||
static void* ctgCloneDbCfgInfo(void* pSrc) {
|
void* ctgCloneDbCfgInfo(void* pSrc) {
|
||||||
SDbCfgInfo* pDst = taosMemoryMalloc(sizeof(SDbCfgInfo));
|
SDbCfgInfo* pDst = taosMemoryMalloc(sizeof(SDbCfgInfo));
|
||||||
if (NULL == pDst) {
|
if (NULL == pDst) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memcpy(pDst, pSrc, sizeof(SDbCfgInfo));
|
memcpy(pDst, pSrc, sizeof(SDbCfgInfo));
|
||||||
|
pDst->pRetensions = taosArrayDup((const SArray*)pSrc, NULL);
|
||||||
return pDst;
|
return pDst;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ctgFreeDbCfgInfo(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
|
static void ctgFreeDbCfgInfo(void* p) {
|
||||||
|
SDbCfgInfo* pDst = (SDbCfgInfo *)((SMetaRes*)p)->pRes;
|
||||||
|
freeDbCfgInfo(pDst);
|
||||||
|
}
|
||||||
|
|
||||||
static void* ctgCloneDbInfo(void* pSrc) {
|
static void* ctgCloneDbInfo(void* pSrc) {
|
||||||
SDbInfo* pDst = taosMemoryMalloc(sizeof(SDbInfo));
|
SDbInfo* pDst = taosMemoryMalloc(sizeof(SDbInfo));
|
||||||
|
|
|
@ -1326,7 +1326,7 @@ TEST(tableMeta, normalTable) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbCacheInfo *dbs = NULL;
|
||||||
SSTableVersion *stb = NULL;
|
SSTableVersion *stb = NULL;
|
||||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
@ -1438,7 +1438,7 @@ TEST(tableMeta, childTableCase) {
|
||||||
|
|
||||||
taosMemoryFree(tableMeta);
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbCacheInfo *dbs = NULL;
|
||||||
SSTableVersion *stb = NULL;
|
SSTableVersion *stb = NULL;
|
||||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
@ -1579,7 +1579,7 @@ TEST(tableMeta, superTableCase) {
|
||||||
|
|
||||||
taosMemoryFree(tableMeta);
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbCacheInfo *dbs = NULL;
|
||||||
SSTableVersion *stb = NULL;
|
SSTableVersion *stb = NULL;
|
||||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
@ -2675,7 +2675,7 @@ TEST(rentTest, allRent) {
|
||||||
SDBVgInfo dbVgroup = {0};
|
SDBVgInfo dbVgroup = {0};
|
||||||
SArray *vgList = NULL;
|
SArray *vgList = NULL;
|
||||||
ctgTestStop = false;
|
ctgTestStop = false;
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbCacheInfo *dbs = NULL;
|
||||||
SSTableVersion *stable = NULL;
|
SSTableVersion *stable = NULL;
|
||||||
uint32_t num = 0;
|
uint32_t num = 0;
|
||||||
|
|
||||||
|
|
|
@ -548,3 +548,11 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void freeDbCfgInfo(SDbCfgInfo *pInfo) {
|
||||||
|
if (pInfo) {
|
||||||
|
taosArrayDestroy(pInfo->pRetensions);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -312,7 +312,7 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
|
SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
|
||||||
if (pSrc->size == 0) { // empty array list
|
if (NULL == pSrc || pSrc->size == 0) { // empty array list
|
||||||
return taosArrayInit(8, pSrc->elemSize);
|
return taosArrayInit(8, pSrc->elemSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue