refact dm
This commit is contained in:
parent
a554bd7fe7
commit
7e361ebcd0
|
@ -480,9 +480,9 @@ enum {
|
||||||
SND_WORKER_TYPE__UNIQUE,
|
SND_WORKER_TYPE__UNIQUE,
|
||||||
};
|
};
|
||||||
|
|
||||||
#define MND_VGID -1
|
#define MNODE_HANDLE -1
|
||||||
#define QND_VGID 1
|
#define QNODE_HANDLE 1
|
||||||
#define VND_VGID 0
|
#define DEFAULT_HANDLE 0
|
||||||
|
|
||||||
#define MAX_NUM_STR_SIZE 40
|
#define MAX_NUM_STR_SIZE 40
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt);
|
||||||
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
|
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
|
||||||
|
|
||||||
// dmMsg.c
|
// dmMsg.c
|
||||||
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
|
void dmInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||||
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
|
@ -53,10 +53,11 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
|
|
||||||
// dmWorker.c
|
// dmWorker.c
|
||||||
|
int32_t dmStartThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
|
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
|
||||||
void dmStopWorker(SDnodeMgmt *pMgmt);
|
void dmStopWorker(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartThread(SDnodeMgmt *pMgmt);
|
int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||||
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,14 +195,7 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
||||||
SStartupReq *pStartup = &pDnode->startup;
|
|
||||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
|
||||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
|
||||||
pStartup->finished = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
|
||||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
||||||
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
|
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
|
||||||
}
|
}
|
||||||
|
@ -218,21 +211,21 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
void dmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
// Requests handled by DNODE
|
// Requests handled by DNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by MNODE
|
// Requests handled by MNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmPutMsgToStatusWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,7 +165,7 @@ void dmGetMgmtFp(SMgmtWrapper *pWrapper) {
|
||||||
mgmtFp.startFp = dmStart;
|
mgmtFp.startFp = dmStart;
|
||||||
mgmtFp.requiredFp = dmRequire;
|
mgmtFp.requiredFp = dmRequire;
|
||||||
|
|
||||||
dmInitMsgHandles(pWrapper);
|
dmInitMsgHandle(pWrapper);
|
||||||
pWrapper->name = "dnode";
|
pWrapper->name = "dnode";
|
||||||
pWrapper->fp = mgmtFp;
|
pWrapper->fp = mgmtFp;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
static void *dmThreadRoutine(void *param) {
|
static void *dmThreadRoutine(void *param) {
|
||||||
SDnodeMgmt *pMgmt = param;
|
SDnodeMgmt *pMgmt = param;
|
||||||
SDnode * pDnode = pMgmt->pDnode;
|
SDnode *pDnode = pMgmt->pDnode;
|
||||||
int64_t lastStatusTime = taosGetTimestampMs();
|
int64_t lastStatusTime = taosGetTimestampMs();
|
||||||
int64_t lastMonitorTime = lastStatusTime;
|
int64_t lastMonitorTime = lastStatusTime;
|
||||||
|
|
||||||
|
@ -32,8 +32,7 @@ static void *dmThreadRoutine(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t curTime = taosGetTimestampMs();
|
int64_t curTime = taosGetTimestampMs();
|
||||||
|
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
|
||||||
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
|
|
||||||
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
|
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
|
||||||
dmSendStatusReq(pMgmt);
|
dmSendStatusReq(pMgmt);
|
||||||
lastStatusTime = curTime;
|
lastStatusTime = curTime;
|
||||||
|
@ -47,10 +46,21 @@ static void *dmThreadRoutine(void *param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
|
||||||
|
pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
|
||||||
|
if (pMgmt->threadId == NULL) {
|
||||||
|
dError("failed to init dnode thread");
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
|
||||||
SDnode * pDnode = pMgmt->pDnode;
|
SDnode *pDnode = pMgmt->pDnode;
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
dTrace("msg:%p, will be processed in dnode queue", pMsg);
|
dTrace("msg:%p, will be processed in dnode queue", pMsg);
|
||||||
|
@ -95,16 +105,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
|
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
|
||||||
SSingleWorkerCfg mgmtCfg = {
|
SSingleWorkerCfg mcfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
|
||||||
.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
|
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mcfg) != 0) {
|
||||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
|
|
||||||
dError("failed to start dnode mgmt worker since %s", terrstr());
|
dError("failed to start dnode mgmt worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSingleWorkerCfg statusCfg = {
|
SSingleWorkerCfg scfg = {.min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
|
||||||
.min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
|
if (tSingleWorkerInit(&pMgmt->statusWorker, &scfg) != 0) {
|
||||||
if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) {
|
|
||||||
dError("failed to start dnode status worker since %s", terrstr());
|
dError("failed to start dnode status worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -113,17 +121,6 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
|
|
||||||
pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
|
|
||||||
if (pMgmt->threadId == NULL) {
|
|
||||||
dError("failed to init dnode thread");
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void dmStopWorker(SDnodeMgmt *pMgmt) {
|
void dmStopWorker(SDnodeMgmt *pMgmt) {
|
||||||
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
||||||
tSingleWorkerCleanup(&pMgmt->statusWorker);
|
tSingleWorkerCleanup(&pMgmt->statusWorker);
|
||||||
|
@ -135,12 +132,18 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
|
||||||
dDebug("dnode workers are closed");
|
dDebug("dnode workers are closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
|
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
|
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
|
||||||
if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) {
|
|
||||||
pWorker = &pMgmt->statusWorker;
|
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||||
}
|
taosWriteQitem(pWorker->queue, pMsg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
|
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
SSingleWorker *pWorker = &pMgmt->statusWorker;
|
||||||
|
|
||||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||||
taosWriteQitem(pWorker->queue, pMsg);
|
taosWriteQitem(pWorker->queue, pMsg);
|
||||||
|
|
|
@ -204,3 +204,10 @@ void dndSetStatus(SDnode *pDnode, EDndStatus status) {
|
||||||
pDnode->status = status;
|
pDnode->status = status;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
||||||
|
SStartupReq *pStartup = &pDnode->startup;
|
||||||
|
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||||
|
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||||
|
pStartup->finished = 0;
|
||||||
|
}
|
||||||
|
|
|
@ -137,9 +137,9 @@ static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
|
if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
int32_t vgId = ntohl(pHead->vgId);
|
int32_t vgId = ntohl(pHead->vgId);
|
||||||
if (vgId == QND_VGID) {
|
if (vgId == QNODE_HANDLE) {
|
||||||
pWrapper = pHandle->pQndWrapper;
|
pWrapper = pHandle->pQndWrapper;
|
||||||
} else if (vgId == MND_VGID) {
|
} else if (vgId == MNODE_HANDLE) {
|
||||||
pWrapper = pHandle->pMndWrapper;
|
pWrapper = pHandle->pMndWrapper;
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
@ -315,13 +315,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
|
||||||
if (msgFp == NULL) continue;
|
if (msgFp == NULL) continue;
|
||||||
|
|
||||||
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
|
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
|
||||||
if (vgId == QND_VGID) {
|
if (vgId == QNODE_HANDLE) {
|
||||||
if (pHandle->pQndWrapper != NULL) {
|
if (pHandle->pQndWrapper != NULL) {
|
||||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pHandle->pQndWrapper = pWrapper;
|
pHandle->pQndWrapper = pWrapper;
|
||||||
} else if (vgId == MND_VGID) {
|
} else if (vgId == MNODE_HANDLE) {
|
||||||
if (pHandle->pMndWrapper != NULL) {
|
if (pHandle->pMndWrapper != NULL) {
|
||||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -75,91 +75,91 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||||
// Requests handled by DNODE
|
// Requests handled by DNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by MNODE
|
// Requests handled by MNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,14 +56,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
|
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, qmProcessFetchMsg, QND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
|
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||||
// Requests handled by SNODE
|
// Requests handled by SNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,49 +235,49 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, VND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ int32_t mndProcessFetchMsg(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndInitQuery(SMnode *pMnode) {
|
int32_t mndInitQuery(SMnode *pMnode) {
|
||||||
if (qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
|
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
|
||||||
mError("failed to init qworker in mnode since %s", terrstr());
|
mError("failed to init qworker in mnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -419,7 +419,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||||
} else {
|
} else {
|
||||||
SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet };
|
SQueryNodeAddr addr = { .nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet };
|
||||||
taosArrayPush(pCxt->pExecNodeList, &addr);
|
taosArrayPush(pCxt->pExecNodeList, &addr);
|
||||||
}
|
}
|
||||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||||
|
|
Loading…
Reference in New Issue