From f9d6551de6025380739fe1d986dabd3387e0e918 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 15 Sep 2022 20:51:14 +0800 Subject: [PATCH 1/3] fix(tsc): avoid mem leak --- source/client/src/clientMsgHandler.c | 14 +- source/client/src/clientTmq.c | 4 + source/libs/catalog/src/ctgAsync.c | 621 +++++++++++++------------- source/libs/catalog/src/ctgRemote.c | 86 ++-- source/libs/scheduler/src/schRemote.c | 3 +- 5 files changed, 371 insertions(+), 357 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index a7a16d484c..ebccc5ea1a 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -34,6 +34,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { removeMeta(pRequest->pTscObj, pRequest->targetTableList); } + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -62,6 +63,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if (delta > timestampDeltaLimit) { code = TSDB_CODE_TIME_UNSYNCED; tscError("time diff:%ds is too big", delta); + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); @@ -114,6 +116,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->numOfConns); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); tsem_post(&pRequest->body.rspSem); return 0; } @@ -137,6 +140,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) { // todo rsp with the vnode id list SRequestObj* pRequest = param; taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); } @@ -173,6 +177,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); setErrno(pRequest, code); if (pRequest->body.queryFp != NULL) { @@ -220,6 +225,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { setConnectionDB(pRequest->pTscObj, db); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code); @@ -237,7 +243,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { setErrno(pRequest, code); } else { SMCreateStbRsp createRsp = {0}; - SDecoder coder = {0}; + SDecoder coder = {0}; tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecodeSMCreateStbRsp(&coder, &createRsp); tDecoderClear(&coder); @@ -246,6 +252,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { pRequest->body.resInfo.execRes.res = createRsp.pMeta; } + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); if (pRequest->body.queryFp != NULL) { @@ -262,7 +269,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { code = ret; } } - + pRequest->body.queryFp(pRequest->body.param, pRequest, code); } else { tsem_post(&pRequest->body.rspSem); @@ -284,6 +291,7 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { } taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -309,6 +317,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { } taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); if (pRequest->body.queryFp != NULL) { SExecResult* pRes = &pRequest->body.resInfo.execRes; @@ -420,6 +429,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { } taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9f9a14952e..f6874c6101 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1077,6 +1077,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; return -1; } @@ -1115,6 +1116,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmqEpoch); tsem_post(&tmq->rspSem); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); return 0; } @@ -1128,6 +1130,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); goto CREATE_MSG_FAIL; } @@ -1164,6 +1167,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 585b33930c..3c4cc9f7a2 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -13,15 +13,15 @@ * along with this program. If not, see . */ -#include "trpc.h" -#include "query.h" -#include "tname.h" #include "catalogInt.h" +#include "query.h" #include "systable.h" +#include "tname.h" #include "tref.h" +#include "trpc.h" -int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SName *name = (SName*)param; +int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_META; @@ -45,13 +45,14 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SName *name = (SName*)param; +int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_META_BATCH; @@ -69,14 +70,14 @@ int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", - pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - char *dbFName = (char*)param; +int32_t ctgInitGetDbVgTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + char* dbFName = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_DB_VGROUP; @@ -94,13 +95,14 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - char *dbFName = (char*)param; +int32_t ctgInitGetDbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + char* dbFName = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_DB_CFG; @@ -118,13 +120,14 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - char *dbFName = (char*)param; +int32_t ctgInitGetDbInfoTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + char* dbFName = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_DB_INFO; @@ -142,14 +145,14 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } - -int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SName *name = (SName*)param; +int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_HASH; @@ -173,13 +176,14 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SName *name = (SName*)param; +int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_HASH_BATCH; @@ -197,14 +201,13 @@ int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", - pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); return TSDB_CODE_SUCCESS; } - -int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { +int32_t ctgInitGetQnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgTask task = {0}; task.type = CTG_TASK_GET_QNODE; @@ -219,7 +222,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { +int32_t ctgInitGetDnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgTask task = {0}; task.type = CTG_TASK_GET_DNODE; @@ -234,8 +237,8 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - char *name = (char*)param; +int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + char* name = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_INDEX; @@ -253,13 +256,14 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - char *name = (char*)param; +int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + char* name = (char*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_UDF; @@ -277,14 +281,15 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SUserAuthInfo *user = (SUserAuthInfo*)param; - SCtgTask task = {0}; +int32_t ctgInitGetUserTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SUserAuthInfo* user = (SUserAuthInfo*)param; + SCtgTask task = {0}; task.type = CTG_TASK_GET_USER; task.taskId = taskIdx; @@ -301,12 +306,13 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), user->user); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { +int32_t ctgInitGetSvrVerTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgTask task = {0}; task.type = CTG_TASK_GET_SVR_VER; @@ -320,8 +326,8 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SName *name = (SName*)param; +int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_INDEX; @@ -344,13 +350,14 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } -int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { - SName *name = (SName*)param; +int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_CFG; @@ -373,13 +380,13 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } - -int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) { +int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) { SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == pDb || NULL == pTb) { @@ -427,7 +434,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con for (int32_t i = 0; i < pJob->tbCfgNum; ++i) { SName* name = taosArrayGet(pReq->pTableCfg, i); - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(name, dbFName); taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); } @@ -455,7 +462,6 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con taosHashCleanup(pTb); - for (int32_t i = 0; i < pJob->tbIndexNum; ++i) { SName* name = taosArrayGet(pReq->pTableIndex, i); ctgDropTbIndexEnqueue(pCtg, name, true); @@ -464,7 +470,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con return TSDB_CODE_SUCCESS; } -int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *taskId) { +int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) { int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1); CTG_LOCK(CTG_WRITE, &pJob->taskLock); @@ -478,7 +484,8 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas return TSDB_CODE_SUCCESS; } -int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) { +int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, + void* param) { int32_t code = 0; int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); @@ -494,7 +501,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); - int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; + int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + + userNum + dbInfoNum + tbIndexNum + tbCfgNum; *job = taosMemoryCalloc(1, sizeof(SCtgJob)); if (NULL == *job) { @@ -502,13 +510,13 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgJob *pJob = *job; + SCtgJob* pJob = *job; pJob->subTaskNum = taskNum; pJob->queryId = pConn->requestId; pJob->userFp = fp; - pJob->pCtg = pCtg; - pJob->conn = *pConn; + pJob->pCtg = pCtg; + pJob->conn = *pConn; pJob->userParam = param; pJob->tbMetaNum = tbMetaNum; @@ -526,7 +534,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const pJob->svrVerNum = svrVerNum; #if CTG_BATCH_FETCH - pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + pJob->pBatchs = + taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == pJob->pBatchs) { ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -625,10 +634,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); - qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate); + qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, + taskNum, pReq->forceUpdate); return TSDB_CODE_SUCCESS; - _return: ctgFreeJob(*job); @@ -658,7 +667,6 @@ int32_t ctgDumpTbMetasRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } - int32_t ctgDumpDbVgRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pDbVgroup) { @@ -772,7 +780,6 @@ int32_t ctgDumpDnodeRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } - int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { SCtgJob* pJob = pTask->pJob; if (NULL == pJob->jobRes.pDbCfg) { @@ -848,15 +855,15 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgCallSubCb(SCtgTask *pTask) { +int32_t ctgCallSubCb(SCtgTask* pTask) { int32_t code = 0; CTG_LOCK(CTG_WRITE, &pTask->lock); int32_t parentNum = taosArrayGetSize(pTask->pParents); for (int32_t i = 0; i < parentNum; ++i) { - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); - SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); pParent->subRes.code = pTask->code; if (TSDB_CODE_SUCCESS == pTask->code) { @@ -866,7 +873,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { } } - SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1); + SCtgMsgCtx* pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1); pParMsgCtx->pBatchs = pMsgCtx->pBatchs; CTG_ERR_JRET(pParent->subRes.fp(pParent)); @@ -895,7 +902,7 @@ int32_t ctgCallUserCb(void* param) { int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { SCtgJob* pJob = pTask->pJob; - int32_t code = 0; + int32_t code = 0; if (CTG_TASK_DONE == pTask->status) { return TSDB_CODE_SUCCESS; @@ -910,7 +917,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); if (taskDone < taosArrayGetSize(pJob->pTasks)) { - qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, (int32_t)taosArrayGetSize(pJob->pTasks)); + qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, + (int32_t)taosArrayGetSize(pJob->pTasks)); return TSDB_CODE_SUCCESS; } @@ -920,25 +928,25 @@ _return: pJob->jobResCode = code; - //taosSsleep(2); - //qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); + // taosSsleep(2); + // qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); taosAsyncExec(ctgCallUserCb, pJob, NULL); CTG_RET(code); } -int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SCtgDBCache *dbCache = NULL; - SCtgTask* pTask = tReq->pTask; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgDBCache* dbCache = NULL; + SCtgTask* pTask = tReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - SName* pName = ctx->pName; - int32_t flag = ctx->flag; - int32_t* vgId = &ctx->vgId; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + SName* pName = ctx->pName; + int32_t flag = ctx->flag; + int32_t* vgId = &ctx->vgId; CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); @@ -1057,25 +1065,25 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); } -/* - else if (CTG_IS_META_CTABLE(pOut->metaType)) { - SName stbName = *pName; - strcpy(stbName.tname, pOut->tbName); - SCtgTbMetaCtx stbCtx = {0}; - stbCtx.flag = flag; - stbCtx.pName = &stbName; + /* + else if (CTG_IS_META_CTABLE(pOut->metaType)) { + SName stbName = *pName; + strcpy(stbName.tname, pOut->tbName); + SCtgTbMetaCtx stbCtx = {0}; + stbCtx.flag = flag; + stbCtx.pName = &stbName; - CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); - if (NULL == pOut->tbMeta) { - ctgDebug("stb no longer exist, stbName:%s", stbName.tname); - CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask)); + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); + if (NULL == pOut->tbMeta) { + ctgDebug("stb no longer exist, stbName:%s", stbName.tname); + CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask)); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; + } + + memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); } - - memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); - } -*/ + */ TSWAP(pTask->res, pOut->tbMeta); @@ -1092,20 +1100,19 @@ _return: CTG_RET(code); } - -int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SCtgDBCache *dbCache = NULL; - SCtgTask* pTask = tReq->pTask; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgDBCache* dbCache = NULL; + SCtgTask* pTask = tReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); - SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; - SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); - SName* pName = ctgGetFetchName(ctx->pNames, pFetch); - int32_t flag = pFetch->flag; - int32_t* vgId = &pFetch->vgId; - bool taskDone = false; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); + SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; + SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); + SName* pName = ctgGetFetchName(ctx->pNames, pFetch); + int32_t flag = pFetch->flag; + int32_t* vgId = &pFetch->vgId; + bool taskDone = false; CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); @@ -1225,25 +1232,25 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); } -/* - else if (CTG_IS_META_CTABLE(pOut->metaType)) { - SName stbName = *pName; - strcpy(stbName.tname, pOut->tbName); - SCtgTbMetaCtx stbCtx = {0}; - stbCtx.flag = flag; - stbCtx.pName = &stbName; + /* + else if (CTG_IS_META_CTABLE(pOut->metaType)) { + SName stbName = *pName; + strcpy(stbName.tname, pOut->tbName); + SCtgTbMetaCtx stbCtx = {0}; + stbCtx.flag = flag; + stbCtx.pName = &stbName; - CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); - if (NULL == pOut->tbMeta) { - ctgDebug("stb no longer exist, stbName:%s", stbName.tname); - CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask)); + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); + if (NULL == pOut->tbMeta) { + ctgDebug("stb no longer exist, stbName:%s", stbName.tname); + CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask)); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; + } + + memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); } - - memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); - } -*/ + */ SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = 0; @@ -1277,19 +1284,18 @@ _return: CTG_RET(code); } - -int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SCtgTask* pTask = tReq->pTask; +int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; - SCatalog* pCtg = pTask->pJob->pCtg; + SCatalog* pCtg = pTask->pJob->pCtg; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); switch (reqType) { case TDMT_MND_USE_DB: { SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out; - SDBVgInfo* pDb = NULL; + SDBVgInfo* pDb = NULL; CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res)); @@ -1304,7 +1310,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * break; } - _return: ctgHandleTaskEnd(pTask, code); @@ -1312,11 +1317,11 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SCtgTask* pTask = tReq->pTask; +int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx; - SCatalog* pCtg = pTask->pJob->pCtg; + SCatalog* pCtg = pTask->pJob->pCtg; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1342,7 +1347,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf break; } - _return: ctgHandleTaskEnd(pTask, code); @@ -1350,14 +1354,14 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SCtgTask* pTask = tReq->pTask; +int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx; - SCatalog* pCtg = pTask->pJob->pCtg; - SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); - SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); - bool taskDone = false; + SCatalog* pCtg = pTask->pJob->pCtg; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); + SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); + bool taskDone = false; CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); @@ -1388,9 +1392,9 @@ _return: if (code) { STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); - int32_t num = taosArrayGetSize(pReq->pTables); + int32_t num = taosArrayGetSize(pReq->pTables); for (int32_t i = 0; i < num; ++i) { - SMetaRes *pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i); + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i); pRes->code = code; pRes->pRes = NULL; } @@ -1408,14 +1412,13 @@ _return: CTG_RET(code); } - -int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); STableIndex* pOut = (STableIndex*)pTask->msgCtx.out; - SArray* pInfo = NULL; + SArray* pInfo = NULL; CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo)); pTask->res = pInfo; @@ -1432,8 +1435,8 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1446,8 +1449,8 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1460,13 +1463,12 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { +int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { CTG_RET(TSDB_CODE_APP_ERROR); } - -int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1479,8 +1481,8 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1493,8 +1495,8 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1507,8 +1509,8 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1521,12 +1523,12 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SCtgTask* pTask = tReq->pTask; - SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; - SCatalog* pCtg = pTask->pJob->pCtg; - bool pass = false; +int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; + SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; + SCatalog* pCtg = pTask->pJob->pCtg; + bool pass = false; SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out; CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1541,9 +1543,11 @@ int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * goto _return; } - if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs && taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { + if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs && + taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { pass = true; - } else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs && taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { + } else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs && + taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { pass = true; } @@ -1566,8 +1570,8 @@ _return: CTG_RET(code); } -int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; +int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; SCtgTask* pTask = tReq->pTask; CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); @@ -1581,16 +1585,16 @@ _return: CTG_RET(code); } -int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int32_t* vgId) { - SCtgTask* pTask = tReq->pTask; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int32_t* vgId) { + SCtgTask* pTask = tReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - int32_t code = 0; + int32_t code = 0; if (CTG_FLAG_IS_SYS_DB(flag)) { ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName)); - CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, tReq)); + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char*)pName->dbname, (char*)pName->tname, NULL, tReq)); } if (CTG_FLAG_IS_STB(flag)) { @@ -1600,8 +1604,8 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq)); } - SCtgDBCache *dbCache = NULL; - char dbFName[TSDB_DB_FNAME_LEN] = {0}; + SCtgDBCache* dbCache = NULL; + char dbFName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pName, dbFName); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); @@ -1631,11 +1635,11 @@ _return: CTG_RET(code); } -int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetTbMetaTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1647,7 +1651,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { } SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx; - SCtgTaskReq tReq; + SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = -1; CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId)); @@ -1655,11 +1659,11 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; + SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t fetchIdx = 0; @@ -1683,9 +1687,9 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); for (int32_t i = 0; i < pCtx->fetchNum; ++i) { - SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); - SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); + SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1699,14 +1703,14 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { - int32_t code = 0; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgDBCache *dbCache = NULL; - SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgDBCache* dbCache = NULL; + SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1740,14 +1744,14 @@ _return: CTG_RET(code); } -int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { - int32_t code = 0; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetTbHashTask(SCtgTask* pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgDBCache *dbCache = NULL; - SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgDBCache* dbCache = NULL; + SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1785,16 +1789,16 @@ _return: CTG_RET(code); } -int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; - SCtgDBCache *dbCache = NULL; - SCtgJob* pJob = pTask->pJob; - int32_t dbNum = taosArrayGetSize(pCtx->pNames); - int32_t fetchIdx = 0; - int32_t baseResIdx = 0; - int32_t code = 0; + SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; + SCtgDBCache* dbCache = NULL; + SCtgJob* pJob = pTask->pJob; + int32_t dbNum = taosArrayGetSize(pCtx->pNames); + int32_t fetchIdx = 0; + int32_t baseResIdx = 0; + int32_t code = 0; for (int32_t i = 0; i < dbNum; ++i) { STablesReq* pReq = taosArrayGet(pCtx->pNames, i); @@ -1805,7 +1809,8 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = -1; - CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); + CTG_ERR_JRET( + ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); ctgReleaseVgInfoToCache(pCtg, dbCache); dbCache = NULL; @@ -1831,9 +1836,9 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); for (int32_t i = 0; i < pCtx->fetchNum; ++i) { - SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1858,15 +1863,14 @@ _return: return code; } - -int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { - int32_t code = 0; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetTbIndexTask(SCtgTask* pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; - SArray* pRes = NULL; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; + SArray* pRes = NULL; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1883,16 +1887,16 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { - int32_t code = 0; - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; - SArray* pRes = NULL; - char dbFName[TSDB_DB_FNAME_LEN]; + SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; + SArray* pRes = NULL; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pCtx->pName, dbFName); - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1930,12 +1934,11 @@ _return: CTG_RET(code); } - -int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1944,11 +1947,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetDnodeTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1957,13 +1960,12 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } - -int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetDbCfgTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -1973,13 +1975,13 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { - int32_t code = 0; - SCatalog* pCtg = pTask->pJob->pCtg; - SCtgDBCache *dbCache = NULL; +int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) { + int32_t code = 0; + SCatalog* pCtg = pTask->pJob->pCtg; + SCtgDBCache* dbCache = NULL; SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -2013,12 +2015,12 @@ _return: CTG_RET(code); } -int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetIndexTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -2028,12 +2030,12 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetUdfTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -2043,14 +2045,14 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetUserTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; - bool inCache = false; - bool pass = false; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; + bool inCache = false; + bool pass = false; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -2072,11 +2074,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { - SCatalog* pCtg = pTask->pJob->pCtg; +int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; - SCtgJob* pJob = pTask->pJob; - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); if (NULL == pMsgCtx->pBatchs) { pMsgCtx->pBatchs = pJob->pBatchs; } @@ -2086,7 +2088,7 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { +int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) { ctgResetTbMetaTask(pTask); CTG_ERR_RET(ctgLaunchGetTbMetaTask(pTask)); @@ -2094,7 +2096,7 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbCfgCb(SCtgTask *pTask) { +int32_t ctgGetTbCfgCb(SCtgTask* pTask) { int32_t code = 0; CTG_ERR_JRET(pTask->subRes.code); @@ -2124,7 +2126,6 @@ int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) { return TSDB_CODE_SUCCESS; } - int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) { SCtgTbMetaCtx* ctx = pTask->taskCtx; @@ -2145,38 +2146,38 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) { CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes)); } - SCtgAsyncFps gCtgAsyncFps[] = { - {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, - {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL}, - {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, - {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, - {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, - {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta}, - {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, - {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, - {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, - {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, - {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, - {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, - {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, - {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL}, - {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, + {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, + {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL}, + {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, + {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, + {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, + {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, + ctgCloneTbMeta}, + {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, + {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, + {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, + {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, + {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, + {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, + {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, + {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL}, + {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, }; -int32_t ctgMakeAsyncRes(SCtgJob *pJob) { +int32_t ctgMakeAsyncRes(SCtgJob* pJob) { int32_t code = 0; int32_t taskNum = taosArrayGetSize(pJob->pTasks); for (int32_t i = 0; i < taskNum; ++i) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); + SCtgTask* pTask = taosArrayGet(pJob->pTasks, i); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask)); } return TSDB_CODE_SUCCESS; } -int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) { +int32_t ctgSearchExistingTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) { bool equal = false; SCtgTask* pTask = NULL; int32_t code = 0; @@ -2186,7 +2187,7 @@ int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, in int32_t taskNum = taosArrayGetSize(pJob->pTasks); for (int32_t i = 0; i < taskNum; ++i) { pTask = taosArrayGet(pJob->pTasks, i); - if (type != pTask->type) { + if (type != pTask->type) { continue; } @@ -2206,15 +2207,15 @@ _return: CTG_RET(code); } -int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { +int32_t ctgSetSubTaskCb(SCtgTask* pSub, SCtgTask* pTask) { int32_t code = 0; CTG_LOCK(CTG_WRITE, &pSub->lock); if (CTG_TASK_DONE == pSub->status) { pTask->subRes.code = pSub->code; CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); - SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); pMsgCtx->pBatchs = pSubMsgCtx->pBatchs; CTG_ERR_JRET(pTask->subRes.fp(pTask)); @@ -2233,8 +2234,7 @@ _return: CTG_RET(code); } - -int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) { +int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) { SCtgJob* pJob = pTask->pJob; int32_t subTaskId = -1; bool newTask = false; @@ -2254,8 +2254,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); if (newTask) { - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); - SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); pSubMsgCtx->pBatchs = pMsgCtx->pBatchs; CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); @@ -2265,11 +2265,11 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchJob(SCtgJob *pJob) { +int32_t ctgLaunchJob(SCtgJob* pJob) { int32_t taskNum = taosArrayGetSize(pJob->pTasks); for (int32_t i = 0; i < taskNum; ++i) { - SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); + SCtgTask* pTask = taosArrayGet(pJob->pTasks, i); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); @@ -2289,6 +2289,3 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { return TSDB_CODE_SUCCESS; } - - - diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 1fdf84e120..7b8c66e368 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -68,14 +68,15 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu taskMsg.pData = NULL; taskMsg.len = 0; } - + SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = rsp.msgIdx; + tReq.msgIdx = rsp.msgIdx; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx); pMsgCtx->pBatchs = pBatchs; - ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); + ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, + rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } @@ -344,13 +345,14 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); pMsgCtx->pBatchs = pBatchs; #endif SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = -1; + CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode)); #if CTG_BATCH_FETCH @@ -361,6 +363,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { _return: taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); @@ -442,17 +445,17 @@ _return: CTG_RET(code); } -int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType, void* msg, - uint32_t msgSize) { - int32_t code = 0; - SCtgTask* pTask = tReq->pTask; +int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType, + void* msg, uint32_t msgSize) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); - SHashObj* pBatchs = pMsgCtx->pBatchs; - SCtgJob* pJob = pTask->pJob; - SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); - SCtgBatch newBatch = {0}; - SBatchMsg req = {0}; - + SHashObj* pBatchs = pMsgCtx->pBatchs; + SCtgJob* pJob = pTask->pJob; + SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); + SCtgBatch newBatch = {0}; + SBatchMsg req = {0}; + if (NULL == pBatch) { newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg)); newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t)); @@ -487,7 +490,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT } else if (TDMT_VND_TABLE_META == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; - SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); + SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); pName = ctgGetFetchName(ctx->pNames, fetch); } else { SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; @@ -521,14 +524,14 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - msg = NULL; + msg = NULL; if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - + pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; if (vgId > 0) { @@ -539,7 +542,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT } else if (TDMT_VND_TABLE_META == msgType) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; - SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); + SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); pName = ctgGetFetchName(ctx->pNames, fetch); } else { SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; @@ -550,7 +553,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } - tNameGetFullDbName(pName, pBatch->dbFName); + tNameGetFullDbName(pName, pBatch->dbFName); } ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, @@ -583,7 +586,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { for (int32_t i = 0; i < num; ++i) { SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx); - offset += sizeof(pReq->msgIdx); + offset += sizeof(pReq->msgIdx); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); offset += sizeof(pReq->msgType); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen); @@ -611,7 +614,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); - code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, + code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); pBatch->pTaskIds = NULL; CTG_ERR_JRET(code); @@ -656,7 +659,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -705,7 +708,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -736,9 +739,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out, SCtgTaskReq* tReq) { - char* msg = NULL; - int32_t msgLen = 0; - int32_t reqType = TDMT_MND_USE_DB; + char* msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_USE_DB; SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; @@ -813,7 +816,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -868,7 +871,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -919,13 +922,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName)); #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -980,7 +983,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -1035,7 +1038,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -1066,7 +1069,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, STableMetaOutput* out, SCtgTaskReq* tReq) { - SCtgTask *pTask = tReq ? tReq->pTask : NULL; + SCtgTask* pTask = tReq ? tReq->pTask : NULL; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; char* msg = NULL; SEpSet* pVnodeEpSet = NULL; @@ -1091,7 +1094,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* } CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName)); - + #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen)); #else @@ -1131,8 +1134,8 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo, STableMetaOutput* out, SCtgTaskReq* tReq) { - SCtgTask *pTask = tReq ? tReq->pTask : NULL; - char dbFName[TSDB_DB_FNAME_LEN]; + SCtgTask* pTask = tReq ? tReq->pTask : NULL; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); int32_t reqType = TDMT_VND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; @@ -1165,7 +1168,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa .requestObjRefId = pConn->requestObjRefId, .mgmtEps = vgroupInfo->epSet}; - CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName)); + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen)); @@ -1231,7 +1234,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen)); #else SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; @@ -1243,7 +1246,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S } taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg, + msgLen)); #endif } @@ -1289,7 +1293,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); @@ -1338,7 +1342,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou #if CTG_BATCH_FETCH SCtgTaskReq tReq; tReq.pTask = pTask; - tReq.msgIdx = -1; + tReq.msgIdx = -1; CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen)); #else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 275fbb8016..c6dc57d752 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -384,8 +384,7 @@ _return: taosMemoryFreeClear(msg); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); -} - +} int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; From 90fac9b8da70a11ccb6ddcbbe3cf03fc298cb0b7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 15 Sep 2022 21:11:12 +0800 Subject: [PATCH 2/3] fix(tsc): avoid mem leak --- source/client/src/clientMsgHandler.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ebccc5ea1a..cdf977bea3 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -47,6 +47,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); @@ -72,6 +73,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { /*assert(connectRsp.epSet.numOfEps > 0);*/ if (connectRsp.epSet.numOfEps == 0) { + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); setErrno(pRequest, TSDB_CODE_MND_APP_ERROR); tsem_post(&pRequest->body.rspSem); From 3398de8a35ba6409a745a5625a06a14eaa0f99c4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 15 Sep 2022 21:35:54 +0800 Subject: [PATCH 3/3] fix(tsc): avoid mem leak --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/libs/transport/src/transCli.c | 4 +--- source/libs/transport/src/transSvr.c | 4 ++++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b91b82b72e..f57943b9dd 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -270,7 +270,7 @@ int32_t dmInitClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND-C"; - rpcInit.numOfThreads = 1; + rpcInit.numOfThreads = 4; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dcfa93431b..191011111d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1498,9 +1498,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - if (ctx != NULL) { - pCtx->appCtx = *ctx; - } + if (ctx != NULL) pCtx->appCtx = *ctx; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d3277d1cc1..70a47fe079 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1148,6 +1148,7 @@ int transReleaseSrvHandle(void* handle) { tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); + transReleaseExHandle(transGetRefMgt(), refId); return 0; _return1: @@ -1177,8 +1178,10 @@ int transSendResponse(const STransMsg* msg) { STraceId* trace = (STraceId*)&msg->info.traceId; tGTrace("conn %p start to send resp (1/2)", exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); + transReleaseExHandle(transGetRefMgt(), refId); return 0; + _return1: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); @@ -1207,6 +1210,7 @@ int transRegisterMsg(const STransMsg* msg) { STrans* pTransInst = pThrd->pTransInst; tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); + transReleaseExHandle(transGetRefMgt(), refId); return 0;