Merge pull request #12802 from taosdata/feature/stream
fix: memory error
This commit is contained in:
commit
fb6bebe65c
|
@ -71,8 +71,8 @@ ELSE ()
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
IF (${SANITIZER} MATCHES "true")
|
IF (${SANITIZER} MATCHES "true")
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||||
MESSAGE(STATUS "Will compile with Address Sanitizer!")
|
MESSAGE(STATUS "Will compile with Address Sanitizer!")
|
||||||
ELSE ()
|
ELSE ()
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
||||||
|
|
|
@ -2524,7 +2524,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
||||||
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
|
||||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
if (pRsp->blockNum != 0) {
|
if (pRsp->blockNum != 0) {
|
||||||
|
|
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "trpc.h"
|
|
||||||
#include "query.h"
|
|
||||||
#include "tname.h"
|
|
||||||
#include "catalogInt.h"
|
#include "catalogInt.h"
|
||||||
|
#include "query.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
|
#include "tname.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
int32_t ctgActUpdateVg(SCtgMetaAction *action);
|
int32_t ctgActUpdateVg(SCtgMetaAction *action);
|
||||||
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
|
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
|
||||||
|
@ -28,37 +28,10 @@ int32_t ctgActUpdateUser(SCtgMetaAction *action);
|
||||||
|
|
||||||
extern SCtgDebug gCTGDebug;
|
extern SCtgDebug gCTGDebug;
|
||||||
SCatalogMgmt gCtgMgmt = {0};
|
SCatalogMgmt gCtgMgmt = {0};
|
||||||
SCtgAction gCtgAction[CTG_ACT_MAX] = {{
|
SCtgAction gCtgAction[CTG_ACT_MAX] = {
|
||||||
CTG_ACT_UPDATE_VG,
|
{CTG_ACT_UPDATE_VG, "update vgInfo", ctgActUpdateVg}, {CTG_ACT_UPDATE_TBL, "update tbMeta", ctgActUpdateTbl},
|
||||||
"update vgInfo",
|
{CTG_ACT_REMOVE_DB, "remove DB", ctgActRemoveDB}, {CTG_ACT_REMOVE_STB, "remove stbMeta", ctgActRemoveStb},
|
||||||
ctgActUpdateVg
|
{CTG_ACT_REMOVE_TBL, "remove tbMeta", ctgActRemoveTbl}, {CTG_ACT_UPDATE_USER, "update user", ctgActUpdateUser}};
|
||||||
},
|
|
||||||
{
|
|
||||||
CTG_ACT_UPDATE_TBL,
|
|
||||||
"update tbMeta",
|
|
||||||
ctgActUpdateTbl
|
|
||||||
},
|
|
||||||
{
|
|
||||||
CTG_ACT_REMOVE_DB,
|
|
||||||
"remove DB",
|
|
||||||
ctgActRemoveDB
|
|
||||||
},
|
|
||||||
{
|
|
||||||
CTG_ACT_REMOVE_STB,
|
|
||||||
"remove stbMeta",
|
|
||||||
ctgActRemoveStb
|
|
||||||
},
|
|
||||||
{
|
|
||||||
CTG_ACT_REMOVE_TBL,
|
|
||||||
"remove tbMeta",
|
|
||||||
ctgActRemoveTbl
|
|
||||||
},
|
|
||||||
{
|
|
||||||
CTG_ACT_UPDATE_USER,
|
|
||||||
"update user",
|
|
||||||
ctgActUpdateUser
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
|
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
|
||||||
if (NULL == mgmt->slots) {
|
if (NULL == mgmt->slots) {
|
||||||
|
@ -76,7 +49,6 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
|
||||||
taosMemoryFreeClear(mgmt->slots);
|
taosMemoryFreeClear(mgmt->slots);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
|
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
|
||||||
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
||||||
if (cache->stbCache) {
|
if (cache->stbCache) {
|
||||||
|
@ -171,8 +143,6 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
||||||
taosMemoryFree(pCtg);
|
taosMemoryFree(pCtg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void ctgWaitAction(SCtgMetaAction *action) {
|
void ctgWaitAction(SCtgMetaAction *action) {
|
||||||
while (true) {
|
while (true) {
|
||||||
tsem_wait(&gCtgMgmt.queue.rspSem);
|
tsem_wait(&gCtgMgmt.queue.rspSem);
|
||||||
|
@ -204,7 +174,6 @@ void ctgPopAction(SCtgMetaAction **action) {
|
||||||
*action = &node->action;
|
*action = &node->action;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgPushAction(SCatalog *pCtg, SCtgMetaAction *action) {
|
int32_t ctgPushAction(SCatalog *pCtg, SCtgMetaAction *action) {
|
||||||
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
|
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
|
||||||
if (NULL == node) {
|
if (NULL == node) {
|
||||||
|
@ -235,7 +204,6 @@ int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgPushRmDBMsgInQueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
|
int32_t ctgPushRmDBMsgInQueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgMetaAction action = {.act = CTG_ACT_REMOVE_DB};
|
SCtgMetaAction action = {.act = CTG_ACT_REMOVE_DB};
|
||||||
|
@ -266,8 +234,8 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgPushRmStbMsgInQueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
|
||||||
int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) {
|
bool syncReq) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgMetaAction action = {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq};
|
SCtgMetaAction action = {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq};
|
||||||
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
|
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
|
||||||
|
@ -294,8 +262,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgPushRmTblMsgInQueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
|
int32_t ctgPushRmTblMsgInQueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgMetaAction action = {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq};
|
SCtgMetaAction action = {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq};
|
||||||
|
@ -423,7 +389,6 @@ int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (NULL == dbCache->vgInfo) {
|
if (NULL == dbCache->vgInfo) {
|
||||||
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
|
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
|
||||||
|
|
||||||
|
@ -449,18 +414,11 @@ int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { taosHashRelease(pCtg->dbCache, dbCache); }
|
||||||
taosHashRelease(pCtg->dbCache, dbCache);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
|
void ctgReleaseVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgLock); }
|
||||||
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); }
|
||||||
|
|
||||||
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
|
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
|
||||||
char *p = strchr(dbFName, '.');
|
char *p = strchr(dbFName, '.');
|
||||||
|
@ -504,7 +462,6 @@ int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache)
|
||||||
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
|
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
|
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
|
||||||
|
@ -585,8 +542,8 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetDBVgInfoFromMnode(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, SBuildUseDBInput *input,
|
||||||
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
|
SUseDbOutput *out) {
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
@ -660,7 +617,8 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *indexName, SIndexInfo *out) {
|
int32_t ctgGetIndexInfoFromMnode(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *indexName,
|
||||||
|
SIndexInfo *out) {
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
@ -697,7 +655,8 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo **out) {
|
int32_t ctgGetUdfInfoFromMnode(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *funcName,
|
||||||
|
SFuncInfo **out) {
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
@ -740,7 +699,8 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEp
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *user, SGetUserAuthRsp *authRsp) {
|
int32_t ctgGetUserDbAuthFromMnode(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *user,
|
||||||
|
SGetUserAuthRsp *authRsp) {
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
@ -777,7 +737,6 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgIsTableMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
|
int32_t ctgIsTableMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
|
||||||
if (NULL == pCtg->dbCache) {
|
if (NULL == pCtg->dbCache) {
|
||||||
*exist = 0;
|
*exist = 0;
|
||||||
|
@ -814,8 +773,8 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetTableMetaFromCache(SCatalog *pCtg, const SName *pTableName, STableMeta **pTableMeta, bool *inCache,
|
||||||
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, bool *inCache, int32_t flag, uint64_t *dbId) {
|
int32_t flag, uint64_t *dbId) {
|
||||||
if (NULL == pCtg->dbCache) {
|
if (NULL == pCtg->dbCache) {
|
||||||
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
|
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -839,7 +798,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
|
||||||
|
|
||||||
int32_t sz = 0;
|
int32_t sz = 0;
|
||||||
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||||
int32_t code = taosHashGetDup_m(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), (void **)pTableMeta, &sz);
|
int32_t code = taosHashGetDup_m(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname),
|
||||||
|
(void **)pTableMeta, &sz);
|
||||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||||
|
|
||||||
if (NULL == *pTableMeta) {
|
if (NULL == *pTableMeta) {
|
||||||
|
@ -864,7 +824,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgDebug("Got subtable meta from cache, type:%d, dbFName:%s, tbName:%s, suid:%" PRIx64, tbMeta->tableType, dbFName, pTableName->tname, tbMeta->suid);
|
ctgDebug("Got subtable meta from cache, type:%d, dbFName:%s, tbName:%s, suid:%" PRIx64, tbMeta->tableType, dbFName,
|
||||||
|
pTableName->tname, tbMeta->suid);
|
||||||
|
|
||||||
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||||
|
|
||||||
|
@ -881,7 +842,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
|
||||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||||
ctgReleaseDBCache(pCtg, dbCache);
|
ctgReleaseDBCache(pCtg, dbCache);
|
||||||
taosMemoryFreeClear(*pTableMeta);
|
taosMemoryFreeClear(*pTableMeta);
|
||||||
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
|
ctgError("stable suid in stbCache mis-match, expected suid:%" PRIx64 ",actual suid:%" PRIx64, tbMeta->suid,
|
||||||
|
(*stbMeta)->suid);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -951,7 +913,8 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const char* dbFName, const char
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
|
int32_t ctgChkAuthFromCache(SCatalog *pCtg, const char *user, const char *dbFName, AUTH_TYPE type, bool *inCache,
|
||||||
|
bool *pass) {
|
||||||
if (NULL == pCtg->userCache) {
|
if (NULL == pCtg->userCache) {
|
||||||
ctgDebug("empty user auth cache, user:%s", user);
|
ctgDebug("empty user auth cache, user:%s", user);
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -1000,7 +963,8 @@ _return:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
|
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, char *dbFName, char *tbName,
|
||||||
|
STableMetaOutput *output) {
|
||||||
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
|
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
SEpSet *pVnodeEpSet = NULL;
|
SEpSet *pVnodeEpSet = NULL;
|
||||||
|
@ -1031,7 +995,8 @@ int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
|
ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName,
|
||||||
|
tbName);
|
||||||
CTG_ERR_RET(rpcRsp.code);
|
CTG_ERR_RET(rpcRsp.code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1046,15 +1011,18 @@ int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
|
int32_t ctgGetTableMetaFromMnode(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
STableMetaOutput *output) {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
tNameGetFullDbName(pTableName, dbFName);
|
tNameGetFullDbName(pTableName, dbFName);
|
||||||
|
|
||||||
return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output);
|
return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
|
int32_t ctgGetTableMetaFromVnodeImpl(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
|
SVgroupInfo *vgroupInfo, STableMetaOutput *output) {
|
||||||
|
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo ||
|
||||||
|
NULL == output) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1063,13 +1031,15 @@ int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet*
|
||||||
|
|
||||||
ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
|
ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
|
||||||
|
|
||||||
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
|
SBuildTableMetaInput bInput = {
|
||||||
|
.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
|
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
|
ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName,
|
||||||
|
tNameGetTableName(pTableName));
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1089,13 +1059,15 @@ int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
|
ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName,
|
||||||
|
tNameGetTableName(pTableName));
|
||||||
CTG_ERR_RET(rpcRsp.code);
|
CTG_ERR_RET(rpcRsp.code);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
|
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
|
ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName,
|
||||||
|
tNameGetTableName(pTableName));
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1103,7 +1075,8 @@ int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
|
int32_t ctgGetTableMetaFromVnode(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
SVgroupInfo *vgroupInfo, STableMetaOutput *output) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t retryNum = 0;
|
int32_t retryNum = 0;
|
||||||
|
|
||||||
|
@ -1217,7 +1190,8 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == vgInfo) {
|
if (NULL == vgInfo) {
|
||||||
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
|
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
|
||||||
|
taosHashGetSize(dbInfo->vgHash));
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1266,7 +1240,6 @@ int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
|
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
|
||||||
mgmt->slotRIdx = 0;
|
mgmt->slotRIdx = 0;
|
||||||
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
|
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
|
||||||
|
@ -1285,7 +1258,6 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
|
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
|
||||||
int16_t widx = abs((int)(id % mgmt->slotNum));
|
int16_t widx = abs((int)(id % mgmt->slotNum));
|
||||||
|
|
||||||
|
@ -1296,7 +1268,8 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
|
||||||
if (NULL == slot->meta) {
|
if (NULL == slot->meta) {
|
||||||
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
|
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
|
||||||
if (NULL == slot->meta) {
|
if (NULL == slot->meta) {
|
||||||
qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
|
qError("taosArrayInit %d failed, id:%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx,
|
||||||
|
mgmt->type);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1316,7 +1289,8 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
|
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
|
||||||
|
__compar_fn_t searchCompare) {
|
||||||
int16_t widx = abs((int)(id % mgmt->slotNum));
|
int16_t widx = abs((int)(id % mgmt->slotNum));
|
||||||
|
|
||||||
SCtgRentSlot *slot = &mgmt->slots[widx];
|
SCtgRentSlot *slot = &mgmt->slots[widx];
|
||||||
|
@ -1329,7 +1303,8 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
|
||||||
}
|
}
|
||||||
|
|
||||||
if (slot->needSort) {
|
if (slot->needSort) {
|
||||||
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
|
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type,
|
||||||
|
(int32_t)taosArrayGetSize(slot->meta));
|
||||||
taosArraySort(slot->meta, sortCompare);
|
taosArraySort(slot->meta, sortCompare);
|
||||||
slot->needSort = false;
|
slot->needSort = false;
|
||||||
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
|
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
|
||||||
|
@ -1337,7 +1312,8 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
|
||||||
|
|
||||||
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
|
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
|
||||||
if (NULL == orig) {
|
if (NULL == orig) {
|
||||||
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
|
qError("meta not found in slot, id:%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type,
|
||||||
|
(int32_t)taosArrayGetSize(slot->meta));
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1350,7 +1326,8 @@ _return:
|
||||||
CTG_UNLOCK(CTG_WRITE, &slot->lock);
|
CTG_UNLOCK(CTG_WRITE, &slot->lock);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
|
qWarn("meta in rent update failed, will try to add it, code:%x, id:%" PRIx64 ", slot idx:%d, type:%d", code, id,
|
||||||
|
widx, mgmt->type);
|
||||||
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
|
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1392,7 +1369,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
|
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
|
||||||
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
|
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
|
||||||
if (ridx >= mgmt->slotNum) {
|
if (ridx >= mgmt->slotNum) {
|
||||||
|
@ -1468,13 +1444,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
||||||
SCtgDBCache newDBCache = {0};
|
SCtgDBCache newDBCache = {0};
|
||||||
newDBCache.dbId = dbId;
|
newDBCache.dbId = dbId;
|
||||||
|
|
||||||
newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum,
|
||||||
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == newDBCache.tbCache.metaCache) {
|
if (NULL == newDBCache.tbCache.metaCache) {
|
||||||
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
|
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum,
|
||||||
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == newDBCache.tbCache.stbCache) {
|
if (NULL == newDBCache.tbCache.stbCache) {
|
||||||
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
|
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
@ -1511,7 +1489,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ctgRemoveStbRent(SCatalog *pCtg, SCtgTbMetaCache *cache) {
|
void ctgRemoveStbRent(SCatalog *pCtg, SCtgTbMetaCache *cache) {
|
||||||
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
||||||
if (cache->stbCache) {
|
if (cache->stbCache) {
|
||||||
|
@ -1520,7 +1497,8 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
|
||||||
uint64_t *suid = NULL;
|
uint64_t *suid = NULL;
|
||||||
suid = taosHashGetKey(pIter, NULL);
|
suid = taosHashGetKey(pIter, NULL);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
|
if (TSDB_CODE_SUCCESS ==
|
||||||
|
ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
|
||||||
ctgDebug("stb removed from rent, suid:%" PRIx64, *suid);
|
ctgDebug("stb removed from rent, suid:%" PRIx64, *suid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1530,7 +1508,6 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
|
||||||
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
|
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgRemoveDB(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
|
int32_t ctgRemoveDB(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
|
||||||
uint64_t dbId = dbCache->dbId;
|
uint64_t dbId = dbCache->dbId;
|
||||||
|
|
||||||
|
@ -1558,7 +1535,6 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
|
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
@ -1600,7 +1576,6 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgUpdateDBVgInfo(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SDBVgInfo **pDbInfo) {
|
int32_t ctgUpdateDBVgInfo(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SDBVgInfo **pDbInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDBVgInfo *dbInfo = *pDbInfo;
|
SDBVgInfo *dbInfo = *pDbInfo;
|
||||||
|
@ -1610,8 +1585,8 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
|
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
|
||||||
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
|
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
|
||||||
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
|
dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1630,14 +1605,16 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
|
||||||
|
|
||||||
if (dbCache->vgInfo) {
|
if (dbCache->vgInfo) {
|
||||||
if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) {
|
if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) {
|
||||||
ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
|
ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion,
|
||||||
|
dbCache->vgInfo->vgVersion);
|
||||||
ctgWReleaseVgInfo(dbCache);
|
ctgWReleaseVgInfo(dbCache);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) {
|
if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) {
|
||||||
ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
|
ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion,
|
||||||
|
dbInfo->numOfTable);
|
||||||
ctgWReleaseVgInfo(dbCache);
|
ctgWReleaseVgInfo(dbCache);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1657,13 +1634,14 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
|
||||||
dbCache = NULL;
|
dbCache = NULL;
|
||||||
|
|
||||||
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
||||||
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
|
||||||
|
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
|
||||||
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
|
STableMeta *meta, int32_t metaSize) {
|
||||||
SCtgTbMetaCache *tbCache = &dbCache->tbCache;
|
SCtgTbMetaCache *tbCache = &dbCache->tbCache;
|
||||||
|
|
||||||
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
||||||
|
@ -1680,7 +1658,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
||||||
if (orig) {
|
if (orig) {
|
||||||
origType = orig->tableType;
|
origType = orig->tableType;
|
||||||
|
|
||||||
if (origType == meta->tableType && orig->uid == meta->uid && orig->sversion >= meta->sversion && orig->tversion >= meta->tversion) {
|
if (origType == meta->tableType && orig->uid == meta->uid && orig->sversion >= meta->sversion &&
|
||||||
|
orig->tversion >= meta->tversion) {
|
||||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1722,7 +1701,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
||||||
CTG_CACHE_STAT_ADD(tblNum, 1);
|
CTG_CACHE_STAT_ADD(tblNum, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d, suid:%" PRIx64, dbFName, tbName, meta->tableType, meta->suid);
|
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d, suid:%" PRIx64, dbFName, tbName, meta->tableType,
|
||||||
|
meta->suid);
|
||||||
ctgdShowTableMeta(pCtg, tbName, meta);
|
ctgdShowTableMeta(pCtg, tbName, meta);
|
||||||
|
|
||||||
if (!isStb) {
|
if (!isStb) {
|
||||||
|
@ -1744,9 +1724,11 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
||||||
|
|
||||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||||
|
|
||||||
ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d, suid:%" PRIx64 ",ma:%p", dbFName, tbName, meta->tableType, meta->suid, tbMeta);
|
ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d, suid:%" PRIx64 ",ma:%p", dbFName, tbName,
|
||||||
|
meta->tableType, meta->suid, tbMeta);
|
||||||
|
|
||||||
SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
|
SSTableMetaVersion metaRent = {
|
||||||
|
.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
|
||||||
strcpy(metaRent.dbFName, dbFName);
|
strcpy(metaRent.dbFName, dbFName);
|
||||||
strcpy(metaRent.stbName, tbName);
|
strcpy(metaRent.stbName, tbName);
|
||||||
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
|
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
|
||||||
|
@ -1787,13 +1769,11 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
|
||||||
pIter = taosHashIterate(src->vgHash, pIter);
|
pIter = taosHashIterate(src->vgHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetDBVgInfo(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *dbFName, SCtgDBCache **dbCache,
|
||||||
|
SDBVgInfo **pInfo) {
|
||||||
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
|
||||||
bool inCache = false;
|
bool inCache = false;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -1869,8 +1849,6 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, c
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
|
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
|
||||||
*pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
|
*pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
|
||||||
if (NULL == *pOutput) {
|
if (NULL == *pOutput) {
|
||||||
|
@ -1895,9 +1873,8 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgRefreshTblMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, int32_t flag,
|
||||||
|
STableMetaOutput **pOutput, bool syncReq) {
|
||||||
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput, bool syncReq) {
|
|
||||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
|
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -1919,7 +1896,8 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
||||||
if (CTG_FLAG_IS_SYS_DB(flag)) {
|
if (CTG_FLAG_IS_SYS_DB(flag)) {
|
||||||
ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName));
|
ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName));
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output));
|
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname,
|
||||||
|
(char *)pTableName->tname, output));
|
||||||
} else if (CTG_FLAG_IS_STB(flag)) {
|
} else if (CTG_FLAG_IS_STB(flag)) {
|
||||||
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
|
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
|
||||||
|
|
||||||
|
@ -1972,9 +1950,11 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CTG_IS_META_TABLE(output->metaType)) {
|
if (CTG_IS_META_TABLE(output->metaType)) {
|
||||||
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
|
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName,
|
||||||
|
output->tbMeta->tableType);
|
||||||
} else {
|
} else {
|
||||||
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
|
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName,
|
||||||
|
output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOutput) {
|
if (pOutput) {
|
||||||
|
@ -1993,7 +1973,8 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t flag) {
|
int32_t ctgGetTableMeta(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
STableMeta **pTableMeta, int32_t flag) {
|
||||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -2013,7 +1994,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
||||||
int32_t tbType = 0;
|
int32_t tbType = 0;
|
||||||
|
|
||||||
if (inCache) {
|
if (inCache) {
|
||||||
if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_SYS_DB(flag)))) {
|
if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) &&
|
||||||
|
((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_SYS_DB(flag)))) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2027,7 +2009,6 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
||||||
CTG_FLAG_SET_STB(flag, tbType);
|
CTG_FLAG_SET_STB(flag, tbType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output, false));
|
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output, false));
|
||||||
|
|
||||||
|
@ -2095,7 +2076,8 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) {
|
int32_t ctgChkAuth(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *user, const char *dbFName,
|
||||||
|
AUTH_TYPE type, bool *pass) {
|
||||||
bool inCache = false;
|
bool inCache = false;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -2135,7 +2117,6 @@ _return:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgActUpdateVg(SCtgMetaAction *action) {
|
int32_t ctgActUpdateVg(SCtgMetaAction *action) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgUpdateVgMsg *msg = action->data;
|
SCtgUpdateVgMsg *msg = action->data;
|
||||||
|
@ -2162,7 +2143,8 @@ int32_t ctgActRemoveDB(SCtgMetaAction *action) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbCache->dbId != msg->dbId) {
|
if (dbCache->dbId != msg->dbId) {
|
||||||
ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
|
ctgInfo("dbId already updated, dbFName:%s, dbId:%" PRIx64 ", targetId:%" PRIx64, msg->dbFName, dbCache->dbId,
|
||||||
|
msg->dbId);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2175,7 +2157,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
|
int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgUpdateTblMsg *msg = action->data;
|
SCtgUpdateTblMsg *msg = action->data;
|
||||||
|
@ -2202,11 +2183,13 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
|
||||||
if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
|
if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
|
||||||
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
|
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
|
CTG_ERR_JRET(
|
||||||
|
ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
|
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
|
||||||
CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
|
CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName,
|
||||||
|
(STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -2221,7 +2204,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgActRemoveStb(SCtgMetaAction *action) {
|
int32_t ctgActRemoveStb(SCtgMetaAction *action) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgRemoveStbMsg *msg = action->data;
|
SCtgRemoveStbMsg *msg = action->data;
|
||||||
|
@ -2234,13 +2216,15 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg->dbId && (dbCache->dbId != msg->dbId)) {
|
if (msg->dbId && (dbCache->dbId != msg->dbId)) {
|
||||||
ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
|
ctgDebug("dbId already modified, dbFName:%s, current:%" PRIx64 ", dbId:%" PRIx64 ", stb:%s, suid:%" PRIx64,
|
||||||
|
msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
||||||
if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
|
if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
|
||||||
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%" PRIx64, msg->dbFName, msg->stbName,
|
||||||
|
msg->suid);
|
||||||
} else {
|
} else {
|
||||||
CTG_CACHE_STAT_SUB(stblNum, 1);
|
CTG_CACHE_STAT_SUB(stblNum, 1);
|
||||||
}
|
}
|
||||||
|
@ -2280,7 +2264,8 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbCache->dbId != msg->dbId) {
|
if (dbCache->dbId != msg->dbId) {
|
||||||
ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", tbName:%s", msg->dbFName, dbCache->dbId, msg->dbId, msg->tbName);
|
ctgDebug("dbId already modified, dbFName:%s, current:%" PRIx64 ", dbId:%" PRIx64 ", tbName:%s", msg->dbFName,
|
||||||
|
dbCache->dbId, msg->dbId, msg->tbName);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2309,7 +2294,8 @@ int32_t ctgActUpdateUser(SCtgMetaAction *action) {
|
||||||
SCatalog *pCtg = msg->pCtg;
|
SCatalog *pCtg = msg->pCtg;
|
||||||
|
|
||||||
if (NULL == pCtg->userCache) {
|
if (NULL == pCtg->userCache) {
|
||||||
pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
||||||
|
false, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pCtg->userCache) {
|
if (NULL == pCtg->userCache) {
|
||||||
ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
|
ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
|
||||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
@ -2326,7 +2312,7 @@ int32_t ctgActUpdateUser(SCtgMetaAction *action) {
|
||||||
userAuth.readDbs = msg->userAuth.readDbs;
|
userAuth.readDbs = msg->userAuth.readDbs;
|
||||||
userAuth.writeDbs = msg->userAuth.writeDbs;
|
userAuth.writeDbs = msg->userAuth.writeDbs;
|
||||||
|
|
||||||
if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
|
if (taosHashPut(pCtg->userCache, msg->userAuth.user, sizeof(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
|
||||||
ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
|
ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
|
||||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
@ -2356,7 +2342,6 @@ int32_t ctgActUpdateUser(SCtgMetaAction *action) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
|
||||||
taosHashCleanup(msg->userAuth.createdDbs);
|
taosHashCleanup(msg->userAuth.createdDbs);
|
||||||
taosHashCleanup(msg->userAuth.readDbs);
|
taosHashCleanup(msg->userAuth.readDbs);
|
||||||
taosHashCleanup(msg->userAuth.writeDbs);
|
taosHashCleanup(msg->userAuth.writeDbs);
|
||||||
|
@ -2366,7 +2351,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void *ctgUpdateThreadFunc(void *param) {
|
void *ctgUpdateThreadFunc(void *param) {
|
||||||
setThreadName("catalog");
|
setThreadName("catalog");
|
||||||
|
|
||||||
|
@ -2410,7 +2394,6 @@ void* ctgUpdateThreadFunc(void* param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgStartUpdateThread() {
|
int32_t ctgStartUpdateThread() {
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
|
@ -2425,7 +2408,8 @@ int32_t ctgStartUpdateThread() {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
|
int32_t ctgGetTableDistVgInfo(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
SArray **pVgList) {
|
||||||
STableMeta *tbMeta = NULL;
|
STableMeta *tbMeta = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVgroupInfo vgroupInfo = {0};
|
SVgroupInfo vgroupInfo = {0};
|
||||||
|
@ -2534,7 +2518,8 @@ int32_t catalogInit(SCatalogCfg *cfg) {
|
||||||
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
|
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||||
}
|
}
|
||||||
|
|
||||||
gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT),
|
||||||
|
false, HASH_ENTRY_LOCK);
|
||||||
if (NULL == gCtgMgmt.pCluster) {
|
if (NULL == gCtgMgmt.pCluster) {
|
||||||
qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
@ -2559,7 +2544,8 @@ int32_t catalogInit(SCatalogCfg *cfg) {
|
||||||
|
|
||||||
CTG_ERR_RET(ctgStartUpdateThread());
|
CTG_ERR_RET(ctgStartUpdateThread());
|
||||||
|
|
||||||
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
|
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum,
|
||||||
|
gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2597,7 +2583,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
|
||||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
|
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
|
||||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
|
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
|
||||||
|
|
||||||
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
||||||
|
false, HASH_ENTRY_LOCK);
|
||||||
if (NULL == clusterCtg->dbCache) {
|
if (NULL == clusterCtg->dbCache) {
|
||||||
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
@ -2684,7 +2671,8 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) {
|
int32_t catalogGetDBVgInfo(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *dbFName,
|
||||||
|
SArray **vgroupList) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
|
if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
|
||||||
|
@ -2723,7 +2711,6 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgInfo(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SDBVgInfo *dbInfo) {
|
int32_t catalogUpdateDBVgInfo(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SDBVgInfo *dbInfo) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
@ -2741,7 +2728,6 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogRemoveDB(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
int32_t catalogRemoveDB(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
@ -2764,9 +2750,7 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
|
int32_t catalogUpdateVgEpSet(SCatalog *pCtg, const char *dbFName, int32_t vgId, SEpSet *epSet) { return 0; }
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t catalogRemoveTableMeta(SCatalog *pCtg, const SName *pTableName) {
|
int32_t catalogRemoveTableMeta(SCatalog *pCtg, const SName *pTableName) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
@ -2800,7 +2784,6 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
|
||||||
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
|
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
taosMemoryFreeClear(tblMeta);
|
taosMemoryFreeClear(tblMeta);
|
||||||
|
@ -2808,7 +2791,6 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogRemoveStbMeta(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *stbName, uint64_t suid) {
|
int32_t catalogRemoveStbMeta(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *stbName, uint64_t suid) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
@ -2831,17 +2813,20 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) {
|
int32_t catalogGetIndexMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
const char *pIndexName, SIndexMeta **pIndexMeta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogGetTableMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
STableMeta **pTableMeta) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_UNKNOWN_STB));
|
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_UNKNOWN_STB));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogGetSTableMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
STableMeta **pTableMeta) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_STB));
|
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_STB));
|
||||||
|
@ -2883,9 +2868,8 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid,
|
||||||
|
char *stbName) {
|
||||||
int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver, int32_t *tbType, uint64_t *suid, char* stbName) {
|
|
||||||
*sver = -1;
|
*sver = -1;
|
||||||
|
|
||||||
if (NULL == pCtg->dbCache) {
|
if (NULL == pCtg->dbCache) {
|
||||||
|
@ -2941,7 +2925,8 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t*
|
||||||
if ((*stbMeta)->suid != *suid) {
|
if ((*stbMeta)->suid != *suid) {
|
||||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||||
ctgReleaseDBCache(pCtg, dbCache);
|
ctgReleaseDBCache(pCtg, dbCache);
|
||||||
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, *suid, (*stbMeta)->suid);
|
ctgError("stable suid in stbCache mis-match, expected suid:%" PRIx64 ",actual suid:%" PRIx64, *suid,
|
||||||
|
(*stbMeta)->suid);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3006,7 +2991,6 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
|
||||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogRefreshDBVgInfo(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const char *dbFName) {
|
int32_t catalogRefreshDBVgInfo(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const char *dbFName) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
@ -3017,23 +3001,28 @@ int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt
|
||||||
CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName));
|
CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
|
int32_t catalogRefreshTableMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
int32_t isSTable) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
|
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
|
||||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, true));
|
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName,
|
||||||
|
CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
|
int32_t catalogRefreshGetTableMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
STableMeta **pTableMeta, int32_t isSTable) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable)));
|
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta,
|
||||||
|
CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable)));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
|
int32_t catalogGetTableDistVgInfo(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
|
SArray **pVgList) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
|
||||||
|
@ -3051,7 +3040,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
|
||||||
code = ctgGetTableDistVgInfo(pCtg, pRpc, pMgmtEps, pTableName, pVgList);
|
code = ctgGetTableDistVgInfo(pCtg, pRpc, pMgmtEps, pTableName, pVgList);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (TSDB_CODE_CTG_VG_META_MISMATCH == code) {
|
if (TSDB_CODE_CTG_VG_META_MISMATCH == code) {
|
||||||
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(CTG_FLAG_UNKNOWN_STB), NULL, true));
|
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName,
|
||||||
|
CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(CTG_FLAG_UNKNOWN_STB), NULL, true));
|
||||||
|
|
||||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(pTableName, dbFName);
|
tNameGetFullDbName(pTableName, dbFName);
|
||||||
|
@ -3069,8 +3059,8 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName,
|
||||||
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
|
SVgroupInfo *pVgroup) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
|
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
|
||||||
|
@ -3103,8 +3093,8 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetAllMeta(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SCatalogReq *pReq,
|
||||||
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
|
SMetaData *pRsp) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
|
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
|
||||||
|
@ -3227,7 +3217,6 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_
|
||||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogGetDBCfg(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *dbFName, SDbCfgInfo *pDbCfg) {
|
int32_t catalogGetDBCfg(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *dbFName, SDbCfgInfo *pDbCfg) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
@ -3238,7 +3227,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
||||||
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) {
|
int32_t catalogGetIndexInfo(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *indexName,
|
||||||
|
SIndexInfo *pInfo) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) {
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) {
|
||||||
|
@ -3272,7 +3262,8 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) {
|
int32_t catalogChkAuth(SCatalog *pCtg, void *pRpc, const SEpSet *pMgmtEps, const char *user, const char *dbFName,
|
||||||
|
AUTH_TYPE type, bool *pass) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == user || NULL == dbFName || NULL == pass) {
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == user || NULL == dbFName || NULL == pass) {
|
||||||
|
@ -3297,7 +3288,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
|
||||||
CTG_API_LEAVE(ctgPushUpdateUserMsgInQueue(pCtg, pAuth, false));
|
CTG_API_LEAVE(ctgPushUpdateUserMsgInQueue(pCtg, pAuth, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void catalogDestroy(void) {
|
void catalogDestroy(void) {
|
||||||
qInfo("start to destroy catalog");
|
qInfo("start to destroy catalog");
|
||||||
|
|
||||||
|
@ -3341,5 +3331,3 @@ void catalogDestroy(void) {
|
||||||
qInfo("catalog destroyed");
|
qInfo("catalog destroyed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "taoserror.h"
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
|
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
|
||||||
|
@ -142,12 +142,11 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE SHashNode *
|
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen,
|
||||||
doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
|
uint32_t hashVal) {
|
||||||
SHashNode *pNode = pe->next;
|
SHashNode *pNode = pe->next;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) &&
|
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||||
((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
|
||||||
pNode->removed == 0) {
|
pNode->removed == 0) {
|
||||||
assert(pNode->hashVal == hashVal);
|
assert(pNode->hashVal == hashVal);
|
||||||
break;
|
break;
|
||||||
|
@ -186,7 +185,8 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
|
||||||
* @param pNode the old node with requested key
|
* @param pNode the old node with requested key
|
||||||
* @param pNewNode the new node with requested key
|
* @param pNewNode the new node with requested key
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
|
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SHashNode *prev, SHashNode *pNode,
|
||||||
|
SHashNode *pNewNode) {
|
||||||
assert(pNode->keyLen == pNewNode->keyLen);
|
assert(pNode->keyLen == pNewNode->keyLen);
|
||||||
|
|
||||||
atomic_sub_fetch_16(&pNode->refCount, 1);
|
atomic_sub_fetch_16(&pNode->refCount, 1);
|
||||||
|
@ -227,9 +227,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj);
|
||||||
* @param pHashObj
|
* @param pHashObj
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
|
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { return taosHashGetSize(pHashObj) == 0; }
|
||||||
return taosHashGetSize(pHashObj) == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
|
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
|
||||||
if (fn == NULL) {
|
if (fn == NULL) {
|
||||||
|
@ -342,8 +340,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
|
||||||
|
|
||||||
SHashNode *prev = NULL;
|
SHashNode *prev = NULL;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) &&
|
if ((pNode->keyLen == keyLen) && (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
|
||||||
(*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
|
|
||||||
pNode->removed == 0) {
|
pNode->removed == 0) {
|
||||||
assert(pNode->hashVal == hashVal);
|
assert(pNode->hashVal == hashVal);
|
||||||
break;
|
break;
|
||||||
|
@ -513,8 +510,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
SHashNode *prevNode = NULL;
|
SHashNode *prevNode = NULL;
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) &&
|
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||||
((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
|
||||||
pNode->removed == 0) {
|
pNode->removed == 0) {
|
||||||
code = 0; // it is found
|
code = 0; // it is found
|
||||||
|
|
||||||
|
@ -688,12 +684,13 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
|
|
||||||
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
|
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms",
|
||||||
|
// (int32_t)pHashObj->capacity,
|
||||||
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
|
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
|
||||||
SHashNode *pNewNode = taosMemoryMalloc(sizeof(SHashNode) + keyLen + dsize);
|
SHashNode *pNewNode = taosMemoryMalloc(sizeof(SHashNode) + keyLen + dsize + 1);
|
||||||
|
|
||||||
if (pNewNode == NULL) {
|
if (pNewNode == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -727,7 +724,8 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (pHashObj->capacity * (sizeof(SHashEntry) + sizeof(void*))) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
|
return (pHashObj->capacity * (sizeof(SHashEntry) + sizeof(void *))) + sizeof(SHashNode) * taosHashGetSize(pHashObj) +
|
||||||
|
sizeof(SHashObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosHashGetKey(void *data, size_t *keyLen) {
|
void *taosHashGetKey(void *data, size_t *keyLen) {
|
||||||
|
@ -751,8 +749,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
||||||
|
|
||||||
SHashNode *pNode = pe->next;
|
SHashNode *pNode = pe->next;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if (pNode == pOld)
|
if (pNode == pOld) break;
|
||||||
break;
|
|
||||||
|
|
||||||
prevNode = pNode;
|
prevNode = pNode;
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
|
|
Loading…
Reference in New Issue