From 4cf20418cdd0d446517184077521c49ed349f562 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 26 Jul 2022 18:00:50 +0800 Subject: [PATCH 1/4] feat: support batch fetch in catalog --- include/common/tmsg.h | 22 ++ include/common/tmsgdef.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/vnode/src/inc/vnd.h | 4 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 135 +++++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +- source/libs/catalog/inc/catalogInt.h | 18 + source/libs/catalog/inc/ctgRemote.h | 3 +- source/libs/catalog/src/ctgAsync.c | 21 +- source/libs/catalog/src/ctgRemote.c | 363 ++++++++++++++++++-- source/libs/catalog/src/ctgUtil.c | 34 +- source/libs/qcom/src/querymsg.c | 1 + 12 files changed, 559 insertions(+), 52 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0d0eb841bc..9d23557ee7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3045,6 +3045,28 @@ typedef struct SDeleteRes { int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); +typedef struct { + int32_t msgType; + int32_t msgLen; + void* msg; +} SBatchMsg; + +typedef struct { + int32_t reqType; + int32_t msgLen; + int32_t rspCode; + void* msg; +} SBatchRsp; + +static FORCE_INLINE void tFreeSBatchRsp(void *p) { + if (NULL == p) { + return; + } + + SBatchRsp* pRsp = (SBatchRsp*); + taosMemoryFree(pRsp->msg); +} + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 56e0935ce1..2d5a703b62 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -180,6 +180,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_CFG, "vnode-table-cfg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_BATCH_META, "vnode-batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0471e2b850..9ef2350b6b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -336,6 +336,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 984b34814d..44281ea38e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -78,8 +78,8 @@ void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); -int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); -int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg); +int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); +int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 1c3e2f0514..bf846dba13 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -21,7 +21,7 @@ int vnodeQueryOpen(SVnode *pVnode) { void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } -int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { +int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { STableInfoReq infoReq = {0}; STableMetaRsp metaRsp = {0}; SMetaReader mer1 = {0}; @@ -99,7 +99,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - pRsp = rpcMallocCont(rspLen); + if (direct) { + pRsp = rpcMallocCont(rspLen); + } else { + pRsp = taosMemoryCalloc(1, rspLen); + } + if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -117,15 +122,17 @@ _exit: qError("get table %s meta failed cause of %s", infoReq.tbName, tstrerror(code)); } - tmsgSendRsp(&rpcMsg); - + if (direct) { + tmsgSendRsp(&rpcMsg); + } + taosMemoryFree(metaRsp.pSchemas); metaReaderClear(&mer2); metaReaderClear(&mer1); return TSDB_CODE_SUCCESS; } -int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) { +int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { STableCfgReq cfgReq = {0}; STableCfgRsp cfgRsp = {0}; SMetaReader mer1 = {0}; @@ -209,7 +216,12 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) { goto _exit; } - pRsp = rpcMallocCont(rspLen); + if (direct) { + pRsp = rpcMallocCont(rspLen); + } else { + pRsp = taosMemoryCalloc(1, rspLen); + } + if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -227,14 +239,121 @@ _exit: qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code)); } - tmsgSendRsp(&rpcMsg); - + if (direct) { + tmsgSendRsp(&rpcMsg); + } + tFreeSTableCfgRsp(&cfgRsp); metaReaderClear(&mer2); metaReaderClear(&mer1); return TSDB_CODE_SUCCESS; } +int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t offset = 0; + int32_t rspSize = 0; + int32_t msgNum = ntohl(pMsg->pCont); + offset += sizeof(msgNum); + SBatchMsg req = {0}; + SBatchRsp rsp = {0}; + SRpcMsg reqMsg = *pMsg; + SRpcMsg rspMsg = {0}; + void* pRsp = NULL; + + SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); + if (NULL == batchRsp) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < msgNum; ++i) { + req.msgType = ntohl((char*)pMsg->pCont + offset); + offset += req.msgType; + + req.msgLen = ntohl((char*)pMsg->pCont + offset); + offset += req.msgLen; + + req.msg = (char*)pMsg->pCont + offset; + offset += req.msgLen; + + reqMsg.msgType = req.msgType; + reqMsg.pCont = req.msg; + reqMsg.contLen = req.msgLen; + + switch (req.msgType) { + case TDMT_VND_TABLE_META: + vnodeGetTableMeta(pVnode, &reqMsg, false); + break; + case TDMT_VND_TABLE_CFG: + vnodeGetTableCfg(pVnode, &reqMsg, false); + break; + default: + qError("invalid req msgType %d", req.msgType); + reqMsg.code = TSDB_CODE_INVALID_MSG; + reqMsg.pCont = NULL; + reqMsg.contLen = 0; + break; + } + + rsp.reqType = reqMsg.msgType; + rsp.msgLen = reqMsg.contLen; + rsp.rspCode = reqMsg.code; + rsp.msg = reqMsg.pCont; + + taosArrayPush(batchRsp, &rsp); + + rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES; + } + + rspSize += sizeof(int32_t); + offset = 0; + + pRsp = rpcMallocCont(rspSize); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + *(int32_t*)((char*)pRsp + offset) = htonl(msgNum); + offset += sizeof(msgNum); + for (int32_t i = 0; i < msgNum; ++i) { + SBatchRsp *p = taosArrayGet(batchRsp, i); + + *(int32_t*)((char*)pRsp + offset) = htonl(p->reqType); + offset += sizeof(p->reqType); + *(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen); + offset += sizeof(p->msgLen); + *(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode); + offset += sizeof(p->rspCode); + memcpy((char*)pRsp + offset, p->msg, p->msgLen); + offset += p->msgLen; + + taosMemoryFreeClear(p->msg); + } + + taosArrayDestroy(batchRsp); + batchRsp = NULL; + +_exit: + + rspMsg.info = pMsg->info; + rspMsg.pCont = pRsp; + rspMsg.contLen = rspSize; + rspMsg.code = code; + rspMsg.msgType = pMsg->msgType; + + if (code) { + qError("get batch meta failed cause of %s", tstrerror(code)); + } + + taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); + + tmsgSendRsp(&rspMsg); + + return TSDB_CODE_SUCCESS; +} + int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); pLoad->syncState = syncGetMyRole(pVnode->sync); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 9d06fbffdd..830ab6e37c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || - pMsg->msgType == TDMT_VND_TABLE_CFG) && + pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType = TDMT_VND_BATCH_META) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; @@ -318,9 +318,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_SCH_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_TABLE_META: - return vnodeGetTableMeta(pVnode, pMsg); + return vnodeGetTableMeta(pVnode, pMsg, true); case TDMT_VND_TABLE_CFG: - return vnodeGetTableCfg(pVnode, pMsg); + return vnodeGetTableCfg(pVnode, pMsg, true); + case TDMT_VND_BATCH_META: + return vnodeGetBatchMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index bf3bc1f0f4..4bd85792cd 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,6 +31,7 @@ extern "C" { #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_MAX_RETRY_TIMES 3 +#define CTG_DEFAULT_BATCH_NUM 64 #define CTG_RENT_SLOT_SECOND 1.5 @@ -38,6 +39,8 @@ extern "C" { #define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST +#define CTG_BATCH_FETCH 1 + enum { CTG_READ = 1, CTG_WRITE, @@ -200,8 +203,20 @@ typedef struct SCatalog { SCtgRentMgmt stbRent; } SCatalog; +typedef struct SCtgBatch { + int32_t batchId; + int32_t msgType; + int32_t msgSize; + SArray* pMsgs; + SRequestConnInfo *pConn; + char dbFName[TSDB_DB_FNAME_LEN]; + SArray* pTaskIds; +} SCtgBatch; + typedef struct SCtgJob { int64_t refId; + int32_t batchId; + SHashObj* pBatchs; SArray* pTasks; int32_t taskDone; SMetaData jobRes; @@ -258,6 +273,7 @@ typedef struct SCtgTask { SRWLatch lock; SArray* pParents; SCtgSubRes subRes; + SHashObj* pBatchs; } SCtgTask; typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); @@ -626,6 +642,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, int32_t ctgGetTbCfgCb(SCtgTask *pTask); void ctgFreeHandle(SCatalog* pCatalog); +void ctgFreeBatch(SCtgBatch *pBatch); +void ctgFreeBatchs(SHashObj *pBatchs); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput); int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList); diff --git a/source/libs/catalog/inc/ctgRemote.h b/source/libs/catalog/inc/ctgRemote.h index cd88863c1b..72ab43e085 100644 --- a/source/libs/catalog/inc/ctgRemote.h +++ b/source/libs/catalog/inc/ctgRemote.h @@ -23,8 +23,9 @@ extern "C" { typedef struct SCtgTaskCallbackParam { uint64_t queryId; int64_t refId; - uint64_t taskId; + SArray* taskId; int32_t reqType; + int32_t batchId; } SCtgTaskCallbackParam; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 920acbac2e..65c6a0f76b 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -473,8 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const pJob->tbCfgNum = tbCfgNum; pJob->svrVerNum = svrVerNum; - pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); +#if CTG_BATCH_FETCH + pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pJob->pBatchs) { + ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } +#endif + pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); if (NULL == pJob->pTasks) { ctgError("taosArrayInit %d tasks failed", taskNum); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -560,7 +567,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const _return: - taosMemoryFreeClear(*job); + ctgFreeJob(*job); CTG_RET(code); } @@ -872,7 +879,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * SVgroupInfo vgInfo = {0}; CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo)); - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); + ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); ctx->vgId = vgInfo.vgId; CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask)); @@ -890,7 +897,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * return TSDB_CODE_SUCCESS; } - ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName)); + ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName)); ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); @@ -1705,13 +1712,19 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); + pTask->status = CTG_TASK_LAUNCHED; + pTask->pBatchs = pJob->pBatchs; } if (taskNum <= 0) { qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); taosAsyncExec(ctgCallUserCb, pJob, NULL); +#if CTG_BATCH_FETCH + } else { + ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs); +#endif } return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index cc5dde9298..04ffbca629 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -21,6 +21,64 @@ #include "ctgRemote.h" #include "tref.h" +int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + SArray* pTaskId = cbParam->taskId; + int32_t taskNum = taosArrayGetSize(pTaskId); + SDataBuf taskMsg = *pMsg; + int32_t offset = 0; + int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? htonl(pMsg->pData) : 0; + ASSERT(taskNum == msgNum || 0 == msgNum); + + qDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); + + offset += sizeof(msgNum); + SBatchRsp rsp = {0}; + SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pBatchs) { + ctgError("taosHashInit %d batch failed", taskNum); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + for (int32_t i = 0; i < taskNum; ++i) { + int32_t taskId = taosArrayGet(pTaskId, i); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); + if (msgNum > 0) { + rsp.reqType = htonl(((char*)pMsg->pData) + offset); + offset += sizeof(rsp.reqType); + rsp.msgLen = htonl(((char*)pMsg->pData) + offset); + offset += sizeof(rsp.msgLen); + rsp.rspCode = htonl(((char*)pMsg->pData) + offset); + offset += sizeof(rsp.rspCode); + rsp.msg = ((char*)pMsg->pData) + offset; + offset += rsp.msgLen; + + taskMsg.msgType = rsp.reqType; + taskMsg.pData = rsp.msg; + taskMsg.len = rsp.msgLen; + } else { + rsp.reqType = -1; + taskMsg.msgType = -1; + taskMsg.pData = NULL; + taskMsg.len = 0; + } + + pTask->pBatchs = pBatchs; + + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); + + (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); + } + + CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); + +_return: + + ctgFreeBatchs(pBatchs); + CTG_RET(code); +} + + int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) { int32_t code = 0; @@ -233,6 +291,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, break; } default: + if (TSDB_CODE_SUCCESS != rspCode) { + qError("Got error rsp, error:%s", tstrerror(rspCode)); + CTG_ERR_RET(rspCode); + } + qError("invalid req type %s", TMSG_INFO(reqType)); return TSDB_CODE_APP_ERROR; } @@ -254,12 +317,17 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } - SCtgTask *pTask = taosArrayGet(pJob->pTasks, cbParam->taskId); + if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { + CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); + } else { + int32_t taskId = taosArrayGet(cbParam->taskId, 0); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); - qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + + CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); + } - CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); - _return: taosMemoryFree(pMsg->pData); @@ -272,7 +340,7 @@ _return: } -int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { +int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { int32_t code = 0; SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { @@ -287,9 +355,10 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg } param->reqType = msgType; - param->queryId = pTask->pJob->queryId; - param->refId = pTask->pJob->refId; - param->taskId = pTask->taskId; + param->queryId = pJob->queryId; + param->refId = pJob->refId; + param->taskId = pTaskId; + param->batchId = batchId; msgSendInfo->param = param; msgSendInfo->paramFreeFp = taosMemoryFree; @@ -307,12 +376,13 @@ _return: CTG_RET(code); } -int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { +int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId, + int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; SMsgSendInfo *pMsgSendInfo = NULL; - CTG_ERR_JRET(ctgMakeMsgSendInfo(pTask, msgType, &pMsgSendInfo)); + CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); - ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, pTask); + ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); pMsgSendInfo->requestId = pConn->requestId; pMsgSendInfo->requestObjRefId = pConn->requestObjRefId; @@ -328,19 +398,163 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask CTG_ERR_JRET(code); } - ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType)); + ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType)); return TSDB_CODE_SUCCESS; _return: if (pMsgSendInfo) { - taosMemoryFreeClear(pMsgSendInfo->param); - taosMemoryFreeClear(pMsgSendInfo); + destroySendMsgInfo(pMsgSendInfo); } CTG_RET(code); } +int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { + int32_t code = 0; + SHashObj* pBatchs = pTask->pBatchs; + SCtgJob* pJob = pTask->pJob; + SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); + int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); + SCtgBatch newBatch = {0}; + SBatchMsg req = {0}; + + if (NULL == pBatch) { + newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg)); + newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t)); + if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + newBatch.pConn = pConn; + + req.msgType = msgType; + req.msgLen = msgSize; + req.msg = msg; + if (NULL == taosArrayPush(newBatch.pMsgs, &req)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + newBatch.msgSize = sizeof(req) + msgSize - POINTER_BYTES; + + if (vgId > 0) { + if (TDMT_VND_TABLE_CFG == msgType) { + SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else if (TDMT_VND_TABLE_META == msgType) { + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else { + ctgError("invalid vnode msgType %d", msgType); + CTG_ERR_JRET(TSDB_CODE_APP_ERROR); + } + } + + newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META; + newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1); + + if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch->batchId, vgId); + + return TSDB_CODE_SUCCESS; + } + + req.msgType = msgType; + req.msgLen = msgSize; + req.msg = msg; + if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; + + if (vgId > 0) { + if (TDMT_VND_TABLE_CFG == msgType) { + SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else if (TDMT_VND_TABLE_META == msgType) { + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + tNameGetFullDbName(ctx->pName, newBatch.dbFName); + } else { + ctgError("invalid vnode msgType %d", msgType); + CTG_ERR_JRET(TSDB_CODE_APP_ERROR); + } + } + + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); + + return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeBatch(&newBatch); + taosMemoryFree(msg); + + return code; +} + +int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) { + *msg = taosMemoryMalloc(pBatch->msgSize); + if (NULL == (*msg)) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + int32_t offset = 0; + int32_t num = taosArrayGetSize(pBatch->pMsgs); + *(int32_t*)((char*)(*msg) + offset) = htonl(num); + offset += sizeof(num); + + for (int32_t i = 0; i < num; ++i) { + SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); + *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); + offset += sizeof(pReq->msgType); + *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen); + offset += sizeof(pReq->msgLen); + memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen); + offset += pReq->msgLen; + } + + ASSERT(pBatch->msgSize == offset); + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { + int32_t code = 0; + void* msg = NULL; + void* p = taosHashIterate(pBatchs, NULL); + while (NULL != p) { + size_t len = 0; + int32_t* vgId = taosHashGetKey(p, &len); + SCtgBatch* pBatch = (SCtgBatch*)p; + + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, &msg)); + CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, pBatch->pConn, pJob, pBatch->pTaskIds, pBatch->batchId, + pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize)); + + p = taosHashIterate(pBatchs, p); + } + + return TSDB_CODE_SUCCESS; + +_return: + + if (p) { + taosHashCancelIterate(pBatchs, p); + } + taosMemoryFree(msg); + + CTG_RET(code); +} + + int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { char *msg = NULL; int32_t msgLen = 0; @@ -361,7 +575,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -396,7 +617,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -436,8 +664,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -476,8 +710,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName)); + + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -517,7 +757,13 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -560,7 +806,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -600,7 +852,13 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -640,7 +898,13 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -685,7 +949,13 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -744,7 +1014,21 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa .requestId = pConn->requestId, .requestObjRefId = pConn->requestObjRefId, .mgmtEps = vgroupInfo->epSet}; - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen)); + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddVnodeBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); +#else + SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(ctx->pName, dbFName); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -791,7 +1075,20 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S .requestId = pConn->requestId, .requestObjRefId = pConn->requestObjRefId, .mgmtEps = vgroupInfo->epSet}; - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen)); +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); +#else + SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(ctx->pName, dbFName); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -833,7 +1130,13 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { @@ -869,7 +1172,13 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); - CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 1f0f074a0f..8d33e5596f 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -19,6 +19,29 @@ #include "catalogInt.h" #include "systable.h" + +void ctgFreeBatch(SCtgBatch *pBatch) { + if (NULL == pBatch) { + return; + } + + taosArrayDestroy(pBatch->pMsgs); + taosArrayDestroy(pBatch->pTaskIds); +} + +void ctgFreeBatchs(SHashObj *pBatchs) { + void* p = taosHashIterate(pBatchs, NULL); + while (NULL != p) { + SCtgBatch* pBatch = (SCtgBatch*)p; + + ctgFreeBatch(pBatch); + + p = taosHashIterate(pBatchs, p); + } + + taosHashCleanup(pBatchs); +} + char *ctgTaskTypeStr(CTG_TASK_TYPE type) { switch (type) { case CTG_TASK_GET_QNODE: @@ -612,6 +635,7 @@ void ctgFreeJob(void* job) { uint64_t qid = pJob->queryId; ctgFreeTasks(pJob->pTasks); + ctgFreeBatchs(pJob->pBatchs); ctgFreeSMetaData(&pJob->jobRes); @@ -867,14 +891,10 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) { } -int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) { - if (msgType == TDMT_VND_TABLE_META) { - SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(ctx->pName, dbFName); - +int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) { + if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META) { pMsgSendInfo->target.type = TARGET_TYPE_VNODE; - pMsgSendInfo->target.vgId = ctx->vgId; + pMsgSendInfo->target.vgId = vgId; pMsgSendInfo->target.dbFName = strdup(dbFName); } else { pMsgSendInfo->target.type = TARGET_TYPE_MNODE; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index ed8786170d..78ecde6003 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -636,6 +636,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_BATCH_META)] = queryProcessGetBatchMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; } From 602d2197fb599b85c0f9d26c5ef525cbfb8be26c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 27 Jul 2022 08:27:24 +0800 Subject: [PATCH 2/4] feat: support vnode meta batch fetching --- include/common/tmsg.h | 8 +++- include/common/tmsgdef.h | 1 + source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/vnd/vnodeQuery.c | 17 ++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/catalog/inc/catalogInt.h | 5 +- source/libs/catalog/src/ctgAsync.c | 2 +- source/libs/catalog/src/ctgRemote.c | 62 +++++++++++++++++-------- source/libs/qcom/src/querymsg.c | 1 - 9 files changed, 67 insertions(+), 32 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9d23557ee7..0571f8382a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3051,6 +3051,12 @@ typedef struct { void* msg; } SBatchMsg; +typedef struct { + SMsgHead header; + int32_t msgNum; + SBatchMsg msg[]; +} SBatchReq; + typedef struct { int32_t reqType; int32_t msgLen; @@ -3063,7 +3069,7 @@ static FORCE_INLINE void tFreeSBatchRsp(void *p) { return; } - SBatchRsp* pRsp = (SBatchRsp*); + SBatchRsp* pRsp = (SBatchRsp*)p; taosMemoryFree(pRsp->msg); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2d5a703b62..20dc04631e 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -136,6 +136,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 44281ea38e..dd1facb462 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -80,6 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); +int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index bf846dba13..6b36a66776 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -124,6 +124,8 @@ _exit: if (direct) { tmsgSendRsp(&rpcMsg); + } else { + *pMsg = rpcMsg; } taosMemoryFree(metaRsp.pSchemas); @@ -241,6 +243,8 @@ _exit: if (direct) { tmsgSendRsp(&rpcMsg); + } else { + *pMsg = rpcMsg; } tFreeSTableCfgRsp(&cfgRsp); @@ -253,8 +257,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t offset = 0; int32_t rspSize = 0; - int32_t msgNum = ntohl(pMsg->pCont); - offset += sizeof(msgNum); + SBatchReq *batchReq = (SBatchReq*)pMsg->pCont; + int32_t msgNum = ntohl(batchReq->msgNum); + offset += sizeof(SBatchReq); SBatchMsg req = {0}; SBatchRsp rsp = {0}; SRpcMsg reqMsg = *pMsg; @@ -268,11 +273,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { } for (int32_t i = 0; i < msgNum; ++i) { - req.msgType = ntohl((char*)pMsg->pCont + offset); - offset += req.msgType; + req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgType); - req.msgLen = ntohl((char*)pMsg->pCont + offset); - offset += req.msgLen; + req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgLen); req.msg = (char*)pMsg->pCont + offset; offset += req.msgLen; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 830ab6e37c..f560d6a8c2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || - pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType = TDMT_VND_BATCH_META) && + pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 4bd85792cd..0821a91d6b 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -208,7 +208,7 @@ typedef struct SCtgBatch { int32_t msgType; int32_t msgSize; SArray* pMsgs; - SRequestConnInfo *pConn; + SRequestConnInfo conn; char dbFName[TSDB_DB_FNAME_LEN]; SArray* pTaskIds; } SCtgBatch; @@ -634,6 +634,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask); int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask); +int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs); int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param); int32_t ctgLaunchJob(SCtgJob *pJob); @@ -660,7 +661,7 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); char * ctgTaskTypeStr(CTG_TASK_TYPE type); -int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask); +int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId); int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); void ctgFreeSTableIndex(void *info); void ctgClearSubTaskRes(SCtgSubRes *pRes); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 65c6a0f76b..9a666eb4ff 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1709,12 +1709,12 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { for (int32_t i = 0; i < taskNum; ++i) { SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); + pTask->pBatchs = pJob->pBatchs; qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); pTask->status = CTG_TASK_LAUNCHED; - pTask->pBatchs = pJob->pBatchs; } if (taskNum <= 0) { diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 04ffbca629..8b1d125a58 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -24,13 +24,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SArray* pTaskId = cbParam->taskId; + SCatalog* pCtg = pJob->pCtg; int32_t taskNum = taosArrayGetSize(pTaskId); SDataBuf taskMsg = *pMsg; int32_t offset = 0; - int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? htonl(pMsg->pData) : 0; + int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; ASSERT(taskNum == msgNum || 0 == msgNum); - qDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); offset += sizeof(msgNum); SBatchRsp rsp = {0}; @@ -41,14 +42,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu } for (int32_t i = 0; i < taskNum; ++i) { - int32_t taskId = taosArrayGet(pTaskId, i); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); + int32_t* taskId = taosArrayGet(pTaskId, i); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); if (msgNum > 0) { - rsp.reqType = htonl(((char*)pMsg->pData) + offset); + rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.reqType); - rsp.msgLen = htonl(((char*)pMsg->pData) + offset); + rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.msgLen); - rsp.rspCode = htonl(((char*)pMsg->pData) + offset); + rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.rspCode); rsp.msg = ((char*)pMsg->pData) + offset; offset += rsp.msgLen; @@ -65,7 +66,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu pTask->pBatchs = pBatchs; - qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } @@ -317,15 +318,30 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } + SCatalog* pCtg = pJob->pCtg; + if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); } else { - int32_t taskId = taosArrayGet(cbParam->taskId, 0); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); + int32_t *taskId = taosArrayGet(cbParam->taskId, 0); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); - + +#if CTG_BATCH_FETCH + SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pBatchs) { + ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + pTask->pBatchs = pBatchs; +#endif + CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); + +#if CTG_BATCH_FETCH + CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); +#endif } _return: @@ -393,6 +409,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, int64_t transporterId = 0; code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo); + pMsgSendInfo = NULL; if (code) { ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code)); CTG_ERR_JRET(code); @@ -426,7 +443,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - newBatch.pConn = pConn; + newBatch.conn = *pConn; req.msgType = msgType; req.msgLen = msgSize; @@ -437,7 +454,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - newBatch.msgSize = sizeof(req) + msgSize - POINTER_BYTES; + newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES; if (vgId > 0) { if (TDMT_VND_TABLE_CFG == msgType) { @@ -459,7 +476,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch->batchId, vgId); + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId); return TSDB_CODE_SUCCESS; } @@ -500,7 +517,7 @@ _return: return code; } -int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) { +int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { *msg = taosMemoryMalloc(pBatch->msgSize); if (NULL == (*msg)) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -508,8 +525,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) { int32_t offset = 0; int32_t num = taosArrayGetSize(pBatch->pMsgs); - *(int32_t*)((char*)(*msg) + offset) = htonl(num); - offset += sizeof(num); + SBatchReq *pBatchReq = (SBatchReq*)(*msg); + + pBatchReq->header.vgId = htonl(vgId); + pBatchReq->msgNum = htonl(num); + offset += sizeof(SBatchReq); for (int32_t i = 0; i < num; ++i) { SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); @@ -534,9 +554,11 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { size_t len = 0; int32_t* vgId = taosHashGetKey(p, &len); SCtgBatch* pBatch = (SCtgBatch*)p; + + ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); - CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, &msg)); - CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, pBatch->pConn, pJob, pBatch->pTaskIds, pBatch->batchId, + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); + CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize)); p = taosHashIterate(pBatchs, p); @@ -1016,7 +1038,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa .mgmtEps = vgroupInfo->epSet}; #if CTG_BATCH_FETCH - CTG_RET(ctgAddVnodeBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); + CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); #else SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 78ecde6003..ed8786170d 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -636,7 +636,6 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_BATCH_META)] = queryProcessGetBatchMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; } From 648a7a12b005c0a5e5c71c4b19e6da28a481d3c3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 27 Jul 2022 14:02:05 +0800 Subject: [PATCH 3/4] feat: support mnd meta batch fetching --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndQuery.c | 99 +++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeQuery.c | 4 +- source/libs/catalog/inc/catalogInt.h | 11 +++ source/libs/catalog/inc/ctgRemote.h | 7 -- source/libs/catalog/src/ctgAsync.c | 5 +- source/libs/catalog/src/ctgRemote.c | 75 ++++++++++++---- source/libs/catalog/src/ctgUtil.c | 10 +++ 8 files changed, 187 insertions(+), 25 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 22a53f07f6..647af20fcf 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -184,6 +184,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_BATCH_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5a527b994e..e6e9cae100 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -63,6 +63,104 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { return code; } +int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { + int32_t code = 0; + int32_t offset = 0; + int32_t rspSize = 0; + SBatchReq *batchReq = (SBatchReq*)pMsg->pCont; + int32_t msgNum = ntohl(batchReq->msgNum); + offset += sizeof(SBatchReq); + SBatchMsg req = {0}; + SBatchRsp rsp = {0}; + SRpcMsg reqMsg = *pMsg; + SRpcMsg rspMsg = {0}; + void* pRsp = NULL; + SMnode *pMnode = pMsg->info.node; + + SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); + if (NULL == batchRsp) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < msgNum; ++i) { + req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgType); + + req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgLen); + + req.msg = (char*)pMsg->pCont + offset; + offset += req.msgLen; + + reqMsg.msgType = req.msgType; + reqMsg.pCont = req.msg; + reqMsg.contLen = req.msgLen; + reqMsg.info.rsp = NULL; + reqMsg.info.rspLen = 0; + + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)]; + if (fp == NULL) { + mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } + + code = (*fp)(&reqMsg); + + rsp.reqType = reqMsg.msgType; + rsp.msgLen = reqMsg.info.rspLen; + rsp.rspCode = code; + rsp.msg = reqMsg.info.rsp; + + taosArrayPush(batchRsp, &rsp); + + rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES; + } + + rspSize += sizeof(int32_t); + offset = 0; + + pRsp = rpcMallocCont(rspSize); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + *(int32_t*)((char*)pRsp + offset) = htonl(msgNum); + offset += sizeof(msgNum); + for (int32_t i = 0; i < msgNum; ++i) { + SBatchRsp *p = taosArrayGet(batchRsp, i); + + *(int32_t*)((char*)pRsp + offset) = htonl(p->reqType); + offset += sizeof(p->reqType); + *(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen); + offset += sizeof(p->msgLen); + *(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode); + offset += sizeof(p->rspCode); + memcpy((char*)pRsp + offset, p->msg, p->msgLen); + offset += p->msgLen; + + rpcFreeCont(p->msg); + } + + taosArrayDestroy(batchRsp); + batchRsp = NULL; + +_exit: + + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspSize; + + if (code) { + mError("mnd get batch meta failed cause of %s", tstrerror(code)); + } + + taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); + + return code; +} + int32_t mndInitQuery(SMnode *pMnode) { if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { mError("failed to init qworker in mnode since %s", terrstr()); @@ -76,6 +174,7 @@ int32_t mndInitQuery(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 6b36a66776..71b9d70518 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -349,14 +349,14 @@ _exit: rspMsg.msgType = pMsg->msgType; if (code) { - qError("get batch meta failed cause of %s", tstrerror(code)); + qError("vnd get batch meta failed cause of %s", tstrerror(code)); } taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); tmsgSendRsp(&rspMsg); - return TSDB_CODE_SUCCESS; + return code; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 0821a91d6b..1aaa1ecfd7 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -251,6 +251,16 @@ typedef struct SCtgMsgCtx { char* target; } SCtgMsgCtx; + +typedef struct SCtgTaskCallbackParam { + uint64_t queryId; + int64_t refId; + SArray* taskId; + int32_t reqType; + int32_t batchId; +} SCtgTaskCallbackParam; + + typedef struct SCtgTask SCtgTask; typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*); @@ -643,6 +653,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, int32_t ctgGetTbCfgCb(SCtgTask *pTask); void ctgFreeHandle(SCatalog* pCatalog); +void ctgFreeMsgSendParam(void* param); void ctgFreeBatch(SCtgBatch *pBatch); void ctgFreeBatchs(SHashObj *pBatchs); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); diff --git a/source/libs/catalog/inc/ctgRemote.h b/source/libs/catalog/inc/ctgRemote.h index 72ab43e085..fe0762a88a 100644 --- a/source/libs/catalog/inc/ctgRemote.h +++ b/source/libs/catalog/inc/ctgRemote.h @@ -20,13 +20,6 @@ extern "C" { #endif -typedef struct SCtgTaskCallbackParam { - uint64_t queryId; - int64_t refId; - SArray* taskId; - int32_t reqType; - int32_t batchId; -} SCtgTaskCallbackParam; #ifdef __cplusplus diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 9a666eb4ff..f4cee13ec0 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -783,7 +783,8 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { pParent->subRes.code = code; } } - + + pParent->pBatchs = pTask->pBatchs; CTG_ERR_JRET(pParent->subRes.fp(pParent)); } @@ -1660,6 +1661,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { if (CTG_TASK_DONE == pSub->status) { pTask->subRes.code = pSub->code; CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); + pTask->pBatchs = pSub->pBatchs; CTG_ERR_JRET(pTask->subRes.fp(pTask)); } else { if (NULL == pSub->pParents) { @@ -1697,6 +1699,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); if (newTask) { + pSub->pBatchs = pTask->pBatchs; CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); pSub->status = CTG_TASK_LAUNCHED; } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 8b1d125a58..3ec30ac39f 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -18,7 +18,6 @@ #include "tname.h" #include "catalogInt.h" #include "systable.h" -#include "ctgRemote.h" #include "tref.h" int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { @@ -361,7 +360,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3 SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); - CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); @@ -377,7 +376,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3 param->batchId = batchId; msgSendInfo->param = param; - msgSendInfo->paramFreeFp = taosMemoryFree; + msgSendInfo->paramFreeFp = ctgFreeMsgSendParam; msgSendInfo->fp = ctgHandleMsgCallback; *pMsgSendInfo = msgSendInfo; @@ -386,8 +385,8 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3 _return: - taosMemoryFree(param); - taosMemoryFree(msgSendInfo); + taosArrayDestroy(pTaskId); + destroySendMsgInfo(msgSendInfo); CTG_RET(code); } @@ -543,6 +542,8 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { ASSERT(pBatch->msgSize == offset); + qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num); + return TSDB_CODE_SUCCESS; } @@ -558,9 +559,11 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); - CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, - pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize)); - + code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, + pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); + pBatch->pTaskIds = NULL; + CTG_ERR_JRET(code); + p = taosHashIterate(pBatchs, p); } @@ -598,6 +601,9 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL)); +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -605,6 +611,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -640,6 +647,9 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -647,6 +657,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -687,6 +698,9 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db)); +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -694,6 +708,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -733,6 +748,9 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName)); +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -740,6 +758,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -778,7 +797,10 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName)); - + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -786,6 +808,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -827,7 +850,10 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); - + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -835,6 +861,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -873,7 +900,10 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName)); - + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -881,6 +911,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -919,7 +950,10 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user)); - + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -927,6 +961,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -970,7 +1005,9 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -978,6 +1015,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -1151,7 +1189,9 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); - +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -1159,6 +1199,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { @@ -1193,7 +1234,10 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); - + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -1201,6 +1245,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou taosArrayPush(pTaskId, &pTask->taskId); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 8d33e5596f..e61becbe17 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -19,6 +19,16 @@ #include "catalogInt.h" #include "systable.h" +void ctgFreeMsgSendParam(void* param) { + if (NULL == param) { + return; + } + + SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param; + taosArrayDestroy(pParam->taskId); + + taosMemoryFree(param); +} void ctgFreeBatch(SCtgBatch *pBatch) { if (NULL == pBatch) { From 75974255c7e50c6f75b07922318d7a9f8bafb0f7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 27 Jul 2022 16:26:41 +0800 Subject: [PATCH 4/4] fix: fix mnd meta issue --- source/dnode/mnode/impl/src/mndQuery.c | 8 +++++--- source/libs/catalog/src/ctgRemote.c | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index e6e9cae100..2beeb10335 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -105,12 +105,14 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; } - - code = (*fp)(&reqMsg); + if ((*fp)(&reqMsg)) { + rsp.rspCode = terrno; + } else { + rsp.rspCode = 0; + } rsp.reqType = reqMsg.msgType; rsp.msgLen = reqMsg.info.rspLen; - rsp.rspCode = code; rsp.msg = reqMsg.info.rsp; taosArrayPush(batchRsp, &rsp); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 3ec30ac39f..55bfc88a49 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -27,7 +27,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu int32_t taskNum = taosArrayGetSize(pTaskId); SDataBuf taskMsg = *pMsg; int32_t offset = 0; - int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; + int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; ASSERT(taskNum == msgNum || 0 == msgNum); ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1));