commit
ff59295424
|
@ -62,6 +62,18 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
|
||||||
|
|
||||||
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a DB's all vgroup info.
|
||||||
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
|
* @param pRpc (input, rpc object)
|
||||||
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
|
* @param pDBName (input, full db name)
|
||||||
|
* @param forceUpdate (input, force update db vgroup info from mnode)
|
||||||
|
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
|
||||||
|
* @return error code
|
||||||
|
*/
|
||||||
|
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, int32_t forceUpdate, SArray** pVgroupList);
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -75,6 +75,7 @@ typedef struct STableMeta {
|
||||||
} STableMeta;
|
} STableMeta;
|
||||||
|
|
||||||
typedef struct SDBVgroupInfo {
|
typedef struct SDBVgroupInfo {
|
||||||
|
int32_t lock;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
|
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
|
||||||
|
|
|
@ -144,6 +144,16 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
|
||||||
*/
|
*/
|
||||||
void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* destBuf);
|
void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* destBuf);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clone the result to interval allocated buffer
|
||||||
|
* @param pHashObj
|
||||||
|
* @param key
|
||||||
|
* @param keyLen
|
||||||
|
* @param destBuf
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* remove item with the specified key
|
* remove item with the specified key
|
||||||
* @param pHashObj
|
* @param pHashObj
|
||||||
|
@ -200,6 +210,26 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
|
||||||
*/
|
*/
|
||||||
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
|
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return the payload data with the specified key(reference number added)
|
||||||
|
*
|
||||||
|
* @param pHashObj
|
||||||
|
* @param key
|
||||||
|
* @param keyLen
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* release the prevous acquired obj
|
||||||
|
*
|
||||||
|
* @param pHashObj
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void taosHashRelease(SHashObj *pHashObj, void *p);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -31,6 +31,11 @@ extern "C" {
|
||||||
|
|
||||||
#define CTG_DEFAULT_INVALID_VERSION (-1)
|
#define CTG_DEFAULT_INVALID_VERSION (-1)
|
||||||
|
|
||||||
|
enum {
|
||||||
|
CTG_READ = 1,
|
||||||
|
CTG_WRITE,
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct SVgroupListCache {
|
typedef struct SVgroupListCache {
|
||||||
int32_t vgroupVersion;
|
int32_t vgroupVersion;
|
||||||
SHashObj *cache; // key:vgId, value:SVgroupInfo
|
SHashObj *cache; // key:vgId, value:SVgroupInfo
|
||||||
|
@ -41,6 +46,7 @@ typedef struct SDBVgroupCache {
|
||||||
} SDBVgroupCache;
|
} SDBVgroupCache;
|
||||||
|
|
||||||
typedef struct STableMetaCache {
|
typedef struct STableMetaCache {
|
||||||
|
SRWLatch stableLock;
|
||||||
SHashObj *cache; //key:fulltablename, value:STableMeta
|
SHashObj *cache; //key:fulltablename, value:STableMeta
|
||||||
SHashObj *stableCache; //key:suid, value:STableMeta*
|
SHashObj *stableCache; //key:suid, value:STableMeta*
|
||||||
} STableMetaCache;
|
} STableMetaCache;
|
||||||
|
@ -71,6 +77,31 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
||||||
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
||||||
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||||
|
|
||||||
|
#define CTG_LOCK(type, _lock) do { \
|
||||||
|
if (CTG_READ == (type)) { \
|
||||||
|
if ((*(_lock)) < 0) assert(0); \
|
||||||
|
taosRLockLatch(_lock); \
|
||||||
|
ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
|
} else { \
|
||||||
|
if ((*(_lock)) < 0) assert(0); \
|
||||||
|
taosWLockLatch(_lock); \
|
||||||
|
ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define CTG_UNLOCK(type, _lock) do { \
|
||||||
|
if (CTG_READ == (type)) { \
|
||||||
|
if ((*(_lock)) <= 0) assert(0); \
|
||||||
|
taosRUnLockLatch(_lock); \
|
||||||
|
ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
|
} else { \
|
||||||
|
if ((*(_lock)) <= 0) assert(0); \
|
||||||
|
taosWUnLockLatch(_lock); \
|
||||||
|
ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -20,24 +20,28 @@
|
||||||
|
|
||||||
SCatalogMgmt ctgMgmt = {0};
|
SCatalogMgmt ctgMgmt = {0};
|
||||||
|
|
||||||
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) {
|
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
|
||||||
if (NULL == pCatalog->dbCache.cache) {
|
if (NULL == pCatalog->dbCache.cache) {
|
||||||
*exist = 0;
|
*inCache = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
|
|
||||||
if (NULL == info) {
|
if (NULL == info) {
|
||||||
*exist = 0;
|
*inCache = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbInfo) {
|
CTG_LOCK(CTG_READ, &info->lock);
|
||||||
*dbInfo = *info;
|
if (NULL == info->vgInfo) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &info->lock);
|
||||||
|
*inCache = false;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
*exist = 1;
|
*dbInfo = info;
|
||||||
|
*inCache = true;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -80,46 +84,51 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
tNameExtractFullName(pTableName, tbFullName);
|
tNameExtractFullName(pTableName, tbFullName);
|
||||||
|
|
||||||
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
|
*pTableMeta = NULL;
|
||||||
|
|
||||||
if (NULL == tbMeta) {
|
size_t sz = 0;
|
||||||
|
STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);
|
||||||
|
|
||||||
|
if (NULL == *pTableMeta) {
|
||||||
*exist = 0;
|
*exist = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tbMeta->tableType == TSDB_CHILD_TABLE) {
|
*exist = 1;
|
||||||
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
|
|
||||||
if (NULL == stbMeta || NULL == *stbMeta) {
|
|
||||||
*exist = 0;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((*stbMeta)->suid != tbMeta->suid) {
|
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
|
||||||
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
|
return TSDB_CODE_SUCCESS;
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
}
|
||||||
}
|
|
||||||
|
CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);
|
||||||
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
|
|
||||||
*pTableMeta = calloc(1, metaSize);
|
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
|
||||||
if (NULL == *pTableMeta) {
|
if (NULL == stbMeta || NULL == *stbMeta) {
|
||||||
ctgError("calloc size[%d] failed", metaSize);
|
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid);
|
||||||
}
|
tfree(*pTableMeta);
|
||||||
|
*exist = 0;
|
||||||
memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta));
|
return TSDB_CODE_SUCCESS;
|
||||||
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
|
|
||||||
} else {
|
|
||||||
int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema);
|
|
||||||
*pTableMeta = calloc(1, metaSize);
|
|
||||||
if (NULL == *pTableMeta) {
|
|
||||||
ctgError("calloc size[%d] failed", metaSize);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(*pTableMeta, tbMeta, metaSize);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*exist = 1;
|
if ((*stbMeta)->suid != tbMeta->suid) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
|
||||||
|
tfree(*pTableMeta);
|
||||||
|
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
|
||||||
|
*pTableMeta = realloc(*pTableMeta, metaSize);
|
||||||
|
if (NULL == *pTableMeta) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
|
||||||
|
ctgError("calloc size[%d] failed", metaSize);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
|
||||||
|
|
||||||
|
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -223,9 +232,11 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
|
||||||
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
|
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
|
||||||
SHashObj *vgroupHash = NULL;
|
SHashObj *vgroupHash = NULL;
|
||||||
SVgroupInfo *vgInfo = NULL;
|
SVgroupInfo *vgInfo = NULL;
|
||||||
|
SArray *vgList = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
*vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
|
vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
|
||||||
if (NULL == *vgroupList) {
|
if (NULL == vgList) {
|
||||||
ctgError("taosArrayInit failed");
|
ctgError("taosArrayInit failed");
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
@ -234,19 +245,34 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
vgInfo = pIter;
|
vgInfo = pIter;
|
||||||
|
|
||||||
if (NULL == taosArrayPush(*vgroupList, vgInfo)) {
|
if (NULL == taosArrayPush(vgList, vgInfo)) {
|
||||||
ctgError("taosArrayPush failed");
|
ctgError("taosArrayPush failed");
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
|
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
|
||||||
vgInfo = NULL;
|
vgInfo = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*vgroupList = vgList;
|
||||||
|
vgList = NULL;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (vgList) {
|
||||||
|
taosArrayDestroy(vgList);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
|
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
CTG_LOCK(CTG_READ, &dbInfo->lock);
|
||||||
|
|
||||||
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
|
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(pTableName, db);
|
tNameGetFullDbName(pTableName, db);
|
||||||
|
@ -259,7 +285,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
|
||||||
tableNameHashFp fp = NULL;
|
tableNameHashFp fp = NULL;
|
||||||
SVgroupInfo *vgInfo = NULL;
|
SVgroupInfo *vgInfo = NULL;
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
|
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
tNameExtractFullName(pTableName, tbFullName);
|
tNameExtractFullName(pTableName, tbFullName);
|
||||||
|
@ -279,19 +305,23 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
|
||||||
|
|
||||||
if (NULL == vgInfo) {
|
if (NULL == vgInfo) {
|
||||||
ctgError("no hash range found for hashvalue[%u]", hashValue);
|
ctgError("no hash range found for hashvalue[%u]", hashValue);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pVgroup = *vgInfo;
|
*pVgroup = *vgInfo;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_return:
|
||||||
|
|
||||||
|
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
|
||||||
|
|
||||||
|
CTG_RET(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
|
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
|
||||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t exist = 0;
|
int32_t exist = 0;
|
||||||
|
|
||||||
if (!forceUpdate) {
|
if (!forceUpdate) {
|
||||||
|
@ -316,21 +346,23 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
|
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (output->metaNum != 1 && output->metaNum != 2) {
|
if (output->metaNum != 1 && output->metaNum != 2) {
|
||||||
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
|
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == output->tbMeta) {
|
if (NULL == output->tbMeta) {
|
||||||
ctgError("no valid table meta got from meta rsp");
|
ctgError("no valid table meta got from meta rsp");
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pCatalog->tableCache.cache) {
|
if (NULL == pCatalog->tableCache.cache) {
|
||||||
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pCatalog->tableCache.cache) {
|
if (NULL == pCatalog->tableCache.cache) {
|
||||||
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -338,50 +370,59 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
||||||
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pCatalog->tableCache.stableCache) {
|
if (NULL == pCatalog->tableCache.stableCache) {
|
||||||
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (output->metaNum == 2) {
|
if (output->metaNum == 2) {
|
||||||
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
|
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
|
||||||
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
|
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
|
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
|
||||||
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
|
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
|
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
|
||||||
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
|
||||||
ctgError("push table[%s] to table cache failed", output->tbFname);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
|
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
|
||||||
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
|
CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
|
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
|
ctgError("push table[%s] to table cache failed", output->tbFname);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
|
||||||
|
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
|
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
}
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
|
} else {
|
||||||
|
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
||||||
|
ctgError("push table[%s] to table cache failed", output->tbFname);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
tfree(output->tbMeta);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
|
||||||
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
bool inCache = false;
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t exist = 0;
|
|
||||||
|
|
||||||
if (0 == forceUpdate) {
|
if (0 == forceUpdate) {
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
|
||||||
|
|
||||||
if (exist) {
|
if (inCache) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -397,9 +438,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
|
||||||
|
|
||||||
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
|
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
|
||||||
|
|
||||||
if (dbInfo) {
|
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
|
||||||
*dbInfo = DbOut.dbVgroup;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -479,17 +518,68 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
if (NULL == dbInfo) {
|
if (NULL == dbInfo) {
|
||||||
*version = CTG_DEFAULT_INVALID_VERSION;
|
*version = CTG_DEFAULT_INVALID_VERSION;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
*version = dbInfo->vgVersion;
|
*version = dbInfo->vgVersion;
|
||||||
|
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
|
||||||
|
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
SDBVgroupInfo* db = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
SVgroupInfo *vgInfo = NULL;
|
||||||
|
SArray *vgList = NULL;
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));
|
||||||
|
|
||||||
|
vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
|
||||||
|
if (NULL == vgList) {
|
||||||
|
ctgError("taosArrayInit failed");
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(db->vgInfo, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
vgInfo = pIter;
|
||||||
|
|
||||||
|
if (NULL == taosArrayPush(vgList, vgInfo)) {
|
||||||
|
ctgError("taosArrayPush failed");
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter = taosHashIterate(db->vgInfo, pIter);
|
||||||
|
vgInfo = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*vgroupList = vgList;
|
||||||
|
vgList = NULL;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (db) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &db->lock);
|
||||||
|
taosHashRelease(pCatalog->dbCache.cache, db);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vgList) {
|
||||||
|
taosArrayDestroy(vgList);
|
||||||
|
vgList = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||||
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
|
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
@ -497,13 +587,17 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
||||||
|
|
||||||
if (dbInfo->vgVersion < 0) {
|
if (dbInfo->vgVersion < 0) {
|
||||||
if (pCatalog->dbCache.cache) {
|
if (pCatalog->dbCache.cache) {
|
||||||
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
if (oldInfo && oldInfo->vgInfo) {
|
if (oldInfo) {
|
||||||
taosHashCleanup(oldInfo->vgInfo);
|
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
|
||||||
oldInfo->vgInfo = NULL;
|
if (oldInfo->vgInfo) {
|
||||||
}
|
taosHashCleanup(oldInfo->vgInfo);
|
||||||
|
oldInfo->vgInfo = NULL;
|
||||||
|
}
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
|
||||||
|
|
||||||
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgWarn("remove db [%s] from cache", dbName);
|
ctgWarn("remove db [%s] from cache", dbName);
|
||||||
|
@ -517,10 +611,16 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
if (oldInfo && oldInfo->vgInfo) {
|
if (oldInfo) {
|
||||||
taosHashCleanup(oldInfo->vgInfo);
|
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
|
||||||
oldInfo->vgInfo = NULL;
|
if (oldInfo->vgInfo) {
|
||||||
|
taosHashCleanup(oldInfo->vgInfo);
|
||||||
|
oldInfo->vgInfo = NULL;
|
||||||
|
}
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
|
||||||
|
|
||||||
|
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -573,7 +673,10 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
|
||||||
STableMeta *tbMeta = NULL;
|
STableMeta *tbMeta = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVgroupInfo vgroupInfo = {0};
|
SVgroupInfo vgroupInfo = {0};
|
||||||
SDBVgroupInfo dbVgroup = {0};
|
SDBVgroupInfo* dbVgroup = NULL;
|
||||||
|
SArray *vgList = NULL;
|
||||||
|
|
||||||
|
*pVgroupList = NULL;
|
||||||
|
|
||||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta));
|
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta));
|
||||||
|
|
||||||
|
@ -582,38 +685,48 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
|
||||||
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
|
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
|
||||||
|
|
||||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
|
||||||
} else {
|
} else {
|
||||||
int32_t vgId = tbMeta->vgId;
|
int32_t vgId = tbMeta->vgId;
|
||||||
if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
|
if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
|
||||||
ctgError("vgId[%d] not found in vgroup list", vgId);
|
ctgError("vgId[%d] not found in vgroup list", vgId);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo));
|
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
|
||||||
if (NULL == *pVgroupList) {
|
if (NULL == vgList) {
|
||||||
ctgError("taosArrayInit failed");
|
ctgError("taosArrayInit failed");
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) {
|
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
|
||||||
ctgError("push vgroupInfo to array failed");
|
ctgError("push vgroupInfo to array failed");
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
tfree(tbMeta);
|
*pVgroupList = vgList;
|
||||||
return TSDB_CODE_SUCCESS;
|
vgList = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
tfree(tbMeta);
|
tfree(tbMeta);
|
||||||
taosArrayDestroy(*pVgroupList);
|
|
||||||
|
if (dbVgroup) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
|
||||||
|
taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vgList) {
|
||||||
|
taosArrayDestroy(vgList);
|
||||||
|
vgList = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
|
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
|
||||||
SDBVgroupInfo dbInfo = {0};
|
SDBVgroupInfo* dbInfo = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
@ -621,12 +734,14 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));
|
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));
|
||||||
|
|
||||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup));
|
||||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", db, dbInfo.vgVersion, dbInfo.vgInfo);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
|
||||||
}
|
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pTableName, pVgroup));
|
_return:
|
||||||
|
|
||||||
|
if (dbInfo) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
|
||||||
|
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
|
||||||
|
}
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,11 +106,11 @@ typedef struct SQWorkerMgmt {
|
||||||
if (QW_READ == (type)) { \
|
if (QW_READ == (type)) { \
|
||||||
if ((*(_lock)) < 0) assert(0); \
|
if ((*(_lock)) < 0) assert(0); \
|
||||||
taosRLockLatch(_lock); \
|
taosRLockLatch(_lock); \
|
||||||
qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
} else { \
|
} else { \
|
||||||
if ((*(_lock)) < 0) assert(0); \
|
if ((*(_lock)) < 0) assert(0); \
|
||||||
taosWLockLatch(_lock); \
|
taosWLockLatch(_lock); \
|
||||||
qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
@ -118,11 +118,11 @@ typedef struct SQWorkerMgmt {
|
||||||
if (QW_READ == (type)) { \
|
if (QW_READ == (type)) { \
|
||||||
if ((*(_lock)) <= 0) assert(0); \
|
if ((*(_lock)) <= 0) assert(0); \
|
||||||
taosRUnLockLatch(_lock); \
|
taosRUnLockLatch(_lock); \
|
||||||
qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
} else { \
|
} else { \
|
||||||
if ((*(_lock)) <= 0) assert(0); \
|
if ((*(_lock)) <= 0) assert(0); \
|
||||||
taosWUnLockLatch(_lock); \
|
taosWUnLockLatch(_lock); \
|
||||||
qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -12,4 +12,6 @@ target_link_libraries(
|
||||||
PUBLIC zlib
|
PUBLIC zlib
|
||||||
PUBLIC lz4_static
|
PUBLIC lz4_static
|
||||||
PUBLIC api
|
PUBLIC api
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ADD_SUBDIRECTORY(test)
|
||||||
|
|
|
@ -362,7 +362,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) {
|
void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void* d, bool acquire) {
|
||||||
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
|
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -404,6 +404,10 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void*
|
||||||
memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen);
|
memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (acquire) {
|
||||||
|
pNode->count++;
|
||||||
|
}
|
||||||
|
|
||||||
data = GET_HASH_NODE_DATA(pNode);
|
data = GET_HASH_NODE_DATA(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,6 +419,15 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void*
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) {
|
||||||
|
return taosHashGetCloneImpl(pHashObj, key, keyLen, d, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
|
return taosHashGetCloneImpl(pHashObj, key, keyLen, NULL, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, void *data, size_t dsize*/) {
|
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, void *data, size_t dsize*/) {
|
||||||
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
|
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -919,3 +932,9 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosHashRelease(SHashObj *pHashObj, void *p) {
|
||||||
|
taosHashCancelIterate(pHashObj, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -13,17 +13,22 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
|
||||||
|
|
||||||
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
||||||
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
||||||
TARGET_LINK_LIBRARIES(utilTest tutil common os gtest pthread gcov)
|
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov)
|
||||||
|
|
||||||
|
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp)
|
||||||
|
LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp)
|
||||||
|
ADD_EXECUTABLE(hashTest ${SOURCE_LIST})
|
||||||
|
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov)
|
||||||
|
|
||||||
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
||||||
ADD_EXECUTABLE(trefTest ${BIN_SRC})
|
ADD_EXECUTABLE(trefTest ${BIN_SRC})
|
||||||
TARGET_LINK_LIBRARIES(trefTest common tutil)
|
TARGET_LINK_LIBRARIES(trefTest common util)
|
||||||
|
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
||||||
#IF (TD_LINUX)
|
#IF (TD_LINUX)
|
||||||
# ADD_EXECUTABLE(trefTest ./trefTest.c)
|
# ADD_EXECUTABLE(trefTest ./trefTest.c)
|
||||||
# TARGET_LINK_LIBRARIES(trefTest tutil common)
|
# TARGET_LINK_LIBRARIES(trefTest util common)
|
||||||
#ENDIF ()
|
#ENDIF ()
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
|
|
@ -4,10 +4,15 @@
|
||||||
#include <taosdef.h>
|
#include <taosdef.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
#include "hash.h"
|
#include "thash.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
typedef struct TESTSTRUCT {
|
||||||
|
char *p;
|
||||||
|
}TESTSTRUCT;
|
||||||
|
|
||||||
// the simple test code for basic operations
|
// the simple test code for basic operations
|
||||||
void simpleTest() {
|
void simpleTest() {
|
||||||
SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -141,6 +146,52 @@ void invalidOperationTest() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void acquireRleaseTest() {
|
||||||
|
SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
|
ASSERT_EQ(taosHashGetSize(hashTable), 0);
|
||||||
|
|
||||||
|
int32_t key = 2;
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t num = 0;
|
||||||
|
TESTSTRUCT data = {0};
|
||||||
|
char *str1 = "abcdefg";
|
||||||
|
char *str2 = "aaaaaaa";
|
||||||
|
char *str3 = "123456789";
|
||||||
|
|
||||||
|
data.p = (char *)malloc(10);
|
||||||
|
strcpy(data.p, str1);
|
||||||
|
|
||||||
|
code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data));
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
TESTSTRUCT* pdata = (TESTSTRUCT*)taosHashAcquire(hashTable, &key, sizeof(key));
|
||||||
|
ASSERT_TRUE(pdata != nullptr);
|
||||||
|
ASSERT_TRUE(strcmp(pdata->p, str1) == 0);
|
||||||
|
|
||||||
|
code = taosHashRemove(hashTable, &key, sizeof(key));
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_TRUE(strcmp(pdata->p, str1) == 0);
|
||||||
|
|
||||||
|
num = taosHashGetSize(hashTable);
|
||||||
|
ASSERT_EQ(num, 1);
|
||||||
|
|
||||||
|
strcpy(pdata->p, str3);
|
||||||
|
|
||||||
|
data.p = (char *)malloc(10);
|
||||||
|
strcpy(data.p, str2);
|
||||||
|
code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data));
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
num = taosHashGetSize(hashTable);
|
||||||
|
ASSERT_EQ(num, 2);
|
||||||
|
|
||||||
|
printf("%s,expect:%s", pdata->p, str3);
|
||||||
|
ASSERT_TRUE(strcmp(pdata->p, str3) == 0);
|
||||||
|
|
||||||
|
taosHashRelease(hashTable, pdata);
|
||||||
|
num = taosHashGetSize(hashTable);
|
||||||
|
ASSERT_EQ(num, 1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -153,4 +204,5 @@ TEST(testCase, hashTest) {
|
||||||
stringKeyTest();
|
stringKeyTest();
|
||||||
noLockPerformanceTest();
|
noLockPerformanceTest();
|
||||||
multithreadsTest();
|
multithreadsTest();
|
||||||
|
acquireRleaseTest();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue