Merge pull request #15715 from taosdata/fix/valgrindBug

fix; valgrind error
This commit is contained in:
Yihao Deng 2022-08-04 09:26:26 +08:00 committed by GitHub
commit 1b619f9e86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 246 additions and 232 deletions

View File

@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob); &pRequest->body.queryJob);
pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return; return;
} }
@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
_error: _error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId); pRequest->requestId);
taosMemoryFree(pCxt);
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);

View File

@ -13,14 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
#include "query.h"
#include "systable.h" #include "systable.h"
#include "tname.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SArray* pTaskId = cbParam->taskId; SArray* pTaskId = cbParam->taskId;
SCatalog* pCtg = pJob->pCtg; SCatalog* pCtg = pJob->pCtg;
@ -30,7 +30,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
ASSERT(taskNum == msgNum || 0 == msgNum); ASSERT(taskNum == msgNum || 0 == msgNum);
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
TMSG_INFO(cbParam->reqType + 1));
offset += sizeof(msgNum); offset += sizeof(msgNum);
SBatchRsp rsp = {0}; SBatchRsp rsp = {0};
@ -42,7 +43,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
int32_t* taskId = taosArrayGet(pTaskId, i); int32_t* taskId = taosArrayGet(pTaskId, i);
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (msgNum > 0) { if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.reqType); offset += sizeof(rsp.reqType);
@ -65,7 +66,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
pTask->pBatchs = pBatchs; pTask->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
TMSG_INFO(taskMsg.msgType + 1));
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
@ -78,7 +80,6 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) { int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
int32_t code = 0; int32_t code = 0;
@ -303,8 +304,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0; int32_t code = 0;
SCtgJob* pJob = NULL; SCtgJob* pJob = NULL;
@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
} else { } else {
int32_t *taskId = taosArrayGet(cbParam->taskId, 0); int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
TMSG_INFO(cbParam->reqType + 1));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pBatchs =
taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pBatchs) { if (NULL == pBatchs) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
@ -354,16 +356,16 @@ _return:
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType,
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { SMsgSendInfo** pMsgSendInfo) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) { if (NULL == msgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
if (NULL == param) { if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam)); qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -391,10 +393,10 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId, int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) { char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo *pMsgSendInfo = NULL; SMsgSendInfo* pMsgSendInfo = NULL;
CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo));
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
@ -426,7 +428,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg,
uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SHashObj* pBatchs = pTask->pBatchs; SHashObj* pBatchs = pTask->pBatchs;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId); ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
} }
} }
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -517,14 +522,14 @@ _return:
} }
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
*msg = taosMemoryMalloc(pBatch->msgSize); *msg = taosMemoryCalloc(1, pBatch->msgSize);
if (NULL == (*msg)) { if (NULL == (*msg)) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
int32_t offset = 0; int32_t offset = 0;
int32_t num = taosArrayGetSize(pBatch->pMsgs); int32_t num = taosArrayGetSize(pBatch->pMsgs);
SBatchReq *pBatchReq = (SBatchReq*)(*msg); SBatchReq* pBatchReq = (SBatchReq*)(*msg);
pBatchReq->header.vgId = htonl(vgId); pBatchReq->header.vgId = htonl(vgId);
pBatchReq->msgNum = htonl(num); pBatchReq->msgNum = htonl(num);
@ -547,7 +552,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
int32_t code = 0; int32_t code = 0;
void* msg = NULL; void* msg = NULL;
void* p = taosHashIterate(pBatchs, NULL); void* p = taosHashIterate(pBatchs, NULL);
@ -559,8 +564,8 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId,
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); pBatch->msgType, msg, pBatch->msgSize);
pBatch->pTaskIds = NULL; pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code); CTG_ERR_JRET(code);
@ -579,12 +584,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { char* msg = NULL;
char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST; int32_t reqType = TDMT_MND_QNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) { int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST; int32_t reqType = TDMT_MND_DNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) { SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB; int32_t reqType = TDMT_MND_USE_DB;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
@ -727,15 +731,16 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) { int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG; int32_t reqType = TDMT_MND_GET_DB_CFG;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) { int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX; int32_t reqType = TDMT_MND_GET_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName); ctgDebug("try to get index from mnode, indexName:%s", indexName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName); ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
@ -827,17 +833,18 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) { int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX; int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName); tNameExtractFullName(name, tbFName);
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
@ -880,15 +887,16 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC; int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName); ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName); ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
CTG_ERR_RET(code); CTG_ERR_RET(code);
@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) { int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
char *msg = NULL; SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH; int32_t reqType = TDMT_MND_GET_USER_AUTH;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user); ctgDebug("try to get user auth from mnode, user:%s", user);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
ctgError("Build get user auth msg failed, code:%x, db:%s", code, user); ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
CTG_ERR_RET(code); CTG_ERR_RET(code);
@ -980,16 +989,16 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) { STableMetaOutput* out, SCtgTask* pTask) {
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char *msg = NULL; char* msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet* pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_TABLE_META; int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName); sprintf(tbFName, "%s.%s", dbFName, tbName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) { int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask); return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask);
} }
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) { int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META; int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)}; SBuildTableInput bInput = {
char *msg = NULL; .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
@ -1107,20 +1119,21 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) { int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
char *msg = NULL; SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_VND_TABLE_CFG; int32_t reqType = TDMT_VND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName); tNameExtractFullName(pTableName, tbFName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
if (code) { if (code) {
@ -1167,14 +1180,14 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) { SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_TABLE_CFG; int32_t reqType = TDMT_MND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName); tNameExtractFullName(pTableName, tbFName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
@ -1218,11 +1231,11 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) { int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
char *msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION; int32_t reqType = TDMT_MND_SERVER_VERSION;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
qDebug("try to get svr ver from mnode"); qDebug("try to get svr ver from mnode");
@ -1263,5 +1276,3 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }