refactor: adjust msgcb
This commit is contained in:
parent
2ee38b94fd
commit
7341f68cbe
|
@ -36,9 +36,9 @@ typedef struct SBnodeMgmt {
|
||||||
|
|
||||||
// bmHandle.c
|
// bmHandle.c
|
||||||
SArray *bmGetMsgHandles();
|
SArray *bmGetMsgHandles();
|
||||||
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
|
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
|
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// bmWorker.c
|
// bmWorker.c
|
||||||
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
|
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
|
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
|
||||||
|
|
||||||
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonBmInfo bmInfo = {0};
|
SMonBmInfo bmInfo = {0};
|
||||||
bmGetMonitorInfo(pMgmt, &bmInfo);
|
bmGetMonitorInfo(pMgmt, &bmInfo);
|
||||||
dmGetMonitorSystemInfo(&bmInfo.sys);
|
dmGetMonitorSystemInfo(&bmInfo.sys);
|
||||||
|
@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
|
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
tFreeSMonBmInfo(&bmInfo);
|
tFreeSMonBmInfo(&bmInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDCreateBnodeReq createReq = {0};
|
SDCreateBnodeReq createReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDDropBnodeReq dropReq = {0};
|
SDDropBnodeReq dropReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,18 +41,18 @@ typedef struct SMnodeMgmt {
|
||||||
|
|
||||||
// mmFile.c
|
// mmFile.c
|
||||||
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
|
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
|
||||||
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed);
|
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
|
||||||
|
|
||||||
// mmInt.c
|
// mmInt.c
|
||||||
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
|
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
|
||||||
|
|
||||||
// mmHandle.c
|
// mmHandle.c
|
||||||
SArray *mmGetMsgHandles();
|
SArray *mmGetMsgHandles();
|
||||||
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// mmWorker.c
|
// mmWorker.c
|
||||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
|
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
|
||||||
|
@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
|
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
|
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
|
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
|
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
|
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
|
||||||
char file[PATH_MAX] = {0};
|
char file[PATH_MAX] = {0};
|
||||||
char realfile[PATH_MAX] = {0};
|
char realfile[PATH_MAX] = {0};
|
||||||
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
|
||||||
|
@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
|
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
|
||||||
|
|
||||||
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica);
|
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
|
||||||
for (int32_t i = 0; i < replica; ++i) {
|
for (int32_t i = 0; i < replica; ++i) {
|
||||||
SReplica *pReplica = &pMgmt->replicas[i];
|
SReplica *pReplica = &pMgmt->replicas[i];
|
||||||
if (pReq != NULL) {
|
if (pMsg != NULL) {
|
||||||
pReplica = &pReq->replicas[i];
|
pReplica = &pMsg->replicas[i];
|
||||||
}
|
}
|
||||||
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
|
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
|
||||||
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
|
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
|
||||||
|
|
|
@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
||||||
mndGetLoad(pMgmt->pMnode, &pInfo->load);
|
mndGetLoad(pMgmt->pMnode, &pInfo->load);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonMmInfo mmInfo = {0};
|
SMonMmInfo mmInfo = {0};
|
||||||
mmGetMonitorInfo(pMgmt, &mmInfo);
|
mmGetMonitorInfo(pMgmt, &mmInfo);
|
||||||
dmGetMonitorSystemInfo(&mmInfo.sys);
|
dmGetMonitorSystemInfo(&mmInfo.sys);
|
||||||
|
@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
|
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
tFreeSMonMmInfo(&mmInfo);
|
tFreeSMonMmInfo(&mmInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonMloadInfo mloads = {0};
|
SMonMloadInfo mloads = {0};
|
||||||
mmGetMnodeLoads(pMgmt, &mloads);
|
mmGetMnodeLoads(pMgmt, &mloads);
|
||||||
|
|
||||||
|
@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
|
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDCreateMnodeReq createReq = {0};
|
SDCreateMnodeReq createReq = {0};
|
||||||
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDDropMnodeReq dropReq = {0};
|
SDDropMnodeReq dropReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDAlterMnodeReq alterReq = {0};
|
SDAlterMnodeReq alterReq = {0};
|
||||||
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
|
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
|
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
|
||||||
SMnodeOpt option = {0};
|
SMnodeOpt option = {0};
|
||||||
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
|
if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool deployed = true;
|
bool deployed = true;
|
||||||
if (mmWriteFile(pMgmt, pReq, deployed) != 0) {
|
if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
|
||||||
dError("failed to write mnode file since %s", terrstr());
|
dError("failed to write mnode file since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
|
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
|
||||||
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
|
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
|
||||||
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
|
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
|
||||||
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
|
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
|
||||||
pMgmt->msgCb.mgmt = pMgmt;
|
pMgmt->msgCb.mgmt = pMgmt;
|
||||||
|
|
||||||
bool deployed = false;
|
bool deployed = false;
|
||||||
|
|
|
@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
|
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
|
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
|
||||||
|
|
||||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||||
SSingleWorkerCfg qCfg = {
|
SSingleWorkerCfg qCfg = {
|
||||||
|
|
|
@ -39,7 +39,7 @@ typedef struct SQnodeMgmt {
|
||||||
SArray *qmGetMsgHandles();
|
SArray *qmGetMsgHandles();
|
||||||
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// qmWorker.c
|
// qmWorker.c
|
||||||
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
|
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
|
||||||
|
|
||||||
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonQmInfo qmInfo = {0};
|
SMonQmInfo qmInfo = {0};
|
||||||
qmGetMonitorInfo(pMgmt, &qmInfo);
|
qmGetMonitorInfo(pMgmt, &qmInfo);
|
||||||
dmGetMonitorSystemInfo(&qmInfo.sys);
|
dmGetMonitorSystemInfo(&qmInfo.sys);
|
||||||
|
@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
|
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
tFreeSMonQmInfo(&qmInfo);
|
tFreeSMonQmInfo(&qmInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDCreateQnodeReq createReq = {0};
|
SDCreateQnodeReq createReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDDropQnodeReq dropReq = {0};
|
SDDropQnodeReq dropReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct SSnodeMgmt {
|
||||||
SArray *smGetMsgHandles();
|
SArray *smGetMsgHandles();
|
||||||
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
||||||
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// smWorker.c
|
// smWorker.c
|
||||||
int32_t smStartWorker(SSnodeMgmt *pMgmt);
|
int32_t smStartWorker(SSnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
||||||
|
|
||||||
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonSmInfo smInfo = {0};
|
SMonSmInfo smInfo = {0};
|
||||||
smGetMonitorInfo(pMgmt, &smInfo);
|
smGetMonitorInfo(pMgmt, &smInfo);
|
||||||
dmGetMonitorSystemInfo(&smInfo.sys);
|
dmGetMonitorSystemInfo(&smInfo.sys);
|
||||||
|
@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
|
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
tFreeSMonSmInfo(&smInfo);
|
tFreeSMonSmInfo(&smInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDCreateSnodeReq createReq = {0};
|
SDCreateSnodeReq createReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
|
|
||||||
SDDropSnodeReq dropReq = {0};
|
SDDropSnodeReq dropReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
|
|
||||||
// vmHandle.c
|
// vmHandle.c
|
||||||
SArray *vmGetMsgHandles();
|
SArray *vmGetMsgHandles();
|
||||||
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
|
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// vmFile.c
|
// vmFile.c
|
||||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||||
|
|
|
@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
taosArrayDestroy(pVloads);
|
taosArrayDestroy(pVloads);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonVmInfo vmInfo = {0};
|
SMonVmInfo vmInfo = {0};
|
||||||
vmGetMonitorInfo(pMgmt, &vmInfo);
|
vmGetMonitorInfo(pMgmt, &vmInfo);
|
||||||
dmGetMonitorSystemInfo(&vmInfo.sys);
|
dmGetMonitorSystemInfo(&vmInfo.sys);
|
||||||
|
@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
|
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
tFreeSMonVmInfo(&vmInfo);
|
tFreeSMonVmInfo(&vmInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SMonVloadInfo vloads = {0};
|
SMonVloadInfo vloads = {0};
|
||||||
vmGetVnodeLoads(pMgmt, &vloads);
|
vmGetVnodeLoads(pMgmt, &vloads);
|
||||||
|
|
||||||
|
@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
|
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
|
||||||
pReq->info.rsp = pRsp;
|
pMsg->info.rsp = pRsp;
|
||||||
pReq->info.rspLen = rspLen;
|
pMsg->info.rspLen = rspLen;
|
||||||
tFreeSMonVloadInfo(&vloads);
|
tFreeSMonVloadInfo(&vloads);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
SCreateVnodeReq createReq = {0};
|
SCreateVnodeReq createReq = {0};
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -242,9 +241,8 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SRpcMsg *pReq = pMsg;
|
|
||||||
SDropVnodeReq dropReq = {0};
|
SDropVnodeReq dropReq = {0};
|
||||||
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
|
static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
|
||||||
SDnode *pDnode = dmInstance();
|
SDnode *pDnode = dmInstance();
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
||||||
|
|
||||||
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
|
dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
|
||||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||||
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
||||||
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
|
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
|
||||||
|
@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
|
||||||
|
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {
|
||||||
.code = TSDB_CODE_RPC_REDIRECT,
|
.code = TSDB_CODE_RPC_REDIRECT,
|
||||||
.info = pReq->info,
|
.info = pMsg->info,
|
||||||
.contLen = len,
|
.contLen = len,
|
||||||
};
|
};
|
||||||
rsp.pCont = rpcMallocCont(len);
|
rsp.pCont = rpcMallocCont(len);
|
||||||
tSerializeSMEpSet(rsp.pCont, len, &msg);
|
tSerializeSMEpSet(rsp.pCont, len, &msg);
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
||||||
|
@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
|
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
SDnode *pDnode = dmInstance();
|
SDnode *pDnode = dmInstance();
|
||||||
if (pDnode->status != DND_STAT_RUNNING) {
|
if (pDnode->status != DND_STAT_RUNNING) {
|
||||||
rpcFreeCont(pReq->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
pReq->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
terrno = TSDB_CODE_NODE_OFFLINE;
|
terrno = TSDB_CODE_NODE_OFFLINE;
|
||||||
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle);
|
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
|
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue