diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9d23557ee7..0571f8382a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3051,6 +3051,12 @@ typedef struct { void* msg; } SBatchMsg; +typedef struct { + SMsgHead header; + int32_t msgNum; + SBatchMsg msg[]; +} SBatchReq; + typedef struct { int32_t reqType; int32_t msgLen; @@ -3063,7 +3069,7 @@ static FORCE_INLINE void tFreeSBatchRsp(void *p) { return; } - SBatchRsp* pRsp = (SBatchRsp*); + SBatchRsp* pRsp = (SBatchRsp*)p; taosMemoryFree(pRsp->msg); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2d5a703b62..20dc04631e 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -136,6 +136,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 44281ea38e..dd1facb462 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -80,6 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); +int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index bf846dba13..6b36a66776 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -124,6 +124,8 @@ _exit: if (direct) { tmsgSendRsp(&rpcMsg); + } else { + *pMsg = rpcMsg; } taosMemoryFree(metaRsp.pSchemas); @@ -241,6 +243,8 @@ _exit: if (direct) { tmsgSendRsp(&rpcMsg); + } else { + *pMsg = rpcMsg; } tFreeSTableCfgRsp(&cfgRsp); @@ -253,8 +257,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t offset = 0; int32_t rspSize = 0; - int32_t msgNum = ntohl(pMsg->pCont); - offset += sizeof(msgNum); + SBatchReq *batchReq = (SBatchReq*)pMsg->pCont; + int32_t msgNum = ntohl(batchReq->msgNum); + offset += sizeof(SBatchReq); SBatchMsg req = {0}; SBatchRsp rsp = {0}; SRpcMsg reqMsg = *pMsg; @@ -268,11 +273,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { } for (int32_t i = 0; i < msgNum; ++i) { - req.msgType = ntohl((char*)pMsg->pCont + offset); - offset += req.msgType; + req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgType); - req.msgLen = ntohl((char*)pMsg->pCont + offset); - offset += req.msgLen; + req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset)); + offset += sizeof(req.msgLen); req.msg = (char*)pMsg->pCont + offset; offset += req.msgLen; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 830ab6e37c..f560d6a8c2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || - pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType = TDMT_VND_BATCH_META) && + pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 4bd85792cd..0821a91d6b 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -208,7 +208,7 @@ typedef struct SCtgBatch { int32_t msgType; int32_t msgSize; SArray* pMsgs; - SRequestConnInfo *pConn; + SRequestConnInfo conn; char dbFName[TSDB_DB_FNAME_LEN]; SArray* pTaskIds; } SCtgBatch; @@ -634,6 +634,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask); int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask); +int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs); int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param); int32_t ctgLaunchJob(SCtgJob *pJob); @@ -660,7 +661,7 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); char * ctgTaskTypeStr(CTG_TASK_TYPE type); -int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask); +int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId); int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes); void ctgFreeSTableIndex(void *info); void ctgClearSubTaskRes(SCtgSubRes *pRes); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 65c6a0f76b..9a666eb4ff 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1709,12 +1709,12 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { for (int32_t i = 0; i < taskNum; ++i) { SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); + pTask->pBatchs = pJob->pBatchs; qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); pTask->status = CTG_TASK_LAUNCHED; - pTask->pBatchs = pJob->pBatchs; } if (taskNum <= 0) { diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 04ffbca629..8b1d125a58 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -24,13 +24,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SArray* pTaskId = cbParam->taskId; + SCatalog* pCtg = pJob->pCtg; int32_t taskNum = taosArrayGetSize(pTaskId); SDataBuf taskMsg = *pMsg; int32_t offset = 0; - int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? htonl(pMsg->pData) : 0; + int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; ASSERT(taskNum == msgNum || 0 == msgNum); - qDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); offset += sizeof(msgNum); SBatchRsp rsp = {0}; @@ -41,14 +42,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu } for (int32_t i = 0; i < taskNum; ++i) { - int32_t taskId = taosArrayGet(pTaskId, i); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); + int32_t* taskId = taosArrayGet(pTaskId, i); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); if (msgNum > 0) { - rsp.reqType = htonl(((char*)pMsg->pData) + offset); + rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.reqType); - rsp.msgLen = htonl(((char*)pMsg->pData) + offset); + rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.msgLen); - rsp.rspCode = htonl(((char*)pMsg->pData) + offset); + rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.rspCode); rsp.msg = ((char*)pMsg->pData) + offset; offset += rsp.msgLen; @@ -65,7 +66,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu pTask->pBatchs = pBatchs; - qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } @@ -317,15 +318,30 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } + SCatalog* pCtg = pJob->pCtg; + if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); } else { - int32_t taskId = taosArrayGet(cbParam->taskId, 0); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId); + int32_t *taskId = taosArrayGet(cbParam->taskId, 0); + SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); - + +#if CTG_BATCH_FETCH + SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == pBatchs) { + ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + pTask->pBatchs = pBatchs; +#endif + CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); + +#if CTG_BATCH_FETCH + CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); +#endif } _return: @@ -393,6 +409,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, int64_t transporterId = 0; code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo); + pMsgSendInfo = NULL; if (code) { ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code)); CTG_ERR_JRET(code); @@ -426,7 +443,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - newBatch.pConn = pConn; + newBatch.conn = *pConn; req.msgType = msgType; req.msgLen = msgSize; @@ -437,7 +454,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - newBatch.msgSize = sizeof(req) + msgSize - POINTER_BYTES; + newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES; if (vgId > 0) { if (TDMT_VND_TABLE_CFG == msgType) { @@ -459,7 +476,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch->batchId, vgId); + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId); return TSDB_CODE_SUCCESS; } @@ -500,7 +517,7 @@ _return: return code; } -int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) { +int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { *msg = taosMemoryMalloc(pBatch->msgSize); if (NULL == (*msg)) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -508,8 +525,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) { int32_t offset = 0; int32_t num = taosArrayGetSize(pBatch->pMsgs); - *(int32_t*)((char*)(*msg) + offset) = htonl(num); - offset += sizeof(num); + SBatchReq *pBatchReq = (SBatchReq*)(*msg); + + pBatchReq->header.vgId = htonl(vgId); + pBatchReq->msgNum = htonl(num); + offset += sizeof(SBatchReq); for (int32_t i = 0; i < num; ++i) { SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); @@ -534,9 +554,11 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { size_t len = 0; int32_t* vgId = taosHashGetKey(p, &len); SCtgBatch* pBatch = (SCtgBatch*)p; + + ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); - CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, &msg)); - CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, pBatch->pConn, pJob, pBatch->pTaskIds, pBatch->batchId, + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); + CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize)); p = taosHashIterate(pBatchs, p); @@ -1016,7 +1038,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa .mgmtEps = vgroupInfo->epSet}; #if CTG_BATCH_FETCH - CTG_RET(ctgAddVnodeBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); + CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); #else SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 78ecde6003..ed8786170d 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -636,7 +636,6 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_BATCH_META)] = queryProcessGetBatchMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; }