Merge pull request #12731 from taosdata/feature/async

feat: add catalog async api
This commit is contained in:
dapan1121 2022-05-23 12:52:11 +08:00 committed by GitHub
commit 403b2ba1c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 4338 additions and 2534 deletions

View File

@ -46,24 +46,34 @@ typedef enum {
AUTH_TYPE_OTHER,
} AUTH_TYPE;
typedef struct SUserAuthInfo {
char user[TSDB_USER_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
AUTH_TYPE type;
} SUserAuthInfo;
typedef struct SCatalogReq {
SArray *pTableName; // element is SNAME
SArray *pUdf; // udf name
SArray *pTableMeta; // element is SNAME
SArray *pDbVgroup; // element is db full name
SArray *pTableHash; // element is SNAME
SArray *pUdf; // element is udf name
SArray *pDbCfg; // element is db full name
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode
} SCatalogReq;
typedef struct SMetaData {
SArray *pTableMeta; // STableMeta array
SArray *pVgroupInfo; // SVgroupInfo list
SArray *pUdfList; // udf info list
SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr>
SArray *pTableMeta; // SArray<STableMeta>
SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*>
SArray *pTableHash; // SArray<SVgroupInfo>
SArray *pUdfList; // SArray<SFuncInfo>
SArray *pDbCfg; // SArray<SDbCfgInfo>
SArray *pIndex; // SArray<SIndexInfo>
SArray *pUser; // SArray<bool>
SArray *pQnodeList; // SArray<SQueryNodeAddr>
} SMetaData;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
@ -88,6 +98,11 @@ typedef struct SDbVgVersion {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SDbVgVersion;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SUserAuthVersion {
char user[TSDB_USER_LEN];
int32_t version;
@ -96,6 +111,8 @@ typedef struct SUserAuthVersion {
typedef SDbCfgRsp SDbCfgInfo;
typedef SUserIndexRsp SIndexInfo;
typedef void (*catalogCallback)(SMetaData* pResult, void* param, int32_t code);
int32_t catalogInit(SCatalogCfg *cfg);
/**
@ -131,7 +148,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName);
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
@ -241,9 +258,9 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);

View File

@ -121,7 +121,7 @@ struct SAppInstInfo {
SCorEpSet mgmtEp;
SInstanceSummary summary;
SList* pConnList; // STscObj linked list
int64_t clusterId;
uint64_t clusterId;
void* pTransporter;
SAppHbMgr* pAppHbMgr;
};
@ -286,6 +286,8 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);

View File

@ -565,10 +565,32 @@ const char *taos_get_server_info(TAOS *taos) {
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
if (taos == NULL || sql == NULL) {
// todo directly call fp
fp(param, NULL, TSDB_CODE_INVALID_PARA);
return;
}
taos_query_l(taos, sql, (int32_t)strlen(sql));
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
size_t sqlLen = strlen(sql);
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = launchQuery(taos, sql, sqlLen);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
code = refreshMeta(taos, pRequest);
if (code) {
pRequest->code = code;
break;
}
destroyRequest(pRequest);
}
fp(param, pRequest, code);
}
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {

View File

@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code1));
tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1));
} else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
}
@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
taosMemoryFreeClear(output.dbVgroup);
tscError("failed to build use db output since %s", terrstr());
tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) {
struct SCatalog* pCatalog = NULL;

View File

@ -58,6 +58,17 @@ enum {
CTG_ACT_MAX
};
typedef enum {
CTG_TASK_GET_QNODE = 0,
CTG_TASK_GET_DB_VGROUP,
CTG_TASK_GET_DB_CFG,
CTG_TASK_GET_TB_META,
CTG_TASK_GET_TB_HASH,
CTG_TASK_GET_INDEX,
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
} CTG_TASK_TYPE;
typedef struct SCtgDebug {
bool lockEnable;
bool cacheEnable;
@ -66,6 +77,43 @@ typedef struct SCtgDebug {
uint32_t showCachePeriodSec;
} SCtgDebug;
typedef struct SCtgTbCacheInfo {
bool inCache;
uint64_t dbId;
uint64_t suid;
int32_t tbType;
} SCtgTbCacheInfo;
typedef struct SCtgTbMetaCtx {
SCtgTbCacheInfo tbInfo;
SName* pName;
int32_t flag;
} SCtgTbMetaCtx;
typedef struct SCtgDbVgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbVgCtx;
typedef struct SCtgDbCfgCtx {
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbCfgCtx;
typedef struct SCtgTbHashCtx {
char dbFName[TSDB_DB_FNAME_LEN];
SName* pName;
} SCtgTbHashCtx;
typedef struct SCtgIndexCtx {
char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx;
typedef struct SCtgUdfCtx {
char udfName[TSDB_FUNC_NAME_LEN];
} SCtgUdfCtx;
typedef struct SCtgUserCtx {
SUserAuthInfo user;
} SCtgUserCtx;
typedef struct SCtgTbMetaCache {
SRWLatch stbLock;
@ -113,6 +161,55 @@ typedef struct SCatalog {
SCtgRentMgmt stbRent;
} SCatalog;
typedef struct SCtgJob {
int64_t refId;
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
int32_t rspCode;
uint64_t queryId;
SCatalog* pCtg;
void* pTrans;
const SEpSet* pMgmtEps;
void* userParam;
catalogCallback userFp;
int32_t tbMetaNum;
int32_t tbHashNum;
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
int32_t reqType;
void* lastOut;
void* out;
char* target;
} SCtgMsgCtx;
typedef struct SCtgTask {
CTG_TASK_TYPE type;
int32_t taskId;
SCtgJob *pJob;
void* taskCtx;
SCtgMsgCtx msgCtx;
void* res;
} SCtgTask;
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
typedef struct SCtgAsyncFps {
ctgLanchTaskFp launchFp;
ctgHandleTaskMsgRspFp handleRspFp;
ctgDumpTaskResFp dumpResFp;
} SCtgAsyncFps;
typedef struct SCtgApiStat {
#ifdef WINDOWS
@ -214,6 +311,7 @@ typedef struct SCtgQueue {
typedef struct SCatalogMgmt {
bool exit;
int32_t jobPool;
SRWLatch lock;
SCtgQueue queue;
TdThread updateThread;
@ -327,10 +425,80 @@ typedef struct SCtgAction {
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_PARAMS SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
#define CTG_PARAMS_LIST() pCtg, pTrans, pMgmtEps
extern void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
extern void ctgdShowClusterCache(SCatalog* pCtg);
extern int32_t ctgdShowCacheInfo(void);
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTb(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTb(SCtgMetaAction *action);
int32_t ctgActUpdateUser(SCtgMetaAction *action);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgPutRmTbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgPutUpdateVgToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
int32_t ctgPutUpdateTbToQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
void ctgFreeJob(void* job);
void ctgFreeHandle(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2);
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;
extern SCtgAsyncFps gCtgAsyncFps[];
#ifdef __cplusplus
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CATALOG_REMOTE_H_
#define _TD_CATALOG_REMOTE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
int32_t reqType;
} SCtgTaskCallbackParam;
#ifdef __cplusplus
}
#endif
#endif /*_TD_CATALOG_REMOTE_H_*/

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,577 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h"
#include "systable.h"
#include "ctgRemote.h"
#include "tref.h"
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
int32_t code = 0;
switch (reqType) {
case TDMT_MND_QNODE_LIST: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code);
}
qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
break;
}
case TDMT_MND_USE_DB: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got db vgInfo from mnode, dbFName:%s", target);
break;
}
case TDMT_MND_GET_DB_CFG: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got db cfg from mnode, dbFName:%s", target);
break;
}
case TDMT_MND_GET_INDEX: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got index from mnode, indexName:%s", target);
break;
}
case TDMT_MND_RETRIEVE_FUNC: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got udf from mnode, funcName:%s", target);
break;
}
case TDMT_MND_GET_USER_AUTH: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got user auth from mnode, user:%s", target);
break;
}
case TDMT_MND_TABLE_META: {
if (TSDB_CODE_SUCCESS != rspCode) {
if (CTG_TABLE_NOT_EXIST(rspCode)) {
SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
qDebug("stablemeta not exist in mnode, tbFName:%s", target);
return TSDB_CODE_SUCCESS;
}
qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got table meta from mnode, tbFName:%s", target);
break;
}
case TDMT_VND_TABLE_META: {
if (TSDB_CODE_SUCCESS != rspCode) {
if (CTG_TABLE_NOT_EXIST(rspCode)) {
SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
qDebug("tablemeta not exist in vnode, tbFName:%s", target);
return TSDB_CODE_SUCCESS;
}
qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got table meta from vnode, tbFName:%s", target);
break;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0;
CTG_API_ENTER();
SCtgJob* pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
if (NULL == pJob) {
qDebug("job refId %" PRIx64 " already dropped", cbParam->refId);
goto _return;
}
SCtgTask *pTask = taosArrayGet(pJob->pTasks, cbParam->taskId);
qDebug("QID:%" PRIx64 " task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
_return:
if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
}
taosMemoryFree(param);
CTG_API_LEAVE(code);
}
int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->reqType = msgType;
param->queryId = pTask->pJob->queryId;
param->refId = pTask->pJob->refId;
param->taskId = pTask->taskId;
msgSendInfo->param = param;
msgSendInfo->fp = ctgHandleMsgCallback;
*pMsgSendInfo = msgSendInfo;
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(param);
taosMemoryFree(msgSendInfo);
CTG_RET(code);
}
int32_t ctgAsyncSendMsg(CTG_PARAMS, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo *pMsgSendInfo = NULL;
CTG_ERR_JRET(ctgMakeMsgSendInfo(pTask, msgType, &pMsgSendInfo));
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = NULL;
pMsgSendInfo->msgType = msgType;
int64_t transporterId = 0;
code = asyncSendMsgToServer(pTrans, (SEpSet*)pMgmtEps, &transporterId, pMsgSendInfo);
if (code) {
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
CTG_ERR_JRET(code);
}
ctgDebug("req msg sent, reqId:%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS;
_return:
if (pMsgSendInfo) {
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
}
CTG_RET(code);
}
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen);
if (code) {
ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen);
if (code) {
ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen);
if (code) {
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GET_DB_CFG,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
ctgDebug("try to get index from mnode, indexName:%s", indexName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen);
if (code) {
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(SIndexInfo));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen);
if (code) {
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(SFuncInfo));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
ctgDebug("try to get user auth from mnode, user:%s", user);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen);
if (code) {
ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName);
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen);
if (code) {
ctgError("Build mnode stablemeta msg failed, code:%x", code);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
return ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), dbFName, (char *)pTableName->tname, out, pTask);
}
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
ctgDebug("try to get table meta from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
char *msg = NULL;
int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen);
if (code) {
ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
return TSDB_CODE_SUCCESS;
}

View File

@ -0,0 +1,577 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h"
#include "systable.h"
void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableMeta);
pData->pTableMeta = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pData->pDbVgroup); ++i) {
SArray** pArray = taosArrayGet(pData->pDbVgroup, i);
taosArrayDestroy(*pArray);
}
taosArrayDestroy(pData->pDbVgroup);
pData->pDbVgroup = NULL;
taosArrayDestroy(pData->pTableHash);
pData->pTableHash = NULL;
taosArrayDestroy(pData->pUdfList);
pData->pUdfList = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pData->pDbCfg); ++i) {
SDbCfgInfo* pInfo = taosArrayGet(pData->pDbCfg, i);
taosArrayDestroy(pInfo->pRetensions);
}
taosArrayDestroy(pData->pDbCfg);
pData->pDbCfg = NULL;
taosArrayDestroy(pData->pIndex);
pData->pIndex = NULL;
taosArrayDestroy(pData->pUser);
pData->pUser = NULL;
taosArrayDestroy(pData->pQnodeList);
pData->pQnodeList = NULL;
}
void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) {
taosHashCleanup(userCache->createdDbs);
taosHashCleanup(userCache->readDbs);
taosHashCleanup(userCache->writeDbs);
}
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
if (NULL == mgmt->slots) {
return;
}
for (int32_t i = 0; i < mgmt->slotNum; ++i) {
SCtgRentSlot *slot = &mgmt->slots[i];
if (slot->meta) {
taosArrayDestroy(slot->meta);
slot->meta = NULL;
}
}
taosMemoryFreeClear(mgmt->slots);
}
void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
CTG_LOCK(CTG_WRITE, &cache->stbLock);
if (cache->stbCache) {
int32_t stblNum = taosHashGetSize(cache->stbCache);
taosHashCleanup(cache->stbCache);
cache->stbCache = NULL;
CTG_CACHE_STAT_SUB(stblNum, stblNum);
}
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
CTG_LOCK(CTG_WRITE, &cache->metaLock);
if (cache->metaCache) {
int32_t tblNum = taosHashGetSize(cache->metaCache);
taosHashCleanup(cache->metaCache);
cache->metaCache = NULL;
CTG_CACHE_STAT_SUB(tblNum, tblNum);
}
CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
}
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
if (NULL == vgInfo) {
return;
}
if (vgInfo->vgHash) {
taosHashCleanup(vgInfo->vgHash);
vgInfo->vgHash = NULL;
}
taosMemoryFreeClear(vgInfo);
}
void ctgFreeDbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache) {
return;
}
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeVgInfo (dbCache->vgInfo);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeTbMetaCache(&dbCache->tbCache);
}
void ctgFreeHandle(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
if (pCtg->dbCache) {
int32_t dbNum = taosHashGetSize(pCtg->dbCache);
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
taosHashCleanup(pCtg->dbCache);
CTG_CACHE_STAT_SUB(dbNum, dbNum);
}
if (pCtg->userCache) {
int32_t userNum = taosHashGetSize(pCtg->userCache);
void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pCtg->userCache, pIter);
}
taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_SUB(userNum, userNum);
}
taosMemoryFree(pCtg);
}
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
if (NULL == pOutput || NULL == pOutput->dbVgroup) {
return;
}
taosHashCleanup(pOutput->dbVgroup->vgHash);
taosMemoryFreeClear(pOutput->dbVgroup);
taosMemoryFree(pOutput);
}
void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
taosMemoryFreeClear(pCtx->target);
if (NULL == pCtx->out) {
return;
}
switch (pCtx->reqType) {
case TDMT_MND_GET_DB_CFG: {
SDbCfgInfo* pOut = (SDbCfgInfo*)pCtx->out;
taosArrayDestroy(pOut->pRetensions);
taosMemoryFreeClear(pCtx->out);
break;
}
case TDMT_MND_USE_DB:{
SUseDbOutput* pOut = (SUseDbOutput*)pCtx->out;
ctgFreeSUseDbOutput(pOut);
pCtx->out = NULL;
break;
}
case TDMT_MND_GET_INDEX: {
SIndexInfo* pOut = (SIndexInfo*)pCtx->out;
taosMemoryFreeClear(pCtx->out);
break;
}
case TDMT_MND_QNODE_LIST: {
SArray* pOut = (SArray*)pCtx->out;
taosArrayDestroy(pOut);
pCtx->out = NULL;
break;
}
case TDMT_VND_TABLE_META:
case TDMT_MND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pCtx->out;
taosMemoryFree(pOut->tbMeta);
taosMemoryFreeClear(pCtx->out);
break;
}
case TDMT_MND_RETRIEVE_FUNC: {
SFuncInfo* pOut = (SFuncInfo*)pCtx->out;
taosMemoryFree(pOut->pCode);
taosMemoryFree(pOut->pComment);
taosMemoryFreeClear(pCtx->out);
break;
}
case TDMT_MND_GET_USER_AUTH: {
SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pCtx->out;
taosHashCleanup(pOut->createdDbs);
taosHashCleanup(pOut->readDbs);
taosHashCleanup(pOut->writeDbs);
taosMemoryFreeClear(pCtx->out);
break;
}
default:
qError("invalid reqType %d", pCtx->reqType);
break;
}
}
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) {
if (NULL == pOutput) {
return;
}
taosMemoryFree(pOutput->tbMeta);
taosMemoryFree(pOutput);
}
void ctgResetTbMetaTask(SCtgTask* pTask) {
SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
memset(&taskCtx->tbInfo, 0, sizeof(taskCtx->tbInfo));
taskCtx->flag = CTG_FLAG_UNKNOWN_STB;
if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
pTask->msgCtx.lastOut = NULL;
}
if (pTask->msgCtx.out) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.out);
pTask->msgCtx.out = NULL;
}
taosMemoryFreeClear(pTask->msgCtx.target);
taosMemoryFreeClear(pTask->res);
}
void ctgFreeTask(SCtgTask* pTask) {
ctgFreeMsgCtx(&pTask->msgCtx);
switch (pTask->type) {
case CTG_TASK_GET_QNODE: {
taosArrayDestroy((SArray*)pTask->res);
pTask->res = NULL;
break;
}
case CTG_TASK_GET_TB_META: {
SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
pTask->msgCtx.lastOut = NULL;
}
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_DB_VGROUP: {
taosArrayDestroy((SArray*)pTask->res);
pTask->res = NULL;
break;
}
case CTG_TASK_GET_DB_CFG: {
if (pTask->res) {
taosArrayDestroy(((SDbCfgInfo*)pTask->res)->pRetensions);
taosMemoryFreeClear(pTask->res);
}
break;
}
case CTG_TASK_GET_TB_HASH: {
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_INDEX: {
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_UDF: {
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_USER: {
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);
break;
}
default:
qError("invalid task type %d", pTask->type);
break;
}
}
void ctgFreeTasks(SArray* pArray) {
if (NULL == pArray) {
return;
}
int32_t num = taosArrayGetSize(pArray);
for (int32_t i = 0; i < num; ++i) {
SCtgTask* pTask = taosArrayGet(pArray, i);
ctgFreeTask(pTask);
}
taosArrayDestroy(pArray);
}
void ctgFreeJob(void* job) {
if (NULL == job) {
return;
}
SCtgJob* pJob = (SCtgJob*)job;
int64_t rid = pJob->refId;
uint64_t qid = pJob->queryId;
ctgFreeTasks(pJob->pTasks);
ctgFreeSMetaData(&pJob->jobRes);
taosMemoryFree(job);
qDebug("QID:%" PRIx64 ", job %" PRIx64 " freed", qid, rid);
}
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target) {
ctgFreeMsgCtx(pCtx);
pCtx->reqType = reqType;
pCtx->out = out;
if (target) {
pCtx->target = strdup(target);
if (NULL == pCtx->target) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
} else {
pCtx->target = NULL;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
switch (hashMethod) {
default:
*fp = MurmurHash3_32;
break;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
SHashObj *vgroupHash = NULL;
SVgroupInfo *vgInfo = NULL;
SArray *vgList = NULL;
int32_t code = 0;
int32_t vgNum = taosHashGetSize(vgHash);
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed, num:%d", vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
void *pIter = taosHashIterate(vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(vgHash, pIter);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(vgHash, pIter);
vgInfo = NULL;
}
*pList = vgList;
ctgDebug("Got vgList from cache, vgNum:%d", vgNum);
return TSDB_CODE_SUCCESS;
_return:
if (vgList) {
taosArrayDestroy(vgList);
}
CTG_RET(code);
}
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
tableNameHashFp fp = NULL;
SVgroupInfo *vgInfo = NULL;
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
break;
}
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
}
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*pVgroup = *vgInfo;
CTG_RET(code);
}
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
*dst = taosMemoryMalloc(sizeof(SDBVgInfo));
if (NULL == *dst) {
qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy(*dst, src, sizeof(SDBVgInfo));
size_t hashSize = taosHashGetSize(src->vgHash);
(*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == (*dst)->vgHash) {
qError("taosHashInit %d failed", (int32_t)hashSize);
taosMemoryFreeClear(*dst);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
int32_t *vgId = NULL;
void *pIter = taosHashIterate(src->vgHash, NULL);
while (pIter) {
vgId = taosHashGetKey(pIter, NULL);
if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
taosHashCancelIterate(src->vgHash, pIter);
taosHashCleanup((*dst)->vgHash);
taosMemoryFreeClear(*dst);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(src->vgHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
*pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
if (NULL == *pOutput) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy(*pOutput, output, sizeof(STableMetaOutput));
if (output->tbMeta) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
taosMemoryFreeClear(*pOutput);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -40,10 +40,8 @@
namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
bool *inCache, int32_t flag, uint64_t *dbId);
extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action);
extern "C" int32_t ctgActUpdateTb(SCtgMetaAction *action);
extern "C" int32_t ctgdEnableDebug(char *option);
extern "C" int32_t ctgdGetStatNum(char *option, void *res);
@ -52,7 +50,7 @@ void ctgTestSetRspCTableMeta();
void ctgTestSetRspSTableMeta();
void ctgTestSetRspMultiSTableMeta();
extern "C" SCatalogMgmt gCtgMgmt;
//extern "C" SCatalogMgmt gCtgMgmt;
enum {
CTGT_RSP_VGINFO = 1,
@ -859,8 +857,12 @@ void *ctgTestGetCtableMetaThread(void *param) {
strcpy(cn.dbname, "db1");
strcpy(cn.tname, ctgTestCTablename);
SCtgTbMetaCtx ctx = {0};
ctx.pName = &cn;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
while (!ctgTestStop) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &inCache, 0, NULL);
code = ctgReadTbMetaFromCache(pCtg, &ctx, &tbMeta);
if (code || !inCache) {
assert(0);
}
@ -899,7 +901,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
msg->output = output;
action.data = msg;
code = ctgActUpdateTbl(&action);
code = ctgActUpdateTb(&action);
if (code) {
assert(0);
}

View File

@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"

View File

@ -66,22 +66,18 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
}
static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
SFuncInfo* pInfo = NULL;
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &pInfo);
SFuncInfo funcInfo = {0};
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &funcInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (NULL == pInfo) {
snprintf(pParam->pErrBuf, pParam->errBufLen, "Invalid function name: %s", pFunc->functionName);
return TSDB_CODE_FUNC_INVALID_FUNTION;
}
pFunc->funcType = FUNCTION_TYPE_UDF;
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == pInfo->funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = pInfo->outputType;
pFunc->node.resType.bytes = pInfo->outputLen;
pFunc->udfBufSize = pInfo->bufSize;
tFreeSFuncInfo(pInfo);
taosMemoryFree(pInfo);
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = funcInfo.outputType;
pFunc->node.resType.bytes = funcInfo.outputLen;
pFunc->udfBufSize = funcInfo.bufSize;
tFreeSFuncInfo(&funcInfo);
return TSDB_CODE_SUCCESS;
}

View File

@ -528,20 +528,18 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
}
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t *taskHandle = &ctx->taskHandle;
qTaskInfo_t taskHandle = ctx->taskHandle;
if (TASK_TYPE_TEMP == ctx->taskType) {
if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
if (ctx->explain) {
SExplainExecInfo *execInfo = NULL;
int32_t resNum = 0;
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo));
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
}
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
return TSDB_CODE_SUCCESS;
@ -554,16 +552,21 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
uint64_t useconds = 0;
int32_t i = 0;
int32_t execNum = 0;
qTaskInfo_t *taskHandle = &ctx->taskHandle;
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
code = qExecTask(*taskHandle, &pRes, &useconds);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
pRes = NULL;
// if *taskHandle is NULL, it's killed right now
if (taskHandle) {
code = qExecTask(taskHandle, &pRes, &useconds);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
}
}
++execNum;

View File

@ -244,6 +244,7 @@ class TDTestCase:
if user is None:
user = self.root_user
with taos_connect(user=user.name, passwd=user.passwd) as use:
time.sleep(2)
use.query("use db")
use.query("show tables")
if check_priv == PRIVILEGES_ALL:
@ -398,6 +399,7 @@ class TDTestCase:
tdLog.printNoPrefix("==========step 1.18: revoke all from all = nothing")
self.revoke_user(user=self.users[2], priv=PRIVILEGES_ALL)
time.sleep(3)
self.__user_check(user=self.users[2], check_priv=None)
def __grant_err(self):