enh: dm init API

This commit is contained in:
kailixu 2023-10-29 21:13:18 +08:00
parent 3f65773222
commit 22d6b95585
3 changed files with 42 additions and 0 deletions

View File

@ -97,7 +97,11 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
int32_t dmInitVars(SDnode *pDnode);
void dmClearVars(SDnode *pDnode);
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
int32_t dmInitModule(SDnode *pDnode, SMgmtWrapper *wrappers);
#else
int32_t dmInitModule(SDnode *pDnode);
#endif
bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper);
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
@ -119,7 +123,11 @@ int32_t dmInitStatusClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode);
void dmCleanupStatusClient(SDnode *pDnode);
SMsgCb dmGetMsgcb(SDnode *pDnode);
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);
#else
int32_t dmInitMsgHandle(SDnode *pDnode);
#endif
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c

View File

@ -66,9 +66,15 @@ int32_t dmInitDnode(SDnode *pDnode) {
goto _OVER;
}
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
if (dmInitModule(pDnode, pDnode->wrappers) != 0) {
goto _OVER;
}
#else
if (dmInitModule(pDnode) != 0) {
goto _OVER;
}
#endif
indexInit(tsNumOfCommitThreads);
streamMetaInit();

View File

@ -251,6 +251,33 @@ _OVER:
dmReleaseWrapper(pWrapper);
}
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers) {
SDnodeTrans *pTrans = &pDnode->trans;
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = wrappers + ntype;
SArray *pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
}
if (!pMgmt->needCheckVgId) {
pHandle->defaultNtype = ntype;
}
pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
}
taosArrayDestroy(pArray);
}
return 0;
}
#else
int32_t dmInitMsgHandle(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
@ -276,6 +303,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0;
}
#endif
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();