feat: support vnode meta batch fetching
This commit is contained in:
parent
4cf20418cd
commit
602d2197fb
|
@ -3051,6 +3051,12 @@ typedef struct {
|
|||
void* msg;
|
||||
} SBatchMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
int32_t msgNum;
|
||||
SBatchMsg msg[];
|
||||
} SBatchReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reqType;
|
||||
int32_t msgLen;
|
||||
|
@ -3063,7 +3069,7 @@ static FORCE_INLINE void tFreeSBatchRsp(void *p) {
|
|||
return;
|
||||
}
|
||||
|
||||
SBatchRsp* pRsp = (SBatchRsp*);
|
||||
SBatchRsp* pRsp = (SBatchRsp*)p;
|
||||
taosMemoryFree(pRsp->msg);
|
||||
}
|
||||
|
||||
|
|
|
@ -136,6 +136,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
|
||||
|
|
|
@ -80,6 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
|
|||
void vnodeQueryClose(SVnode* pVnode);
|
||||
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
|
||||
// vnodeCommit.c
|
||||
int32_t vnodeBegin(SVnode* pVnode);
|
||||
|
|
|
@ -124,6 +124,8 @@ _exit:
|
|||
|
||||
if (direct) {
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
} else {
|
||||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
taosMemoryFree(metaRsp.pSchemas);
|
||||
|
@ -241,6 +243,8 @@ _exit:
|
|||
|
||||
if (direct) {
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
} else {
|
||||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
tFreeSTableCfgRsp(&cfgRsp);
|
||||
|
@ -253,8 +257,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int32_t code = 0;
|
||||
int32_t offset = 0;
|
||||
int32_t rspSize = 0;
|
||||
int32_t msgNum = ntohl(pMsg->pCont);
|
||||
offset += sizeof(msgNum);
|
||||
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
|
||||
int32_t msgNum = ntohl(batchReq->msgNum);
|
||||
offset += sizeof(SBatchReq);
|
||||
SBatchMsg req = {0};
|
||||
SBatchRsp rsp = {0};
|
||||
SRpcMsg reqMsg = *pMsg;
|
||||
|
@ -268,11 +273,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
req.msgType = ntohl((char*)pMsg->pCont + offset);
|
||||
offset += req.msgType;
|
||||
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgType);
|
||||
|
||||
req.msgLen = ntohl((char*)pMsg->pCont + offset);
|
||||
offset += req.msgLen;
|
||||
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgLen);
|
||||
|
||||
req.msg = (char*)pMsg->pCont + offset;
|
||||
offset += req.msgLen;
|
||||
|
|
|
@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("message in fetch queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
|
||||
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType = TDMT_VND_BATCH_META) &&
|
||||
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!vnodeIsLeader(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
|
|
|
@ -208,7 +208,7 @@ typedef struct SCtgBatch {
|
|||
int32_t msgType;
|
||||
int32_t msgSize;
|
||||
SArray* pMsgs;
|
||||
SRequestConnInfo *pConn;
|
||||
SRequestConnInfo conn;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
SArray* pTaskIds;
|
||||
} SCtgBatch;
|
||||
|
@ -634,6 +634,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
|
||||
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
|
||||
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
|
||||
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs);
|
||||
|
||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param);
|
||||
int32_t ctgLaunchJob(SCtgJob *pJob);
|
||||
|
@ -660,7 +661,7 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
|
|||
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
||||
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
||||
char * ctgTaskTypeStr(CTG_TASK_TYPE type);
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
|
||||
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
|
||||
void ctgFreeSTableIndex(void *info);
|
||||
void ctgClearSubTaskRes(SCtgSubRes *pRes);
|
||||
|
|
|
@ -1709,12 +1709,12 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
|
|||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
|
||||
pTask->pBatchs = pJob->pBatchs;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
|
||||
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
||||
|
||||
pTask->status = CTG_TASK_LAUNCHED;
|
||||
pTask->pBatchs = pJob->pBatchs;
|
||||
}
|
||||
|
||||
if (taskNum <= 0) {
|
||||
|
|
|
@ -24,13 +24,14 @@
|
|||
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SArray* pTaskId = cbParam->taskId;
|
||||
SCatalog* pCtg = pJob->pCtg;
|
||||
int32_t taskNum = taosArrayGetSize(pTaskId);
|
||||
SDataBuf taskMsg = *pMsg;
|
||||
int32_t offset = 0;
|
||||
int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? htonl(pMsg->pData) : 0;
|
||||
int32_t msgNum = (pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
|
||||
ASSERT(taskNum == msgNum || 0 == msgNum);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1));
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1));
|
||||
|
||||
offset += sizeof(msgNum);
|
||||
SBatchRsp rsp = {0};
|
||||
|
@ -41,14 +42,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
int32_t taskId = taosArrayGet(pTaskId, i);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId);
|
||||
int32_t* taskId = taosArrayGet(pTaskId, i);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
if (msgNum > 0) {
|
||||
rsp.reqType = htonl(((char*)pMsg->pData) + offset);
|
||||
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.reqType);
|
||||
rsp.msgLen = htonl(((char*)pMsg->pData) + offset);
|
||||
rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.msgLen);
|
||||
rsp.rspCode = htonl(((char*)pMsg->pData) + offset);
|
||||
rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.rspCode);
|
||||
rsp.msg = ((char*)pMsg->pData) + offset;
|
||||
offset += rsp.msgLen;
|
||||
|
@ -65,7 +66,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
|
||||
pTask->pBatchs = pBatchs;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1));
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1));
|
||||
|
||||
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
|
||||
}
|
||||
|
@ -317,15 +318,30 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
SCatalog* pCtg = pJob->pCtg;
|
||||
|
||||
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
|
||||
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
|
||||
} else {
|
||||
int32_t taskId = taosArrayGet(cbParam->taskId, 0);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskId);
|
||||
int32_t *taskId = taosArrayGet(cbParam->taskId, 0);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
|
||||
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pBatchs) {
|
||||
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pTask->pBatchs = pBatchs;
|
||||
#endif
|
||||
|
||||
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
|
||||
#endif
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -393,6 +409,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob,
|
|||
|
||||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
|
||||
pMsgSendInfo = NULL;
|
||||
if (code) {
|
||||
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
|
||||
CTG_ERR_JRET(code);
|
||||
|
@ -426,7 +443,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
|
|||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
newBatch.pConn = pConn;
|
||||
newBatch.conn = *pConn;
|
||||
|
||||
req.msgType = msgType;
|
||||
req.msgLen = msgSize;
|
||||
|
@ -437,7 +454,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
|
|||
if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
newBatch.msgSize = sizeof(req) + msgSize - POINTER_BYTES;
|
||||
newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
|
||||
|
||||
if (vgId > 0) {
|
||||
if (TDMT_VND_TABLE_CFG == msgType) {
|
||||
|
@ -459,7 +476,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
|
|||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch->batchId, vgId);
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -500,7 +517,7 @@ _return:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) {
|
||||
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
|
||||
*msg = taosMemoryMalloc(pBatch->msgSize);
|
||||
if (NULL == (*msg)) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
@ -508,8 +525,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, void** msg) {
|
|||
|
||||
int32_t offset = 0;
|
||||
int32_t num = taosArrayGetSize(pBatch->pMsgs);
|
||||
*(int32_t*)((char*)(*msg) + offset) = htonl(num);
|
||||
offset += sizeof(num);
|
||||
SBatchReq *pBatchReq = (SBatchReq*)(*msg);
|
||||
|
||||
pBatchReq->header.vgId = htonl(vgId);
|
||||
pBatchReq->msgNum = htonl(num);
|
||||
offset += sizeof(SBatchReq);
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
|
||||
|
@ -534,9 +554,11 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
|
|||
size_t len = 0;
|
||||
int32_t* vgId = taosHashGetKey(p, &len);
|
||||
SCtgBatch* pBatch = (SCtgBatch*)p;
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
|
||||
|
||||
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, &msg));
|
||||
CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, pBatch->pConn, pJob, pBatch->pTaskIds, pBatch->batchId,
|
||||
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
|
||||
CTG_ERR_JRET(ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId,
|
||||
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize));
|
||||
|
||||
p = taosHashIterate(pBatchs, p);
|
||||
|
@ -1016,7 +1038,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
.mgmtEps = vgroupInfo->epSet};
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddVnodeBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
|
|
|
@ -636,7 +636,6 @@ void initQueryModuleMsgHandle() {
|
|||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_BATCH_META)] = queryProcessGetBatchMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue