enh: add catalog return code processing

This commit is contained in:
dapan1121 2024-07-17 19:23:20 +08:00
parent a62c0fda83
commit 2eec17b20c
8 changed files with 1408 additions and 592 deletions

View File

@ -55,11 +55,12 @@ typedef enum { M2C = 0, C2M } ConvType;
#define tstrncpy(dst, src, size) \
do { \
strncpy((dst), (src), (size)); \
(void)strncpy((dst), (src), (size)); \
(dst)[(size)-1] = 0; \
} while (0)
#define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src))
#define TAOS_STRNCPY(_dst, _src, _size) ((void)strncpy(_dst, _src, _size))
char *tstrdup(const char *src);
int32_t taosUcs4len(TdUcs4 *ucs4);

View File

@ -669,12 +669,12 @@ typedef struct SCtgCacheItemInfo {
#define CTG_AUTH_READ(_t) ((_t) == AUTH_TYPE_READ || (_t) == AUTH_TYPE_READ_OR_WRITE)
#define CTG_AUTH_WRITE(_t) ((_t) == AUTH_TYPE_WRITE || (_t) == AUTH_TYPE_READ_OR_WRITE)
#define CTG_QUEUE_INC() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_DEC() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_INC() (void)atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_DEC() (void)atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
#define CTG_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) (void)atomic_load_64(&(_item))
#define CTG_STAT_API_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.api.item, n))
#define CTG_STAT_RT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.statInfo.runtime.item, n))
@ -971,7 +971,7 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx,
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
void* ctgCloneDbCfgInfo(void* pSrc);
int32_t ctgCloneDbCfgInfo(void* pSrc, SDbCfgInfo** ppDst);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation* action);
int32_t ctgOpUpdateDbCfg(SCtgCacheOperation *operation);
@ -1112,7 +1112,7 @@ int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgRemoveCacheUser(SCatalog* pCtg, SCtgUserAuth* pUser, const char* user);
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup,
bool* exists);
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
int32_t ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch, SName** ppName);
int32_t ctgdGetOneHandle(SCatalog** pHandle);
int ctgVgInfoComp(const void* lp, const void* rp);
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
@ -1165,6 +1165,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn,
void* bInput);
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag,
CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName);
void ctgFreeTask(SCtgTask* pTask, bool freeRes);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;

View File

@ -86,7 +86,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
if (code) {
if (CTG_DB_NOT_EXIST(code) && (NULL != dbCache)) {
ctgDebug("db no longer exist, dbFName:%s, dbId:0x%" PRIx64, input.db, input.dbId);
ctgDropDbCacheEnqueue(pCtg, input.db, input.dbId);
CTG_ERR_RET(ctgDropDbCacheEnqueue(pCtg, input.db, input.dbId));
}
CTG_ERR_RET(code);
@ -116,8 +116,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(ctx->pName));
CTG_ERR_JRET(
ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char*)ctx->pName->dbname, (char*)ctx->pName->tname, output, NULL));
CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char*)ctx->pName->dbname, (char*)ctx->pName->tname, output, NULL));
} else if (CTG_FLAG_IS_STB(ctx->flag)) {
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName));
@ -128,8 +127,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgroupInfo, output, NULL));
}
} else {
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName),
ctx->flag);
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgroupInfo, output, NULL));
@ -166,7 +164,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
if (CTG_IS_META_NULL(output->metaType)) {
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName));
ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false);
CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false));
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
@ -216,7 +214,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
}
if (CTG_IS_META_BOTH(output->metaType)) {
memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
TAOS_MEMCPY(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
*pTableMeta = output->tbMeta;
goto _return;
@ -233,7 +231,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
taosMemoryFreeClear(output->tbMeta);
SName stbName = *ctx->pName;
strcpy(stbName.tname, output->tbName);
TAOS_STRCPY(stbName.tname, output->tbName);
SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = ctx->flag;
stbCtx.pName = &stbName;
@ -244,7 +242,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
continue;
}
memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));
TAOS_MEMCPY(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));
break;
}
@ -254,15 +252,15 @@ _return:
if (CTG_TABLE_NOT_EXIST(code) && ctx->tbInfo.inCache) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
strcpy(dbFName, ctx->pName->dbname);
TAOS_STRCPY(dbFName, ctx->pName->dbname);
} else {
tNameGetFullDbName(ctx->pName, dbFName);
(void)tNameGetFullDbName(ctx->pName, dbFName);
}
if (TSDB_SUPER_TABLE == ctx->tbInfo.tbType) {
ctgDropStbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false);
(void)ctgDropStbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false); // already in error
} else {
ctgDropTbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false);
(void)ctgDropTbMetaEnqueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false); // already in error
}
}
@ -285,18 +283,18 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
int32_t code = 0;
strcpy(output->dbFName, rspMsg->dbFName);
TAOS_STRCPY(output->dbFName, rspMsg->dbFName);
output->dbId = rspMsg->dbId;
if (TSDB_CHILD_TABLE == rspMsg->tableType && NULL == rspMsg->pSchemas) {
strcpy(output->ctbName, rspMsg->tbName);
TAOS_STRCPY(output->ctbName, rspMsg->tbName);
SET_META_TYPE_CTABLE(output->metaType);
CTG_ERR_JRET(queryCreateCTableMetaFromMsg(rspMsg, &output->ctbMeta));
} else {
strcpy(output->tbName, rspMsg->tbName);
TAOS_STRCPY(output->tbName, rspMsg->tbName);
SET_META_TYPE_TABLE(output->metaType);
@ -348,14 +346,14 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pReq,
_return:
ctgUpdateUserEnqueue(pCtg, &req.authInfo, false);
(void)ctgUpdateUserEnqueue(pCtg, &req.authInfo, false); // cache update not fatal error
CTG_RET(code);
}
int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, int32_t* tbType) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
(void)tNameGetFullDbName(pTableName, dbFName);
CTG_ERR_RET(ctgReadTbTypeFromCache(pCtg, dbFName, pTableName->tname, tbType));
if (*tbType > 0) {
return TSDB_CODE_SUCCESS;
@ -454,7 +452,11 @@ int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
tagVal.type = TSDB_DATA_TYPE_JSON;
tagVal.pData = pJson;
tagVal.nData = strlen(pJson);
taosArrayPush(pTagVals, &tagVal);
if (NULL == taosArrayPush(pTagVals, &tagVal)) {
taosMemoryFree(pJson);
taosArrayDestroy(pTagVals);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
} else {
CTG_ERR_JRET(tTagToValArray((const STag*)pCfg->pTags, &pTagVals));
}
@ -484,7 +486,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTabl
CTG_ERR_JRET(ctgGetTbMeta(pCtg, pConn, &ctx, &tbMeta));
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
(void)tNameGetFullDbName(pTableName, db);
SHashObj* vgHash = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, NULL));
@ -555,7 +557,7 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
(void)tNameGetFullDbName(pTableName, db);
SDBVgInfo* vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, exists));
@ -590,7 +592,7 @@ int32_t ctgGetTbsHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctI
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%s", acctId, pDb);
(void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%s", acctId, pDb);
SDBVgInfo* vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo, NULL));
@ -615,7 +617,7 @@ _return:
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
(void)tNameGetFullDbName(pTableName, db);
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
@ -688,8 +690,8 @@ _return:
void ctgProcessTimerEvent(void *param, void *tmrId) {
CTG_API_NENTER();
ctgdShowCacheInfo();
ctgdShowStatInfo();
(void)ctgdShowCacheInfo();
(void)ctgdShowStatInfo();
int32_t cacheMaxSize = atomic_load_32(&tsMetaCacheMaxSize);
if (cacheMaxSize >= 0) {
@ -703,7 +705,7 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
if (code) {
qError("clear cache enqueue failed, error:%s", tstrerror(code));
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
}
goto _return;
@ -711,7 +713,7 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
}
qTrace("reset catalog timer");
taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
_return:
@ -723,10 +725,8 @@ int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName
if (pDbCfg->cfgVersion < 0) {
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, dbFName, pDbCfg, NULL));
SDbCfgInfo *pCfg = ctgCloneDbCfgInfo(pDbCfg);
if (NULL == pCfg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SDbCfgInfo *pCfg = NULL;
CTG_ERR_RET(ctgCloneDbCfgInfo(pDbCfg, &pCfg));
CTG_ERR_RET(ctgUpdateDbCfgEnqueue(pCtg, dbFName, pDbCfg->dbId, pCfg, false));
}
@ -735,6 +735,59 @@ int32_t ctgGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName
}
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
STableTSMAInfoRsp tsmasRsp = {0};
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
*ppRes = tsmasRsp.pTsmas;
tsmasRsp.pTsmas = NULL;
for (int32_t i = 0; i < (*ppRes)->size; ++i) {
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), 0, false));
}
return TSDB_CODE_SUCCESS;
_return:
if (tsmasRsp.pTsmas) {
tFreeTableTSMAInfoRsp(&tsmasRsp);
}
CTG_RET(code);
}
int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) {
STableTSMAInfoRsp tsmaRsp = {0};
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, &tsmaRsp, NULL, TDMT_MND_GET_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1);
*pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0);
taosArrayDestroy(tsmaRsp.pTsmas);
tsmaRsp.pTsmas = NULL;
_return:
if (tsmaRsp.pTsmas) {
tFreeTableTSMAInfoRsp(&tsmaRsp);
}
CTG_RET(code);
}
int32_t catalogInit(SCatalogCfg* cfg) {
qDebug("catalogInit start");
if (gCtgMgmt.pCluster) {
@ -742,10 +795,10 @@ int32_t catalogInit(SCatalogCfg* cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
TAOS_MEMSET(&gCtgMgmt, 0, sizeof(gCtgMgmt));
if (cfg) {
memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
TAOS_MEMCPY(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
@ -1129,7 +1182,7 @@ int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp* pRsp) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pIndex, pRsp, sizeof(STableIndex));
TAOS_MEMCPY(pIndex, pRsp, sizeof(STableIndex));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
@ -1247,17 +1300,26 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray*
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = TSDB_CODE_SUCCESS;
SName name = {0};
int32_t sver = 0;
int32_t tver = 0;
int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
if (NULL == pTb) {
ctgError("fail to get the %dth table, tbNum:%d", i, tbNum);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pTb->tbFName || 0 == pTb->tbFName[0]) {
continue;
}
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) {
ctgError("invalid tbFName format, tbFName:%s, idx:%d", pTb->tbFName, i);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (IS_SYS_DBNAME(name.dbname)) {
continue;
@ -1266,18 +1328,18 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray*
int32_t tbType = 0;
uint64_t suid = 0;
char stbName[TSDB_TABLE_FNAME_LEN];
ctgReadTbVerFromCache(pCtg, &name, &sver, &tver, &tbType, &suid, stbName);
CTG_ERR_JRET(ctgReadTbVerFromCache(pCtg, &name, &sver, &tver, &tbType, &suid, stbName));
if ((sver >= 0 && sver < pTb->sver) || (tver >= 0 && tver < pTb->tver)) {
switch (tbType) {
case TSDB_CHILD_TABLE: {
SName stb = name;
tstrncpy(stb.tname, stbName, sizeof(stb.tname));
ctgRemoveTbMeta(pCtg, &stb);
CTG_ERR_JRET(ctgRemoveTbMeta(pCtg, &stb));
break;
}
case TSDB_SUPER_TABLE:
case TSDB_NORMAL_TABLE:
ctgRemoveTbMeta(pCtg, &name);
CTG_ERR_JRET(ctgRemoveTbMeta(pCtg, &name));
break;
default:
ctgError("ignore table type %d", tbType);
@ -1286,7 +1348,9 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray*
}
}
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
_return:
CTG_API_LEAVE(code);
}
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName) {
@ -1454,10 +1518,10 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SC
_return:
if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
(void)taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
if (code) {
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
(void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
}
}
@ -1558,7 +1622,7 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_
while (pAuth != NULL) {
size_t len = 0;
void* key = taosHashGetKey(pAuth, &len);
strncpy((*users)[i].user, key, len);
TAOS_STRNCPY((*users)[i].user, key, len);
(*users)[i].user[len] = 0;
(*users)[i].version = pAuth->userAuth.version;
++i;
@ -1795,10 +1859,12 @@ int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma, int32_t
if (!pCtg || !ppTsma) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, ppTsma, tsmaVersion, false));
_return:
CTG_API_LEAVE(code);
}
@ -1807,10 +1873,12 @@ int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma) {
if (!pCtg || !pTsma) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, pTsma, 0, true));
_return:
CTG_API_LEAVE(code);
}
@ -1823,36 +1891,14 @@ int32_t catalogRemoveTSMA(SCatalog* pCtg, const STableTSMAInfo* pTsma) {
}
if (!pCtg->dbCache) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgDropTbTSMAEnqueue(pCtg, pTsma, true));
_return:
CTG_API_LEAVE(code);
}
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
STableTSMAInfoRsp tsmasRsp = {0};
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
assert(tsmasRsp.pTsmas);
assert(tsmasRsp.pTsmas->size > 0);
*ppRes = tsmasRsp.pTsmas;
tsmasRsp.pTsmas = NULL;
for (int32_t i = 0; i < (*ppRes)->size; ++i) {
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), 0, false));
}
return TSDB_CODE_SUCCESS;
CTG_ERR_JRET(ctgDropTbTSMAEnqueue(pCtg, pTsma, true));
_return:
if (tsmasRsp.pTsmas) {
tFreeTableTSMAInfoRsp(&tsmasRsp);
}
CTG_RET(code);
CTG_API_LEAVE(code);
}
int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes) {
@ -1870,27 +1916,6 @@ _return:
CTG_API_LEAVE(code);
}
int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) {
STableTSMAInfoRsp tsmaRsp = {0};
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, &tsmaRsp, NULL, TDMT_MND_GET_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1);
*pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0);
taosArrayDestroy(tsmaRsp.pTsmas);
tsmaRsp.pTsmas = NULL;
_return:
if (tsmaRsp.pTsmas) {
tFreeTableTSMAInfoRsp(&tsmaRsp);
}
CTG_RET(code);
}
int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) {
CTG_API_ENTER();
@ -1903,6 +1928,7 @@ int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTs
CTG_ERR_JRET(ctgGetTsma(pCtg, pConn, pTsmaName, pTsma));
_return:
CTG_API_LEAVE(code);
}
@ -1930,7 +1956,7 @@ void catalogDestroy(void) {
}
if (gCtgMgmt.cacheTimer) {
taosTmrStop(gCtgMgmt.cacheTimer);
(void)taosTmrStop(gCtgMgmt.cacheTimer);
gCtgMgmt.cacheTimer = NULL;
taosTmrCleanUp(gCtgMgmt.timer);
gCtgMgmt.timer = NULL;
@ -1939,8 +1965,8 @@ void catalogDestroy(void) {
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
if (!taosCheckCurrentInDll()) {
ctgClearCacheEnqueue(NULL, false, true, true, true);
taosThreadJoin(gCtgMgmt.updateThread, NULL);
(void)ctgClearCacheEnqueue(NULL, false, true, true, true);
(void)taosThreadJoin(gCtgMgmt.updateThread, NULL);
}
taosHashCleanup(gCtgMgmt.pCluster);

File diff suppressed because it is too large Load Diff

View File

@ -764,12 +764,10 @@ int32_t ctgReadDBCfgFromCache(SCatalog *pCtg, const char* dbFName, SDbCfgInfo* p
CTG_LOCK(CTG_READ, &dbCache->cfgCache.cfgLock);
if (dbCache->cfgCache.cfgInfo) {
SDbCfgInfo *pInfo = ctgCloneDbCfgInfo(dbCache->cfgCache.cfgInfo);
if (NULL == pInfo) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
SDbCfgInfo *pInfo = NULL;
CTG_ERR_JRET(ctgCloneDbCfgInfo(dbCache->cfgCache.cfgInfo, &pInfo));
memcpy(pDbCfg, pInfo, sizeof(*pInfo));
TAOS_MEMCPY(pDbCfg, pInfo, sizeof(*pInfo));
taosMemoryFree(pInfo);
CTG_CACHE_HIT_INC(CTG_CI_DB_CFG, 1);
} else {

View File

@ -556,7 +556,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch);
CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
} else if (CTG_TASK_GET_TB_TSMA == pTask->type){
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
@ -616,7 +616,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch);
CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
} else if (CTG_TASK_GET_TB_TSMA == pTask->type){
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);

View File

@ -1627,23 +1627,41 @@ int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fet
return TSDB_CODE_SUCCESS;
}
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
int32_t ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch, SName** ppName) {
STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx);
return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
if (NULL == pReq) {
qError("fail to get the %dth tb in pTables, tbNum:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pReq->pTables));
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
*ppName = (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
if (NULL == *ppName) {
qError("fail to get the %dth tb in pTables, tbNum:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pReq->pTables));
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static void* ctgCloneDbVgroup(void* pSrc) { return taosArrayDup((const SArray*)pSrc, NULL); }
static void ctgFreeDbVgroup(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
void* ctgCloneDbCfgInfo(void* pSrc) {
int32_t ctgCloneDbCfgInfo(void* pSrc, SDbCfgInfo** ppDst) {
SDbCfgInfo* pDst = taosMemoryMalloc(sizeof(SDbCfgInfo));
if (NULL == pDst) {
return NULL;
return terrno;
}
memcpy(pDst, pSrc, sizeof(SDbCfgInfo));
TAOS_MEMCPY(pDst, pSrc, sizeof(SDbCfgInfo));
pDst->pRetensions = taosArrayDup(((SDbCfgInfo *)pSrc)->pRetensions, NULL);
return pDst;
if (NULL == pDst->pRetensions) {
taosMemoryFree(pDst);
return terrno;
}
*ppDst = pDst;
return TSDB_CODE_SUCCESS;
}
static void ctgFreeDbCfgInfo(void* p) {

View File

@ -77,10 +77,10 @@ void schtInitLogFile() {
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
strcpy(tsLogDir, TD_LOG_DIR_PATH);
TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
(void)printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
@ -628,7 +628,7 @@ void schtFreeQueryJob(int32_t freeThread) {
schedulerFreeJob(&job, 0);
if (freeThread) {
if (++freeNum % schtTestPrintNum == 0) {
printf("FreeNum:%d\n", freeNum);
(void)printf("FreeNum:%d\n", freeNum);
}
}
}
@ -667,7 +667,7 @@ void *schtRunJobThread(void *aa) {
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
if (NULL == taosArrayPush(qnodeList, &load)) {
assert(0);
@ -800,7 +800,7 @@ void *schtRunJobThread(void *aa) {
schtFreeQueryDag(dag);
if (++jobFinished % schtTestPrintNum == 0) {
printf("jobFinished:%d\n", jobFinished);
(void)printf("jobFinished:%d\n", jobFinished);
}
++schtQueryId;
@ -834,9 +834,9 @@ TEST(queryTest, normalCase) {
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
assert(taosArrayPush(qnodeList, &load) != NULL);
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
@ -872,7 +872,7 @@ TEST(queryTest, normalCase) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -888,7 +888,7 @@ TEST(queryTest, normalCase) {
if (JOB_TASK_STATUS_EXEC == task->status) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -909,10 +909,10 @@ TEST(queryTest, normalCase) {
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
assert(0 == taosThreadAttrInit(&thattr));
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
assert(0 == taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job));
void *data = NULL;
req.syncReq = true;
@ -926,13 +926,13 @@ TEST(queryTest, normalCase) {
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
schReleaseJob(job);
(void)schReleaseJob(job);
schedulerDestroy();
schedulerFreeJob(&job, 0);
taosThreadJoin(thread1, NULL);
(void)taosThreadJoin(thread1, NULL);
}
TEST(queryTest, readyFirstCase) {
@ -948,9 +948,9 @@ TEST(queryTest, readyFirstCase) {
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
assert(NULL != taosArrayPush(qnodeList, &load));
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
@ -985,7 +985,7 @@ TEST(queryTest, readyFirstCase) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -1002,7 +1002,7 @@ TEST(queryTest, readyFirstCase) {
if (JOB_TASK_STATUS_EXEC == task->status) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -1023,10 +1023,10 @@ TEST(queryTest, readyFirstCase) {
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
assert(0 == taosThreadAttrInit(&thattr));
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
assert(0 == taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job));
void *data = NULL;
req.syncReq = true;
@ -1039,13 +1039,13 @@ TEST(queryTest, readyFirstCase) {
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
schReleaseJob(job);
(void)schReleaseJob(job);
schedulerDestroy();
schedulerFreeJob(&job, 0);
taosThreadJoin(thread1, NULL);
(void)taosThreadJoin(thread1, NULL);
}
TEST(queryTest, flowCtrlCase) {
@ -1065,9 +1065,9 @@ TEST(queryTest, flowCtrlCase) {
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
assert(NULL != taosArrayPush(qnodeList, &load));
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
@ -1078,7 +1078,7 @@ TEST(queryTest, flowCtrlCase) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
initTaskQueue();
assert(0 == initTaskQueue());
int32_t queryDone = 0;
SRequestConnInfo conn = {0};
@ -1106,7 +1106,7 @@ TEST(queryTest, flowCtrlCase) {
if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -1120,10 +1120,10 @@ TEST(queryTest, flowCtrlCase) {
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
assert(0 == taosThreadAttrInit(&thattr));
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
assert(0 == taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job));
void *data = NULL;
req.syncReq = true;
@ -1136,13 +1136,13 @@ TEST(queryTest, flowCtrlCase) {
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
schReleaseJob(job);
(void)schReleaseJob(job);
schedulerDestroy();
schedulerFreeJob(&job, 0);
taosThreadJoin(thread1, NULL);
(void)taosThreadJoin(thread1, NULL);
}
TEST(insertTest, normalCase) {
@ -1158,9 +1158,9 @@ TEST(insertTest, normalCase) {
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
assert(NULL != taosArrayPush(qnodeList, &load));
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
@ -1171,12 +1171,12 @@ TEST(insertTest, normalCase) {
schtSetAsyncSendMsgToServer();
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
assert(0 == taosThreadAttrInit(&thattr));
schtJobDone = false;
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
assert(0 == taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId));
int32_t queryDone = 0;
SRequestConnInfo conn = {0};
@ -1204,17 +1204,17 @@ TEST(insertTest, normalCase) {
schedulerDestroy();
taosThreadJoin(thread1, NULL);
(void)taosThreadJoin(thread1, NULL);
}
TEST(multiThread, forceFree) {
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
assert(0 == taosThreadAttrInit(&thattr));
TdThread thread1, thread2, thread3;
taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
assert(0 == taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL));
// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
assert(0 == taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL));
while (true) {
if (schtTestDeadLoop) {
@ -1231,7 +1231,7 @@ TEST(multiThread, forceFree) {
TEST(otherTest, otherCase) {
// excpet test
schReleaseJob(0);
(void)schReleaseJob(0);
schFreeRpcCtx(NULL);
char* ep = NULL;