feat: show create table
This commit is contained in:
parent
b084be71f6
commit
0f148c6f47
|
@ -63,6 +63,8 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
|||
|
||||
bool tNameDBNameEqual(SName* left, SName* right);
|
||||
|
||||
bool tNameTbNameEqual(SName* left, SName* right);
|
||||
|
||||
typedef struct {
|
||||
// input
|
||||
SArray* tags; // element is SSmlKv
|
||||
|
|
|
@ -209,6 +209,8 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
|
|||
void destroyQueryExecRes(SQueryExecRes* pRes);
|
||||
int32_t dataConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
|
||||
char* parseTagDatatoJson(void* p);
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
||||
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t));
|
||||
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
|
||||
|
|
|
@ -46,7 +46,7 @@ extern "C" {
|
|||
|
||||
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
|
||||
#define HEARTBEAT_INTERVAL 1500 // ms
|
||||
#define SYNC_ON_TOP_OF_ASYNC 0
|
||||
#define SYNC_ON_TOP_OF_ASYNC 1
|
||||
|
||||
enum {
|
||||
RES_TYPE__QUERY = 1,
|
||||
|
|
|
@ -240,6 +240,15 @@ bool tNameDBNameEqual(SName* left, SName* right) {
|
|||
return (0 == strcmp(left->dbname, right->dbname));
|
||||
}
|
||||
|
||||
bool tNameTbNameEqual(SName* left, SName* right) {
|
||||
bool equal = tNameDBNameEqual(left, right);
|
||||
if (equal) {
|
||||
return (0 == strcmp(left->tname, right->tname));
|
||||
}
|
||||
|
||||
return equal;
|
||||
}
|
||||
|
||||
int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
|
||||
assert(dst != NULL && str != NULL && strlen(str) > 0);
|
||||
|
||||
|
|
|
@ -71,11 +71,18 @@ typedef enum {
|
|||
CTG_TASK_GET_TB_META,
|
||||
CTG_TASK_GET_TB_HASH,
|
||||
CTG_TASK_GET_TB_INDEX,
|
||||
CTG_TASK_GET_TB_CFG,
|
||||
CTG_TASK_GET_INDEX,
|
||||
CTG_TASK_GET_UDF,
|
||||
CTG_TASK_GET_USER,
|
||||
} CTG_TASK_TYPE;
|
||||
|
||||
typedef enum {
|
||||
CTG_TASK_LAUNCHED = 1,
|
||||
CTG_TASK_DONE,
|
||||
} CTG_TASK_STATUS;
|
||||
|
||||
|
||||
typedef struct SCtgDebug {
|
||||
bool lockEnable;
|
||||
bool cacheEnable;
|
||||
|
@ -102,6 +109,12 @@ typedef struct SCtgTbIndexCtx {
|
|||
SName* pName;
|
||||
} SCtgTbIndexCtx;
|
||||
|
||||
typedef struct SCtgTbCfgCtx {
|
||||
SName* pName;
|
||||
int32_t tbType;
|
||||
SVgroupInfo* pVgInfo;
|
||||
} SCtgTbCfgCtx;
|
||||
|
||||
typedef struct SCtgDbVgCtx {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
} SCtgDbVgCtx;
|
||||
|
@ -190,6 +203,8 @@ typedef struct SCtgJob {
|
|||
SArray* pTasks;
|
||||
int32_t taskDone;
|
||||
SMetaData jobRes;
|
||||
int32_t taskIdx;
|
||||
SRWLatch taskLock;
|
||||
|
||||
uint64_t queryId;
|
||||
SCatalog* pCtg;
|
||||
|
@ -206,6 +221,7 @@ typedef struct SCtgJob {
|
|||
int32_t userNum;
|
||||
int32_t dbInfoNum;
|
||||
int32_t tbIndexNum;
|
||||
int32_t tbCfgNum;
|
||||
} SCtgJob;
|
||||
|
||||
typedef struct SCtgMsgCtx {
|
||||
|
@ -215,24 +231,44 @@ typedef struct SCtgMsgCtx {
|
|||
char* target;
|
||||
} SCtgMsgCtx;
|
||||
|
||||
typedef struct SCtgTask SCtgTask;
|
||||
typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*);
|
||||
|
||||
typedef struct SCtgSubRes {
|
||||
CTG_TASK_TYPE type;
|
||||
int32_t code;
|
||||
void* res;
|
||||
ctgSubTaskCbFp fp;
|
||||
} SCtgSubRes;
|
||||
|
||||
typedef struct SCtgTask {
|
||||
CTG_TASK_TYPE type;
|
||||
int32_t taskId;
|
||||
SCtgJob* pJob;
|
||||
void* taskCtx;
|
||||
SCtgMsgCtx msgCtx;
|
||||
int32_t code;
|
||||
void* res;
|
||||
CTG_TASK_TYPE type;
|
||||
int32_t taskId;
|
||||
SCtgJob* pJob;
|
||||
void* taskCtx;
|
||||
SCtgMsgCtx msgCtx;
|
||||
int32_t code;
|
||||
void* res;
|
||||
CTG_TASK_STATUS status;
|
||||
SRWLatch lock;
|
||||
SArray* pParents;
|
||||
SCtgSubRes subRes;
|
||||
} SCtgTask;
|
||||
|
||||
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
|
||||
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
|
||||
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t);
|
||||
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
|
||||
typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**);
|
||||
typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*);
|
||||
|
||||
typedef struct SCtgAsyncFps {
|
||||
ctgLanchTaskFp launchFp;
|
||||
ctgInitTaskFp initFp;
|
||||
ctgLanchTaskFp launchFp;
|
||||
ctgHandleTaskMsgRspFp handleRspFp;
|
||||
ctgDumpTaskResFp dumpResFp;
|
||||
ctgDumpTaskResFp dumpResFp;
|
||||
ctgCompTaskFp compFp;
|
||||
ctgCloneTaskResFp cloneFp;
|
||||
} SCtgAsyncFps;
|
||||
|
||||
typedef struct SCtgApiStat {
|
||||
|
@ -521,6 +557,7 @@ int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation);
|
|||
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation);
|
||||
int32_t ctgOpClearCache(SCtgCacheOperation *operation);
|
||||
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType);
|
||||
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup);
|
||||
|
||||
|
||||
|
||||
|
@ -542,6 +579,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
|
|||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
|
||||
int32_t ctgLaunchJob(SCtgJob *pJob);
|
||||
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
|
||||
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
|
||||
int32_t ctgGetTbCfgCb(SCtgTask *pTask);
|
||||
|
||||
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
|
||||
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
|
||||
|
@ -562,6 +601,7 @@ char * ctgTaskTypeStr(CTG_TASK_TYPE type);
|
|||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
|
||||
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
|
||||
void ctgFreeSTableIndex(void *info);
|
||||
void ctgClearSubTaskRes(SCtgSubRes *pRes);
|
||||
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
|
|
|
@ -22,36 +22,6 @@
|
|||
|
||||
SCatalogMgmt gCtgMgmt = {0};
|
||||
|
||||
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
|
||||
int32_t code = 0;
|
||||
STableMeta* tblMeta = NULL;
|
||||
SCtgTbMetaCtx tbCtx = {0};
|
||||
tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
|
||||
tbCtx.pName = pTableName;
|
||||
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta));
|
||||
|
||||
if (NULL == tblMeta) {
|
||||
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
|
||||
CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(tblMeta);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -212,29 +182,6 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
|
||||
if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) {
|
||||
CTG_FLAG_SET_SYS_DB(ctx->flag);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));
|
||||
|
||||
if (*pTableMeta) {
|
||||
if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) &&
|
||||
((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
}
|
||||
|
||||
if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
|
||||
CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
|
||||
int32_t code = 0;
|
||||
STableMetaOutput *output = NULL;
|
||||
|
|
|
@ -20,7 +20,8 @@
|
|||
#include "systable.h"
|
||||
#include "tref.h"
|
||||
|
||||
int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
||||
int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
SName *name = (SName*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_TB_META;
|
||||
|
@ -44,12 +45,13 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
||||
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
char *dbFName = (char*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_DB_VGROUP;
|
||||
|
@ -67,12 +69,13 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
||||
int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
char *dbFName = (char*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_DB_CFG;
|
||||
|
@ -90,12 +93,13 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
||||
int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
char *dbFName = (char*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_DB_INFO;
|
||||
|
@ -113,13 +117,14 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
||||
int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
SName *name = (SName*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_TB_HASH;
|
||||
|
@ -143,12 +148,12 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
|
||||
int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_QNODE;
|
||||
|
@ -163,7 +168,8 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
||||
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
char *name = (char*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_INDEX;
|
||||
|
@ -181,12 +187,13 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
||||
int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
char *name = (char*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_UDF;
|
||||
|
@ -204,12 +211,13 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) {
|
||||
int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
SUserAuthInfo *user = (SUserAuthInfo*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_USER;
|
||||
|
@ -227,12 +235,13 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
||||
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
SName *name = (SName*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_TB_INDEX;
|
||||
|
@ -255,11 +264,41 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||
SName *name = (SName*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_TB_CFG;
|
||||
task.taskId = taskIdx;
|
||||
task.pJob = pJob;
|
||||
|
||||
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbCfgCtx));
|
||||
if (NULL == task.taskCtx) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCtgTbCfgCtx* ctx = task.taskCtx;
|
||||
ctx->pName = taosMemoryMalloc(sizeof(*name));
|
||||
if (NULL == ctx->pName) {
|
||||
taosMemoryFree(task.taskCtx);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(ctx->pName, name, sizeof(*name));
|
||||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
|
||||
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
@ -296,6 +335,13 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
|
|||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableCfg, i);
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(name, dbFName);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
char* dbFName = taosHashIterate(pDb, NULL);
|
||||
while (dbFName) {
|
||||
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
||||
|
@ -304,40 +350,32 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
|
|||
|
||||
taosHashCleanup(pDb);
|
||||
|
||||
int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum;
|
||||
if (tbNum > 0) {
|
||||
if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) {
|
||||
SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
SName* name = taosHashIterate(pTb, NULL);
|
||||
while (name) {
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
name = taosHashIterate(pTb, name);
|
||||
}
|
||||
|
||||
taosHashCleanup(pTb);
|
||||
} else {
|
||||
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
}
|
||||
}
|
||||
// REFRESH TABLE META
|
||||
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableCfg, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
SName* name = taosHashIterate(pTb, NULL);
|
||||
while (name) {
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
name = taosHashIterate(pTb, name);
|
||||
}
|
||||
|
||||
taosHashCleanup(pTb);
|
||||
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableIndex, i);
|
||||
ctgDropTbIndexEnqueue(pCtg, name, true);
|
||||
|
@ -346,6 +384,20 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *taskId) {
|
||||
int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1);
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &pJob->taskLock);
|
||||
CTG_ERR_RET((*gCtgAsyncFps[type].initFp)(pJob, tid, param));
|
||||
CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);
|
||||
|
||||
if (taskId) {
|
||||
*taskId = tid;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
|
||||
int32_t code = 0;
|
||||
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
|
||||
|
@ -358,8 +410,9 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
|||
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
|
||||
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
|
||||
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
||||
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
||||
|
||||
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum;
|
||||
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
|
||||
if (*taskNum <= 0) {
|
||||
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -389,6 +442,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
|||
pJob->userNum = userNum;
|
||||
pJob->dbInfoNum = dbInfoNum;
|
||||
pJob->tbIndexNum = tbIndexNum;
|
||||
pJob->tbCfgNum = tbCfgNum;
|
||||
|
||||
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
|
||||
|
||||
|
@ -401,54 +455,58 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
|||
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq));
|
||||
}
|
||||
|
||||
int32_t taskIdx = 0;
|
||||
for (int32_t i = 0; i < dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_VGROUP, dbFName, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < dbCfgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
||||
CTG_ERR_JRET(ctgInitGetDbCfgTask(pJob, taskIdx++, dbFName));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_CFG, dbFName, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < dbInfoNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
||||
CTG_ERR_JRET(ctgInitGetDbInfoTask(pJob, taskIdx++, dbFName));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_INFO, dbFName, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META, name, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH, name, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbIndexNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableIndex, i);
|
||||
CTG_ERR_JRET(ctgInitGetTbIndexTask(pJob, taskIdx++, name));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_INDEX, name, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbCfgNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableCfg, i);
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_CFG, name, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < indexNum; ++i) {
|
||||
char* indexName = taosArrayGet(pReq->pIndex, i);
|
||||
CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_INDEX, indexName, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < udfNum; ++i) {
|
||||
char* udfName = taosArrayGet(pReq->pUdf, i);
|
||||
CTG_ERR_JRET(ctgInitGetUdfTask(pJob, taskIdx++, udfName));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_UDF, udfName, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < userNum; ++i) {
|
||||
SUserAuthInfo* user = taosArrayGet(pReq->pUser, i);
|
||||
CTG_ERR_JRET(ctgInitGetUserTask(pJob, taskIdx++, user));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_USER, user, NULL));
|
||||
}
|
||||
|
||||
if (qnodeNum) {
|
||||
CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++));
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
|
||||
}
|
||||
|
||||
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
|
||||
|
@ -528,6 +586,21 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpTbCfgRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pTableCfg) {
|
||||
pJob->jobRes.pTableCfg = taosArrayInit(pJob->tbCfgNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pTableCfg) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pTableCfg, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pIndex) {
|
||||
|
@ -618,13 +691,48 @@ int32_t ctgDumpUserRes(SCtgTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInvokeSubCb(SCtgTask *pTask) {
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &pTask->lock);
|
||||
|
||||
int32_t parentNum = taosArrayGetSize(pTask->pParents);
|
||||
for (int32_t i = 0; i < parentNum; ++i) {
|
||||
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
|
||||
|
||||
pParent->subRes.code = pTask->code;
|
||||
if (TSDB_CODE_SUCCESS == pTask->code) {
|
||||
code = (*gCtgAsyncFps[pTask->type].cloneFp)(pTask, &pParent->subRes.res);
|
||||
if (code) {
|
||||
pParent->subRes.code = code;
|
||||
}
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(pParent->subRes.fp(pParent));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &pTask->lock);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
int32_t code = 0;
|
||||
|
||||
if (CTG_TASK_DONE == pTask->status) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode));
|
||||
|
||||
pTask->code = rspCode;
|
||||
pTask->status = CTG_TASK_DONE;
|
||||
|
||||
ctgInvokeSubCb(pTask);
|
||||
|
||||
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
|
||||
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
|
||||
|
@ -802,11 +910,12 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM
|
|||
switch (reqType) {
|
||||
case TDMT_MND_USE_DB: {
|
||||
SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out;
|
||||
SDBVgInfo* pDb = NULL;
|
||||
|
||||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res));
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
|
||||
pOut->dbVgroup = NULL;
|
||||
CTG_ERR_JRET(cloneDbVgInfo(pOut->dbVgroup, &pDb));
|
||||
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pDb, false));
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -874,6 +983,7 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
|
|||
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
|
||||
|
||||
_return:
|
||||
|
||||
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -882,6 +992,18 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgHandleGetTbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
|
||||
TSWAP(pTask->res, pTask->msgCtx.out);
|
||||
|
||||
_return:
|
||||
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
@ -1138,6 +1260,48 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) {
|
||||
int32_t code = 0;
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
SArray* pRes = NULL;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pCtx->pName, dbFName);
|
||||
|
||||
if (pCtx->tbType <= 0) {
|
||||
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
||||
if (pCtx->tbType <= 0) {
|
||||
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, pCtx->pName));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE == pCtx->tbType) {
|
||||
CTG_ERR_JRET(ctgGetTableCfgFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
|
||||
} else {
|
||||
if (NULL == pCtx->pVgInfo) {
|
||||
CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo));
|
||||
if (NULL == pCtx->pVgInfo) {
|
||||
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (CTG_TASK_LAUNCHED == pTask->status) {
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
|
@ -1244,17 +1408,70 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbCfgCb(SCtgTask *pTask) {
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_JRET(pTask->subRes.code);
|
||||
|
||||
SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
if (CTG_TASK_GET_TB_META == pTask->subRes.type) {
|
||||
pCtx->tbType = ((STableMeta*)pTask->subRes.res)->tableType;
|
||||
} else if (CTG_TASK_GET_DB_VGROUP == pTask->subRes.type) {
|
||||
SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res;
|
||||
|
||||
pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo));
|
||||
}
|
||||
|
||||
CTG_RET(ctgLaunchGetTbCfgTask(pTask));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code));
|
||||
}
|
||||
|
||||
int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) {
|
||||
SCtgDbVgCtx* ctx = pTask->taskCtx;
|
||||
|
||||
*equal = (0 == strcmp(ctx->dbFName, param));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) {
|
||||
SCtgTbMetaCtx* ctx = pTask->taskCtx;
|
||||
|
||||
*equal = tNameTbNameEqual(ctx->pName, (SName*)param);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgCloneTbMeta(SCtgTask* pTask, void** pRes) {
|
||||
STableMeta* pMeta = (STableMeta*)pTask->res;
|
||||
|
||||
CTG_RET(cloneTableMeta(pMeta, (STableMeta**)pRes));
|
||||
}
|
||||
|
||||
int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) {
|
||||
SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out;
|
||||
|
||||
CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes));
|
||||
}
|
||||
|
||||
|
||||
SCtgAsyncFps gCtgAsyncFps[] = {
|
||||
{ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes},
|
||||
{ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes},
|
||||
{ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes},
|
||||
{ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes},
|
||||
{ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes},
|
||||
{ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes},
|
||||
{ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes},
|
||||
{ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes},
|
||||
{ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes},
|
||||
{ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes},
|
||||
{ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL},
|
||||
{ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg},
|
||||
{ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL},
|
||||
{ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL},
|
||||
{ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta},
|
||||
{ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL},
|
||||
{ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL},
|
||||
{ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL},
|
||||
{ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL},
|
||||
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
|
||||
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
|
||||
};
|
||||
|
||||
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
|
||||
|
@ -1269,6 +1486,86 @@ int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
|
||||
bool equal = false;
|
||||
SCtgTask* pTask = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_LOCK(CTG_READ, &pJob->taskLock);
|
||||
|
||||
int32_t taskNum = taosArrayGetSize(pJob->pTasks);
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
pTask = taosArrayGet(pJob->pTasks, i);
|
||||
if (type != pTask->type) {
|
||||
continue;
|
||||
}
|
||||
|
||||
CTG_ERR_JRET((*gCtgAsyncFps[type].compFp)(pTask, param, &equal));
|
||||
if (equal) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
CTG_UNLOCK(CTG_READ, &pJob->taskLock);
|
||||
if (equal) {
|
||||
*taskId = pTask->taskId;
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &pSub->lock);
|
||||
if (CTG_TASK_DONE == pSub->status) {
|
||||
pTask->subRes.code = pSub->code;
|
||||
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
|
||||
CTG_ERR_JRET(pTask->subRes.fp(pTask));
|
||||
} else {
|
||||
if (NULL == pSub->pParents) {
|
||||
pSub->pParents = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
taosArrayPush(pSub->pParents, &pTask);
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &pSub->lock);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
int32_t subTaskId = -1;
|
||||
bool newTask = false;
|
||||
|
||||
ctgClearSubTaskRes(&pTask->subRes);
|
||||
pTask->subRes.type = type;
|
||||
pTask->subRes.fp = fp;
|
||||
|
||||
CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId));
|
||||
if (subTaskId < 0) {
|
||||
CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId));
|
||||
newTask = true;
|
||||
}
|
||||
|
||||
SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId);
|
||||
|
||||
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
|
||||
|
||||
if (newTask) {
|
||||
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
|
||||
pSub->status = CTG_TASK_LAUNCHED;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgLaunchJob(SCtgJob *pJob) {
|
||||
int32_t taskNum = taosArrayGetSize(pJob->pTasks);
|
||||
|
@ -1278,6 +1575,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
|
|||
|
||||
qDebug("QID:0x%" PRIx64 " ctg start to launch task %d", pJob->queryId, pTask->taskId);
|
||||
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
||||
pTask->status = CTG_TASK_LAUNCHED;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -2061,4 +2061,92 @@ int32_t ctgStartUpdateThread() {
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
|
||||
if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) {
|
||||
CTG_FLAG_SET_SYS_DB(ctx->flag);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));
|
||||
|
||||
if (*pTableMeta) {
|
||||
if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) &&
|
||||
((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
}
|
||||
|
||||
if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
|
||||
CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
|
||||
int32_t code = 0;
|
||||
STableMeta* tblMeta = NULL;
|
||||
SCtgTbMetaCtx tbCtx = {0};
|
||||
tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
|
||||
tbCtx.pName = pTableName;
|
||||
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta));
|
||||
|
||||
if (NULL == tblMeta) {
|
||||
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
|
||||
CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(tblMeta);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
|
||||
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
|
||||
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SCtgDBCache* dbCache = NULL;
|
||||
int32_t code = 0;
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
||||
|
||||
if (NULL == dbCache) {
|
||||
*pVgroup = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));
|
||||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
taosMemoryFreeClear(*pVgroup);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -33,6 +33,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
|
|||
return "[get table meta]";
|
||||
case CTG_TASK_GET_TB_HASH:
|
||||
return "[get table hash]";
|
||||
case CTG_TASK_GET_TB_INDEX:
|
||||
return "[get table index]";
|
||||
case CTG_TASK_GET_TB_CFG:
|
||||
return "[get table cfg]";
|
||||
case CTG_TASK_GET_INDEX:
|
||||
return "[get index]";
|
||||
case CTG_TASK_GET_UDF:
|
||||
|
@ -96,6 +100,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
|
|||
|
||||
taosArrayDestroy(pData->pQnodeList);
|
||||
pData->pQnodeList = NULL;
|
||||
|
||||
taosArrayDestroy(pData->pTableCfg);
|
||||
pData->pTableCfg = NULL;
|
||||
}
|
||||
|
||||
void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) {
|
||||
|
@ -280,6 +287,13 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_CFG:
|
||||
case TDMT_MND_TABLE_CFG: {
|
||||
STableCfgRsp* pOut = (STableCfgRsp*)pCtx->out;
|
||||
tFreeSTableCfgRsp(pOut);
|
||||
taosMemoryFreeClear(pCtx->out);
|
||||
break;
|
||||
}
|
||||
case TDMT_MND_RETRIEVE_FUNC: {
|
||||
SFuncInfo* pOut = (SFuncInfo*)pCtx->out;
|
||||
taosMemoryFree(pOut->pCode);
|
||||
|
@ -328,14 +342,135 @@ void ctgResetTbMetaTask(SCtgTask* pTask) {
|
|||
taosMemoryFreeClear(pTask->res);
|
||||
}
|
||||
|
||||
void ctgFreeTask(SCtgTask* pTask) {
|
||||
ctgFreeMsgCtx(&pTask->msgCtx);
|
||||
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||
switch (type) {
|
||||
case CTG_TASK_GET_QNODE: {
|
||||
taosArrayDestroy((SArray*)*pRes);
|
||||
*pRes = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_META: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_VGROUP: {
|
||||
taosArrayDestroy((SArray*)*pRes);
|
||||
*pRes = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_CFG: {
|
||||
if (*pRes) {
|
||||
SDbCfgInfo* pInfo = (SDbCfgInfo*)*pRes;
|
||||
taosArrayDestroy(pInfo->pRetensions);
|
||||
taosMemoryFreeClear(*pRes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_INFO: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_HASH: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_INDEX: {
|
||||
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
||||
*pRes = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_INDEX: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_UDF: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_USER: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("invalid task type %d", type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||
switch (type) {
|
||||
case CTG_TASK_GET_QNODE: {
|
||||
taosArrayDestroy((SArray*)*pRes);
|
||||
*pRes = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_META: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_VGROUP: {
|
||||
if (*pRes) {
|
||||
SDBVgInfo* pInfo = (SDBVgInfo*)*pRes;
|
||||
taosHashCleanup(pInfo->vgHash);
|
||||
taosMemoryFreeClear(*pRes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_CFG: {
|
||||
if (*pRes) {
|
||||
SDbCfgInfo* pInfo = (SDbCfgInfo*)*pRes;
|
||||
taosArrayDestroy(pInfo->pRetensions);
|
||||
taosMemoryFreeClear(*pRes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_INFO: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_HASH: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_INDEX: {
|
||||
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
||||
*pRes = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_INDEX: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_UDF: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_USER: {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("invalid task type %d", type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ctgClearSubTaskRes(SCtgSubRes *pRes) {
|
||||
pRes->code = 0;
|
||||
|
||||
if (NULL == pRes->res) {
|
||||
return;
|
||||
}
|
||||
|
||||
ctgFreeSubTaskRes(pRes->type, &pRes->res);
|
||||
}
|
||||
|
||||
void ctgFreeTaskCtx(SCtgTask* pTask) {
|
||||
switch (pTask->type) {
|
||||
case CTG_TASK_GET_QNODE: {
|
||||
taosArrayDestroy((SArray*)pTask->res);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
pTask->res = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_META: {
|
||||
|
@ -346,56 +481,49 @@ void ctgFreeTask(SCtgTask* pTask) {
|
|||
pTask->msgCtx.lastOut = NULL;
|
||||
}
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_VGROUP: {
|
||||
taosArrayDestroy((SArray*)pTask->res);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
pTask->res = NULL;
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_CFG: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
if (pTask->res) {
|
||||
SDbCfgInfo* pInfo = (SDbCfgInfo*)pTask->res;
|
||||
taosArrayDestroy(pInfo->pRetensions);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_INFO: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_HASH: {
|
||||
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
|
||||
taosMemoryFreeClear(taskCtx->pName);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_INDEX: {
|
||||
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
|
||||
taosMemoryFreeClear(taskCtx->pName);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosArrayDestroyEx(pTask->res, tFreeSTableIndexInfo);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_CFG: {
|
||||
SCtgTbCfgCtx* taskCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
taosMemoryFreeClear(taskCtx->pName);
|
||||
taosMemoryFreeClear(taskCtx->pVgInfo);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_INDEX: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_UDF: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_USER: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -404,6 +532,16 @@ void ctgFreeTask(SCtgTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
void ctgFreeTask(SCtgTask* pTask) {
|
||||
ctgFreeMsgCtx(&pTask->msgCtx);
|
||||
ctgFreeTaskRes(pTask->type, &pTask->res);
|
||||
ctgFreeTaskCtx(pTask);
|
||||
|
||||
taosArrayDestroy(pTask->pParents);
|
||||
ctgClearSubTaskRes(&pTask->subRes);
|
||||
}
|
||||
|
||||
void ctgFreeTasks(SArray* pArray) {
|
||||
if (NULL == pArray) {
|
||||
return;
|
||||
|
|
|
@ -1081,15 +1081,6 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
||||
*pDst = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(*pDst, pSrc, TABLE_META_SIZE(pSrc));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
|
||||
int32_t len, STableMeta* pMeta) {
|
||||
SVgroupInfo vg;
|
||||
|
|
|
@ -373,3 +373,57 @@ end:
|
|||
}
|
||||
|
||||
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
||||
if (NULL == pSrc) {
|
||||
*pDst = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t metaSize = (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
|
||||
*pDst = taosMemoryMalloc(metaSize);
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(*pDst, pSrc, metaSize);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
||||
if (NULL == pSrc) {
|
||||
*pDst = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pDst = taosMemoryMalloc(sizeof(*pSrc));
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(*pDst, pSrc, sizeof(*pSrc));
|
||||
if (pSrc->vgHash) {
|
||||
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == (*pDst)->vgHash) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SVgroupInfo* vgInfo = NULL;
|
||||
void *pIter = taosHashIterate(pSrc->vgHash, NULL);
|
||||
while (pIter) {
|
||||
vgInfo = pIter;
|
||||
int32_t* vgId = taosHashGetKey(pIter, NULL);
|
||||
|
||||
if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
|
||||
qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
|
||||
taosHashCancelIterate(pSrc->vgHash, pIter);
|
||||
taosHashCleanup((*pDst)->vgHash);
|
||||
taosMemoryFreeClear(*pDst);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pSrc->vgHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue