diff --git a/include/common/tname.h b/include/common/tname.h index 28f97d1028..c691c2f7b2 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -63,6 +63,8 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId); bool tNameDBNameEqual(SName* left, SName* right); +bool tNameTbNameEqual(SName* left, SName* right); + typedef struct { // input SArray* tags; // element is SSmlKv diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1fb8a1db73..26f2758033 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -209,6 +209,8 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam void destroyQueryExecRes(SQueryExecRes* pRes); int32_t dataConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); char* parseTagDatatoJson(void* p); +int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); +int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 6948b3c566..c39b492620 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -46,7 +46,7 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms -#define SYNC_ON_TOP_OF_ASYNC 0 +#define SYNC_ON_TOP_OF_ASYNC 1 enum { RES_TYPE__QUERY = 1, diff --git a/source/common/src/tname.c b/source/common/src/tname.c index fd05513579..990e199d6f 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -240,6 +240,15 @@ bool tNameDBNameEqual(SName* left, SName* right) { return (0 == strcmp(left->dbname, right->dbname)); } +bool tNameTbNameEqual(SName* left, SName* right) { + bool equal = tNameDBNameEqual(left, right); + if (equal) { + return (0 == strcmp(left->tname, right->tname)); + } + + return equal; +} + int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { assert(dst != NULL && str != NULL && strlen(str) > 0); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 4923fcd226..98a03aa39b 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -71,11 +71,18 @@ typedef enum { CTG_TASK_GET_TB_META, CTG_TASK_GET_TB_HASH, CTG_TASK_GET_TB_INDEX, + CTG_TASK_GET_TB_CFG, CTG_TASK_GET_INDEX, CTG_TASK_GET_UDF, CTG_TASK_GET_USER, } CTG_TASK_TYPE; +typedef enum { + CTG_TASK_LAUNCHED = 1, + CTG_TASK_DONE, +} CTG_TASK_STATUS; + + typedef struct SCtgDebug { bool lockEnable; bool cacheEnable; @@ -102,6 +109,12 @@ typedef struct SCtgTbIndexCtx { SName* pName; } SCtgTbIndexCtx; +typedef struct SCtgTbCfgCtx { + SName* pName; + int32_t tbType; + SVgroupInfo* pVgInfo; +} SCtgTbCfgCtx; + typedef struct SCtgDbVgCtx { char dbFName[TSDB_DB_FNAME_LEN]; } SCtgDbVgCtx; @@ -190,7 +203,9 @@ typedef struct SCtgJob { SArray* pTasks; int32_t taskDone; SMetaData jobRes; - + int32_t taskIdx; + SRWLatch taskLock; + uint64_t queryId; SCatalog* pCtg; SRequestConnInfo conn; @@ -206,6 +221,7 @@ typedef struct SCtgJob { int32_t userNum; int32_t dbInfoNum; int32_t tbIndexNum; + int32_t tbCfgNum; } SCtgJob; typedef struct SCtgMsgCtx { @@ -215,24 +231,44 @@ typedef struct SCtgMsgCtx { char* target; } SCtgMsgCtx; +typedef struct SCtgTask SCtgTask; +typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*); + +typedef struct SCtgSubRes { + CTG_TASK_TYPE type; + int32_t code; + void* res; + ctgSubTaskCbFp fp; +} SCtgSubRes; + typedef struct SCtgTask { - CTG_TASK_TYPE type; - int32_t taskId; - SCtgJob* pJob; - void* taskCtx; - SCtgMsgCtx msgCtx; - int32_t code; - void* res; + CTG_TASK_TYPE type; + int32_t taskId; + SCtgJob* pJob; + void* taskCtx; + SCtgMsgCtx msgCtx; + int32_t code; + void* res; + CTG_TASK_STATUS status; + SRWLatch lock; + SArray* pParents; + SCtgSubRes subRes; } SCtgTask; +typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); typedef int32_t (*ctgLanchTaskFp)(SCtgTask*); typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t); typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*); +typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**); +typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*); typedef struct SCtgAsyncFps { - ctgLanchTaskFp launchFp; + ctgInitTaskFp initFp; + ctgLanchTaskFp launchFp; ctgHandleTaskMsgRspFp handleRspFp; - ctgDumpTaskResFp dumpResFp; + ctgDumpTaskResFp dumpResFp; + ctgCompTaskFp compFp; + ctgCloneTaskResFp cloneFp; } SCtgAsyncFps; typedef struct SCtgApiStat { @@ -521,6 +557,7 @@ int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation); int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation); int32_t ctgOpClearCache(SCtgCacheOperation *operation); int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType); +int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup); @@ -542,6 +579,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum); int32_t ctgLaunchJob(SCtgJob *pJob); int32_t ctgMakeAsyncRes(SCtgJob *pJob); +int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); +int32_t ctgGetTbCfgCb(SCtgTask *pTask); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput); @@ -562,6 +601,7 @@ char * ctgTaskTypeStr(CTG_TASK_TYPE type); int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask); int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); void ctgFreeSTableIndex(void *info); +void ctgClearSubTaskRes(SCtgSubRes *pRes); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 28ebb2969d..d1cac29653 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -22,36 +22,6 @@ SCatalogMgmt gCtgMgmt = {0}; -int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { - int32_t code = 0; - STableMeta* tblMeta = NULL; - SCtgTbMetaCtx tbCtx = {0}; - tbCtx.flag = CTG_FLAG_UNKNOWN_STB; - tbCtx.pName = pTableName; - - CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta)); - - if (NULL == tblMeta) { - ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname); - return TSDB_CODE_SUCCESS; - } - - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(pTableName, dbFName); - - if (TSDB_SUPER_TABLE == tblMeta->tableType) { - CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq)); - } else { - CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq)); - } - -_return: - - taosMemoryFreeClear(tblMeta); - - CTG_RET(code); -} - int32_t ctgGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { int32_t code = 0; @@ -212,29 +182,6 @@ _return: CTG_RET(code); } -int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { - if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) { - CTG_FLAG_SET_SYS_DB(ctx->flag); - } - - CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta)); - - if (*pTableMeta) { - if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) && - ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) { - return TSDB_CODE_SUCCESS; - } - - taosMemoryFreeClear(*pTableMeta); - } - - if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) { - CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType); - } - - return TSDB_CODE_SUCCESS; -} - int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { int32_t code = 0; STableMetaOutput *output = NULL; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 6adadf5045..41249cc85e 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -20,7 +20,8 @@ #include "systable.h" #include "tref.h" -int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { +int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SName *name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_META; @@ -44,12 +45,13 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { +int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + char *dbFName = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_DB_VGROUP; @@ -67,12 +69,13 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { +int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + char *dbFName = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_DB_CFG; @@ -90,12 +93,13 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { +int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + char *dbFName = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_DB_INFO; @@ -113,13 +117,14 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { +int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SName *name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_HASH; @@ -143,12 +148,12 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) { +int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SCtgTask task = {0}; task.type = CTG_TASK_GET_QNODE; @@ -163,7 +168,8 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) { return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) { +int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + char *name = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_INDEX; @@ -181,12 +187,13 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) { +int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + char *name = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_UDF; @@ -204,12 +211,13 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) { +int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SUserAuthInfo *user = (SUserAuthInfo*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_USER; @@ -227,12 +235,13 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { +int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SName *name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_INDEX; @@ -255,11 +264,41 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SName *name = (SName*)param; + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_TB_CFG; + task.taskId = taskIdx; + task.pJob = pJob; + + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbCfgCtx)); + if (NULL == task.taskCtx) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SCtgTbCfgCtx* ctx = task.taskCtx; + ctx->pName = taosMemoryMalloc(sizeof(*name)); + if (NULL == ctx->pName) { + taosMemoryFree(task.taskCtx); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + memcpy(ctx->pName, name, sizeof(*name)); + + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) { SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -296,6 +335,13 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); } + for (int32_t i = 0; i < pJob->tbCfgNum; ++i) { + SName* name = taosArrayGet(pReq->pTableCfg, i); + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(name, dbFName); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + char* dbFName = taosHashIterate(pDb, NULL); while (dbFName) { ctgDropDbVgroupEnqueue(pCtg, dbFName, true); @@ -304,39 +350,31 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con taosHashCleanup(pDb); - int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum; - if (tbNum > 0) { - if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) { - SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { - SName* name = taosArrayGet(pReq->pTableMeta, i); - taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); - } - - for (int32_t i = 0; i < pJob->tbHashNum; ++i) { - SName* name = taosArrayGet(pReq->pTableHash, i); - taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); - } - - SName* name = taosHashIterate(pTb, NULL); - while (name) { - catalogRemoveTableMeta(pCtg, name); - name = taosHashIterate(pTb, name); - } - - taosHashCleanup(pTb); - } else { - for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { - SName* name = taosArrayGet(pReq->pTableMeta, i); - catalogRemoveTableMeta(pCtg, name); - } - - for (int32_t i = 0; i < pJob->tbHashNum; ++i) { - SName* name = taosArrayGet(pReq->pTableHash, i); - catalogRemoveTableMeta(pCtg, name); - } - } + // REFRESH TABLE META + SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { + SName* name = taosArrayGet(pReq->pTableMeta, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); } + + for (int32_t i = 0; i < pJob->tbHashNum; ++i) { + SName* name = taosArrayGet(pReq->pTableHash, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); + } + + for (int32_t i = 0; i < pJob->tbCfgNum; ++i) { + SName* name = taosArrayGet(pReq->pTableCfg, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); + } + + SName* name = taosHashIterate(pTb, NULL); + while (name) { + catalogRemoveTableMeta(pCtg, name); + name = taosHashIterate(pTb, name); + } + + taosHashCleanup(pTb); + for (int32_t i = 0; i < pJob->tbIndexNum; ++i) { SName* name = taosArrayGet(pReq->pTableIndex, i); @@ -346,6 +384,20 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con return TSDB_CODE_SUCCESS; } +int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *taskId) { + int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1); + + CTG_LOCK(CTG_WRITE, &pJob->taskLock); + CTG_ERR_RET((*gCtgAsyncFps[type].initFp)(pJob, tid, param)); + CTG_UNLOCK(CTG_WRITE, &pJob->taskLock); + + if (taskId) { + *taskId = tid; + } + + return TSDB_CODE_SUCCESS; +} + int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) { int32_t code = 0; int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta); @@ -358,8 +410,9 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser); int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo); int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); + int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); - *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum; + *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; if (*taskNum <= 0) { ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId); return TSDB_CODE_SUCCESS; @@ -389,6 +442,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 pJob->userNum = userNum; pJob->dbInfoNum = dbInfoNum; pJob->tbIndexNum = tbIndexNum; + pJob->tbCfgNum = tbCfgNum; pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask)); @@ -401,54 +455,58 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq)); } - int32_t taskIdx = 0; for (int32_t i = 0; i < dbVgNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbVgroup, i); - CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_VGROUP, dbFName, NULL)); } for (int32_t i = 0; i < dbCfgNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbCfg, i); - CTG_ERR_JRET(ctgInitGetDbCfgTask(pJob, taskIdx++, dbFName)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_CFG, dbFName, NULL)); } for (int32_t i = 0; i < dbInfoNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbInfo, i); - CTG_ERR_JRET(ctgInitGetDbInfoTask(pJob, taskIdx++, dbFName)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_INFO, dbFName, NULL)); } for (int32_t i = 0; i < tbMetaNum; ++i) { SName* name = taosArrayGet(pReq->pTableMeta, i); - CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META, name, NULL)); } for (int32_t i = 0; i < tbHashNum; ++i) { SName* name = taosArrayGet(pReq->pTableHash, i); - CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH, name, NULL)); } for (int32_t i = 0; i < tbIndexNum; ++i) { SName* name = taosArrayGet(pReq->pTableIndex, i); - CTG_ERR_JRET(ctgInitGetTbIndexTask(pJob, taskIdx++, name)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_INDEX, name, NULL)); + } + + for (int32_t i = 0; i < tbCfgNum; ++i) { + SName* name = taosArrayGet(pReq->pTableCfg, i); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_CFG, name, NULL)); } for (int32_t i = 0; i < indexNum; ++i) { char* indexName = taosArrayGet(pReq->pIndex, i); - CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_INDEX, indexName, NULL)); } for (int32_t i = 0; i < udfNum; ++i) { char* udfName = taosArrayGet(pReq->pUdf, i); - CTG_ERR_JRET(ctgInitGetUdfTask(pJob, taskIdx++, udfName)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_UDF, udfName, NULL)); } for (int32_t i = 0; i < userNum; ++i) { SUserAuthInfo* user = taosArrayGet(pReq->pUser, i); - CTG_ERR_JRET(ctgInitGetUserTask(pJob, taskIdx++, user)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_USER, user, NULL)); } if (qnodeNum) { - CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL)); } pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); @@ -528,6 +586,21 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgDumpTbCfgRes(SCtgTask* pTask) { + SCtgJob* pJob = pTask->pJob; + if (NULL == pJob->jobRes.pTableCfg) { + pJob->jobRes.pTableCfg = taosArrayInit(pJob->tbCfgNum, sizeof(SMetaRes)); + if (NULL == pJob->jobRes.pTableCfg) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + SMetaRes res = {.code = pTask->code, .pRes = pTask->res}; + taosArrayPush(pJob->jobRes.pTableCfg, &res); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgDumpIndexRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pIndex) { @@ -618,13 +691,48 @@ int32_t ctgDumpUserRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgInvokeSubCb(SCtgTask *pTask) { + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &pTask->lock); + + int32_t parentNum = taosArrayGetSize(pTask->pParents); + for (int32_t i = 0; i < parentNum; ++i) { + SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); + + pParent->subRes.code = pTask->code; + if (TSDB_CODE_SUCCESS == pTask->code) { + code = (*gCtgAsyncFps[pTask->type].cloneFp)(pTask, &pParent->subRes.res); + if (code) { + pParent->subRes.code = code; + } + } + + CTG_ERR_JRET(pParent->subRes.fp(pParent)); + } + +_return: + + CTG_UNLOCK(CTG_WRITE, &pTask->lock); + + CTG_RET(code); +} + + int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { SCtgJob* pJob = pTask->pJob; int32_t code = 0; + if (CTG_TASK_DONE == pTask->status) { + return TSDB_CODE_SUCCESS; + } + qDebug("QID:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode)); pTask->code = rspCode; + pTask->status = CTG_TASK_DONE; + + ctgInvokeSubCb(pTask); int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); if (taskDone < taosArrayGetSize(pJob->pTasks)) { @@ -802,11 +910,12 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM switch (reqType) { case TDMT_MND_USE_DB: { SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out; - - CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res)); + SDBVgInfo* pDb = NULL; - CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); - pOut->dbVgroup = NULL; + CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res)); + + CTG_ERR_JRET(cloneDbVgInfo(pOut->dbVgroup, &pDb)); + CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pDb, false)); break; } @@ -874,6 +983,7 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false)); _return: + if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) { code = TSDB_CODE_SUCCESS; } @@ -882,6 +992,18 @@ _return: CTG_RET(code); } +int32_t ctgHandleGetTbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); + + TSWAP(pTask->res, pTask->msgCtx.out); + +_return: + + ctgHandleTaskEnd(pTask, code); + + CTG_RET(code); +} int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; @@ -1138,6 +1260,48 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; + SArray* pRes = NULL; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pCtx->pName, dbFName); + + if (pCtx->tbType <= 0) { + CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType)); + if (pCtx->tbType <= 0) { + CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, pCtx->pName)); + return TSDB_CODE_SUCCESS; + } + } + + if (TSDB_SUPER_TABLE == pCtx->tbType) { + CTG_ERR_JRET(ctgGetTableCfgFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask)); + } else { + if (NULL == pCtx->pVgInfo) { + CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo)); + if (NULL == pCtx->pVgInfo) { + CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName)); + return TSDB_CODE_SUCCESS; + } + } + + CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask)); + } + + return TSDB_CODE_SUCCESS; + +_return: + + if (CTG_TASK_LAUNCHED == pTask->status) { + ctgHandleTaskEnd(pTask, code); + } + + CTG_RET(code); +} + int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; @@ -1244,17 +1408,70 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgGetTbCfgCb(SCtgTask *pTask) { + int32_t code = 0; + + CTG_ERR_JRET(pTask->subRes.code); + + SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; + if (CTG_TASK_GET_TB_META == pTask->subRes.type) { + pCtx->tbType = ((STableMeta*)pTask->subRes.res)->tableType; + } else if (CTG_TASK_GET_DB_VGROUP == pTask->subRes.type) { + SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res; + + pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo)); + } + + CTG_RET(ctgLaunchGetTbCfgTask(pTask)); + +_return: + + CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code)); +} + +int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) { + SCtgDbVgCtx* ctx = pTask->taskCtx; + + *equal = (0 == strcmp(ctx->dbFName, param)); + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) { + SCtgTbMetaCtx* ctx = pTask->taskCtx; + + *equal = tNameTbNameEqual(ctx->pName, (SName*)param); + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgCloneTbMeta(SCtgTask* pTask, void** pRes) { + STableMeta* pMeta = (STableMeta*)pTask->res; + + CTG_RET(cloneTableMeta(pMeta, (STableMeta**)pRes)); +} + +int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) { + SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out; + + CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes)); +} + + SCtgAsyncFps gCtgAsyncFps[] = { - {ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes}, - {ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes}, - {ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes}, - {ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes}, - {ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes}, - {ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes}, - {ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes}, - {ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes}, - {ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes}, - {ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes}, + {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, + {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, + {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, + {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, + {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta}, + {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, + {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, + {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, + {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, + {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, + {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob *pJob) { @@ -1269,6 +1486,86 @@ int32_t ctgMakeAsyncRes(SCtgJob *pJob) { return TSDB_CODE_SUCCESS; } +int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) { + bool equal = false; + SCtgTask* pTask = NULL; + int32_t code = 0; + + CTG_LOCK(CTG_READ, &pJob->taskLock); + + int32_t taskNum = taosArrayGetSize(pJob->pTasks); + for (int32_t i = 0; i < taskNum; ++i) { + pTask = taosArrayGet(pJob->pTasks, i); + if (type != pTask->type) { + continue; + } + + CTG_ERR_JRET((*gCtgAsyncFps[type].compFp)(pTask, param, &equal)); + if (equal) { + break; + } + } + +_return: + + CTG_UNLOCK(CTG_READ, &pJob->taskLock); + if (equal) { + *taskId = pTask->taskId; + } + + CTG_RET(code); +} + +int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &pSub->lock); + if (CTG_TASK_DONE == pSub->status) { + pTask->subRes.code = pSub->code; + CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); + CTG_ERR_JRET(pTask->subRes.fp(pTask)); + } else { + if (NULL == pSub->pParents) { + pSub->pParents = taosArrayInit(4, POINTER_BYTES); + } + + taosArrayPush(pSub->pParents, &pTask); + } + +_return: + + CTG_UNLOCK(CTG_WRITE, &pSub->lock); + + CTG_RET(code); +} + + +int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) { + SCtgJob* pJob = pTask->pJob; + int32_t subTaskId = -1; + bool newTask = false; + + ctgClearSubTaskRes(&pTask->subRes); + pTask->subRes.type = type; + pTask->subRes.fp = fp; + + CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId)); + if (subTaskId < 0) { + CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId)); + newTask = true; + } + + SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId); + + CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); + + if (newTask) { + CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); + pSub->status = CTG_TASK_LAUNCHED; + } + + return TSDB_CODE_SUCCESS; +} int32_t ctgLaunchJob(SCtgJob *pJob) { int32_t taskNum = taosArrayGetSize(pJob->pTasks); @@ -1278,6 +1575,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { qDebug("QID:0x%" PRIx64 " ctg start to launch task %d", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); + pTask->status = CTG_TASK_LAUNCHED; } return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 4fa1c1a197..eeb627624b 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2061,4 +2061,92 @@ int32_t ctgStartUpdateThread() { } +int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { + if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) { + CTG_FLAG_SET_SYS_DB(ctx->flag); + } + + CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta)); + + if (*pTableMeta) { + if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) && + ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) { + return TSDB_CODE_SUCCESS; + } + + taosMemoryFreeClear(*pTableMeta); + } + + if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) { + CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { + int32_t code = 0; + STableMeta* tblMeta = NULL; + SCtgTbMetaCtx tbCtx = {0}; + tbCtx.flag = CTG_FLAG_UNKNOWN_STB; + tbCtx.pName = pTableName; + + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta)); + + if (NULL == tblMeta) { + ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname); + return TSDB_CODE_SUCCESS; + } + + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + + if (TSDB_SUPER_TABLE == tblMeta->tableType) { + CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq)); + } else { + CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq)); + } + +_return: + + taosMemoryFreeClear(tblMeta); + + CTG_RET(code); +} + +int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) { + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { + ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + SCtgDBCache* dbCache = NULL; + int32_t code = 0; + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, dbFName); + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); + + if (NULL == dbCache) { + *pVgroup = NULL; + return TSDB_CODE_SUCCESS; + } + + *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup)); + +_return: + + if (dbCache) { + ctgReleaseVgInfoToCache(pCtg, dbCache); + } + + if (code) { + taosMemoryFreeClear(*pVgroup); + } + + CTG_RET(code); +} + diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 476eb371b0..4012f6d158 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -33,6 +33,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { return "[get table meta]"; case CTG_TASK_GET_TB_HASH: return "[get table hash]"; + case CTG_TASK_GET_TB_INDEX: + return "[get table index]"; + case CTG_TASK_GET_TB_CFG: + return "[get table cfg]"; case CTG_TASK_GET_INDEX: return "[get index]"; case CTG_TASK_GET_UDF: @@ -96,6 +100,9 @@ void ctgFreeSMetaData(SMetaData* pData) { taosArrayDestroy(pData->pQnodeList); pData->pQnodeList = NULL; + + taosArrayDestroy(pData->pTableCfg); + pData->pTableCfg = NULL; } void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) { @@ -280,6 +287,13 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { } break; } + case TDMT_VND_TABLE_CFG: + case TDMT_MND_TABLE_CFG: { + STableCfgRsp* pOut = (STableCfgRsp*)pCtx->out; + tFreeSTableCfgRsp(pOut); + taosMemoryFreeClear(pCtx->out); + break; + } case TDMT_MND_RETRIEVE_FUNC: { SFuncInfo* pOut = (SFuncInfo*)pCtx->out; taosMemoryFree(pOut->pCode); @@ -328,14 +342,135 @@ void ctgResetTbMetaTask(SCtgTask* pTask) { taosMemoryFreeClear(pTask->res); } -void ctgFreeTask(SCtgTask* pTask) { - ctgFreeMsgCtx(&pTask->msgCtx); - +void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { + switch (type) { + case CTG_TASK_GET_QNODE: { + taosArrayDestroy((SArray*)*pRes); + *pRes = NULL; + break; + } + case CTG_TASK_GET_TB_META: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_DB_VGROUP: { + taosArrayDestroy((SArray*)*pRes); + *pRes = NULL; + break; + } + case CTG_TASK_GET_DB_CFG: { + if (*pRes) { + SDbCfgInfo* pInfo = (SDbCfgInfo*)*pRes; + taosArrayDestroy(pInfo->pRetensions); + taosMemoryFreeClear(*pRes); + } + break; + } + case CTG_TASK_GET_DB_INFO: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_TB_HASH: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_TB_INDEX: { + taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo); + *pRes = NULL; + break; + } + case CTG_TASK_GET_INDEX: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_UDF: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_USER: { + taosMemoryFreeClear(*pRes); + break; + } + default: + qError("invalid task type %d", type); + break; + } +} + + +void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { + switch (type) { + case CTG_TASK_GET_QNODE: { + taosArrayDestroy((SArray*)*pRes); + *pRes = NULL; + break; + } + case CTG_TASK_GET_TB_META: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_DB_VGROUP: { + if (*pRes) { + SDBVgInfo* pInfo = (SDBVgInfo*)*pRes; + taosHashCleanup(pInfo->vgHash); + taosMemoryFreeClear(*pRes); + } + break; + } + case CTG_TASK_GET_DB_CFG: { + if (*pRes) { + SDbCfgInfo* pInfo = (SDbCfgInfo*)*pRes; + taosArrayDestroy(pInfo->pRetensions); + taosMemoryFreeClear(*pRes); + } + break; + } + case CTG_TASK_GET_DB_INFO: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_TB_HASH: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_TB_INDEX: { + taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo); + *pRes = NULL; + break; + } + case CTG_TASK_GET_INDEX: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_UDF: { + taosMemoryFreeClear(*pRes); + break; + } + case CTG_TASK_GET_USER: { + taosMemoryFreeClear(*pRes); + break; + } + default: + qError("invalid task type %d", type); + break; + } +} + + +void ctgClearSubTaskRes(SCtgSubRes *pRes) { + pRes->code = 0; + + if (NULL == pRes->res) { + return; + } + + ctgFreeSubTaskRes(pRes->type, &pRes->res); +} + +void ctgFreeTaskCtx(SCtgTask* pTask) { switch (pTask->type) { case CTG_TASK_GET_QNODE: { - taosArrayDestroy((SArray*)pTask->res); taosMemoryFreeClear(pTask->taskCtx); - pTask->res = NULL; break; } case CTG_TASK_GET_TB_META: { @@ -346,56 +481,49 @@ void ctgFreeTask(SCtgTask* pTask) { pTask->msgCtx.lastOut = NULL; } taosMemoryFreeClear(pTask->taskCtx); - taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_DB_VGROUP: { - taosArrayDestroy((SArray*)pTask->res); taosMemoryFreeClear(pTask->taskCtx); - pTask->res = NULL; break; } case CTG_TASK_GET_DB_CFG: { taosMemoryFreeClear(pTask->taskCtx); - if (pTask->res) { - SDbCfgInfo* pInfo = (SDbCfgInfo*)pTask->res; - taosArrayDestroy(pInfo->pRetensions); - taosMemoryFreeClear(pTask->res); - } break; } case CTG_TASK_GET_DB_INFO: { taosMemoryFreeClear(pTask->taskCtx); - taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_TB_HASH: { SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); taosMemoryFreeClear(pTask->taskCtx); - taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_TB_INDEX: { SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); taosMemoryFreeClear(pTask->taskCtx); - taosArrayDestroyEx(pTask->res, tFreeSTableIndexInfo); + break; + } + case CTG_TASK_GET_TB_CFG: { + SCtgTbCfgCtx* taskCtx = (SCtgTbCfgCtx*)pTask->taskCtx; + taosMemoryFreeClear(taskCtx->pName); + taosMemoryFreeClear(taskCtx->pVgInfo); + taosMemoryFreeClear(pTask->taskCtx); break; } case CTG_TASK_GET_INDEX: { taosMemoryFreeClear(pTask->taskCtx); - taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_UDF: { taosMemoryFreeClear(pTask->taskCtx); - taosMemoryFreeClear(pTask->res); break; } case CTG_TASK_GET_USER: { taosMemoryFreeClear(pTask->taskCtx); - taosMemoryFreeClear(pTask->res); break; } default: @@ -404,6 +532,16 @@ void ctgFreeTask(SCtgTask* pTask) { } } + +void ctgFreeTask(SCtgTask* pTask) { + ctgFreeMsgCtx(&pTask->msgCtx); + ctgFreeTaskRes(pTask->type, &pTask->res); + ctgFreeTaskCtx(pTask); + + taosArrayDestroy(pTask->pParents); + ctgClearSubTaskRes(&pTask->subRes); +} + void ctgFreeTasks(SArray* pArray) { if (NULL == pArray) { return; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index c8b78fcc8b..bea91af38a 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1081,15 +1081,6 @@ end: return code; } -static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { - *pDst = taosMemoryMalloc(TABLE_META_SIZE(pSrc)); - if (NULL == *pDst) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - memcpy(*pDst, pSrc, TABLE_META_SIZE(pSrc)); - return TSDB_CODE_SUCCESS; -} - static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName, int32_t len, STableMeta* pMeta) { SVgroupInfo vg; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index d102c9275a..c807c1c4cd 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -373,3 +373,57 @@ end: } +int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { + if (NULL == pSrc) { + *pDst = NULL; + return TSDB_CODE_SUCCESS; + } + + int32_t metaSize = (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); + *pDst = taosMemoryMalloc(metaSize); + if (NULL == *pDst) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + memcpy(*pDst, pSrc, metaSize); + return TSDB_CODE_SUCCESS; +} + +int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { + if (NULL == pSrc) { + *pDst = NULL; + return TSDB_CODE_SUCCESS; + } + + *pDst = taosMemoryMalloc(sizeof(*pSrc)); + if (NULL == *pDst) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + memcpy(*pDst, pSrc, sizeof(*pSrc)); + if (pSrc->vgHash) { + (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == (*pDst)->vgHash) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SVgroupInfo* vgInfo = NULL; + void *pIter = taosHashIterate(pSrc->vgHash, NULL); + while (pIter) { + vgInfo = pIter; + int32_t* vgId = taosHashGetKey(pIter, NULL); + + if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) { + qError("taosHashPut failed, vgId:%d", vgInfo->vgId); + taosHashCancelIterate(pSrc->vgHash, pIter); + taosHashCleanup((*pDst)->vgHash); + taosMemoryFreeClear(*pDst); + return TSDB_CODE_CTG_MEM_ERROR; + } + + pIter = taosHashIterate(pSrc->vgHash, pIter); + } + } + + return TSDB_CODE_SUCCESS; +} + +