Merge pull request #9457 from taosdata/feature/qnode

Feature/qnode
This commit is contained in:
dapan1121 2021-12-28 18:31:18 +08:00 committed by GitHub
commit d635295ba9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 37 deletions

View File

@ -400,7 +400,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType];
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));

View File

@ -79,7 +79,7 @@ static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSe
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType];
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) {
@ -104,7 +104,7 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type];
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
}
return pMsgSendInfo;
@ -290,11 +290,11 @@ void initMsgHandleFp() {
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif
handleRequestRspFp[TDMT_MND_CONNECT] = processConnectRsp;
handleRequestRspFp[TDMT_MND_SHOW] = processShowRsp;
handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp;
handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp;
handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp;
handleRequestRspFp[TDMT_MND_CREATE_STB] = processCreateTableRsp;
handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = processRetrieveMnodeRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
}

View File

@ -767,7 +767,7 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
return -1;
}
memcpy(pMeta->stbFname, pStb->name, TSDB_TABLE_FNAME_LEN);
memcpy(pMeta->tbFname, pStb->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;

View File

@ -49,22 +49,11 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TDMT_MND_USE_DB](input, &msg, 0, &msgLen));
char *pMsg = rpcMallocCont(msgLen);
if (NULL == pMsg) {
ctgError("rpc malloc %d failed", msgLen);
tfree(msg);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy(pMsg, msg, msgLen);
tfree(msg);
CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_USE_DB,
.pCont = pMsg,
.pCont = msg,
.contLen = msgLen,
};
@ -76,7 +65,7 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
CTG_ERR_RET(rpcRsp.code);
}
CTG_ERR_RET(queryProcessMsgRsp[TDMT_MND_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen));
CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));
return TSDB_CODE_SUCCESS;
}
@ -160,7 +149,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TDMT_MND_STB_META](&bInput, &msg, 0, &msgLen));
CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STB_META,
@ -177,7 +166,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
CTG_ERR_RET(rpcRsp.code);
}
CTG_ERR_RET(queryProcessMsgRsp[TDMT_MND_STB_META](output, rpcRsp.pCont, rpcRsp.contLen));
CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));
return TSDB_CODE_SUCCESS;
}
@ -197,7 +186,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TDMT_VND_TABLE_META](&bInput, &msg, 0, &msgLen));
CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));
SRpcMsg rpcMsg = {
.msgType = TDMT_VND_TABLE_META,
@ -217,7 +206,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
CTG_ERR_RET(rpcRsp.code);
}
CTG_ERR_RET(queryProcessMsgRsp[TDMT_VND_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen));
CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));
return TSDB_CODE_SUCCESS;
}

View File

@ -16,6 +16,7 @@
#include "tmsg.h"
#include "queryInt.h"
#include "query.h"
#include "trpc.h"
int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
@ -31,7 +32,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
int32_t estimateSize = sizeof(STableInfoMsg);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = calloc(1, estimateSize);
*msg = rpcMallocCont(estimateSize);
if (NULL == *msg) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
@ -59,7 +60,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
int32_t estimateSize = sizeof(SUseDbMsg);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = calloc(1, estimateSize);
*msg = rpcMallocCont(estimateSize);
if (NULL == *msg) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
@ -265,13 +266,13 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
void initQueryModuleMsgHandle() {
queryBuildMsg[TDMT_VND_TABLE_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TDMT_MND_STB_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TDMT_MND_USE_DB] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryProcessMsgRsp[TDMT_VND_TABLE_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TDMT_MND_STB_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TDMT_MND_USE_DB] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
}