minor changes

This commit is contained in:
Shengliang Guan 2022-04-02 13:39:01 +08:00
parent 15d038103e
commit a8a914d47d
5 changed files with 72 additions and 65 deletions

View File

@ -134,33 +134,42 @@ typedef struct SDnode {
SMgmtWrapper wrappers[NODE_MAX]; SMgmtWrapper wrappers[NODE_MAX];
} SDnode; } SDnode;
// dndFile.h
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
// dndInt.h
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
// dndMonitor.h
void dndSendMonitorReport(SDnode *pDnode);
// dndMsg.h
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
// dndStr.h
const char *dndStatStr(EDndStatus stat);
const char *dndNodeLogStr(ENodeType ntype); const char *dndNodeLogStr(ENodeType ntype);
const char *dndNodeProcStr(ENodeType ntype); const char *dndNodeProcStr(ENodeType ntype);
const char *dndEventStr(EDndEvent ev); const char *dndEventStr(EDndEvent ev);
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
// dndTransport.h
int32_t dndInitServer(SDnode *pDnode); int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode); void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -29,26 +29,24 @@
extern "C" { extern "C" {
#endif #endif
// dndInt.c // dndEnv.h
int32_t dndInit(); int32_t dndInit();
void dndCleanup(); void dndCleanup();
const char *dndStatStr(EDndStatus stat);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndMsg.c // dndExec.h
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
// dndExec.c
int32_t dndOpenNode(SMgmtWrapper *pWrapper); int32_t dndOpenNode(SMgmtWrapper *pWrapper);
void dndCloseNode(SMgmtWrapper *pWrapper); void dndCloseNode(SMgmtWrapper *pWrapper);
int32_t dndRun(SDnode *pDnode); int32_t dndRun(SDnode *pDnode);
// dndObj.c // dndInt.c
SDnode *dndCreate(const SDnodeOpt *pOption); SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode); void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndHandleEvent(SDnode *pDnode, EDndEvent event);
// dndMsg.c
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndTransport.c // dndTransport.c
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);

View File

@ -57,40 +57,3 @@ void dndCleanup() {
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up"); dInfo("dnode env is cleaned up");
} }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status));
pDnode->status = status;
}
}
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}

View File

@ -188,4 +188,18 @@ void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch); taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
} }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status));
pDnode->status = status;
}
}

View File

@ -66,8 +66,8 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) { } else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
pRpc->ahandle, pMsg->user); pMsg->user);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ); code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
@ -171,4 +171,27 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1; return -1;
} }
} }
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}