shm
This commit is contained in:
parent
44311425db
commit
5015a6a78b
|
@ -22,7 +22,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType;
|
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
|
||||||
|
|
||||||
typedef struct SProcQueue SProcQueue;
|
typedef struct SProcQueue SProcQueue;
|
||||||
typedef struct SProcObj SProcObj;
|
typedef struct SProcObj SProcObj;
|
||||||
|
@ -53,7 +53,7 @@ void taosProcCleanup(SProcObj *pProc);
|
||||||
int32_t taosProcRun(SProcObj *pProc);
|
int32_t taosProcRun(SProcObj *pProc);
|
||||||
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
ProcFuncType ftype);
|
ProcFuncType ftype);
|
||||||
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
ProcFuncType ftype);
|
ProcFuncType ftype);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -19,11 +19,7 @@
|
||||||
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
|
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
|
||||||
|
|
||||||
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
|
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
|
||||||
SMsgCb msgCb = {0};
|
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
|
||||||
msgCb.pWrapper = pMgmt->pWrapper;
|
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
|
||||||
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
|
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,7 @@ int32_t dndInitClient(SDnode *pDnode);
|
||||||
void dndCleanupClient(SDnode *pDnode);
|
void dndCleanupClient(SDnode *pDnode);
|
||||||
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
||||||
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||||
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
|
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
|
||||||
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
|
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
|
||||||
|
|
|
@ -90,10 +90,10 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
|
||||||
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
||||||
ProcFuncType ftype) {
|
ProcFuncType ftype) {
|
||||||
pMsg->pCont = pCont;
|
pMsg->pCont = pCont;
|
||||||
dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);
|
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);
|
||||||
|
|
||||||
switch (ftype) {
|
switch (ftype) {
|
||||||
case PROC_REG:
|
case PROC_REGIST:
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
break;
|
break;
|
||||||
case PROC_RELEASE:
|
case PROC_RELEASE:
|
||||||
|
@ -101,11 +101,14 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(pCont);
|
||||||
break;
|
break;
|
||||||
case PROC_REQ:
|
case PROC_REQ:
|
||||||
// todo send to dnode
|
|
||||||
dndSendReqToMnode(pWrapper, pMsg);
|
dndSendReqToMnode(pWrapper, pMsg);
|
||||||
default:
|
// dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
|
||||||
|
break;
|
||||||
|
case PROC_RSP:
|
||||||
dndSendRpcRsp(pWrapper, pMsg);
|
dndSendRpcRsp(pWrapper, pMsg);
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
taosMemoryFree(pMsg);
|
taosMemoryFree(pMsg);
|
||||||
}
|
}
|
||||||
|
@ -325,6 +328,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
||||||
|
|
||||||
if (taosProcRun(pWrapper->pProc) != 0) {
|
if (taosProcRun(pWrapper->pProc) != 0) {
|
||||||
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -319,22 +319,6 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
|
|
||||||
if (pWrapper->procType != PROC_CHILD) {
|
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
|
||||||
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
|
||||||
terrno = TSDB_CODE_DND_OFFLINE;
|
|
||||||
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
|
|
||||||
} else {
|
|
||||||
while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) {
|
|
||||||
taosMsleep(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
|
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
STransMgmt *pTrans = &pDnode->trans;
|
STransMgmt *pTrans = &pDnode->trans;
|
||||||
|
@ -362,13 +346,37 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
|
||||||
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
|
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
||||||
|
terrno = TSDB_CODE_DND_OFFLINE;
|
||||||
|
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWrapper->procType != PROC_CHILD) {
|
||||||
|
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
|
||||||
|
} else {
|
||||||
|
int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet);
|
||||||
|
char *pHead = taosMemoryMalloc(headLen);
|
||||||
|
if (pHead == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memcpy(pHead, pReq, sizeof(SRpcMsg));
|
||||||
|
memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
|
||||||
|
|
||||||
|
taosProcPutToParentQ(pWrapper->pProc, pReq, headLen, pReq->pCont, pReq->contLen, PROC_REQ);
|
||||||
|
taosMemoryFree(pHead);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
||||||
if (pWrapper->procType != PROC_CHILD) {
|
if (pWrapper->procType != PROC_CHILD) {
|
||||||
dndSendRpcRsp(pWrapper, pRsp);
|
dndSendRpcRsp(pWrapper, pRsp);
|
||||||
} else {
|
} else {
|
||||||
while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) {
|
taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
|
||||||
taosMsleep(1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -376,9 +384,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||||
if (pWrapper->procType != PROC_CHILD) {
|
if (pWrapper->procType != PROC_CHILD) {
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
} else {
|
} else {
|
||||||
while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) {
|
taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
|
||||||
taosMsleep(1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,19 +393,17 @@ static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type)
|
||||||
rpcReleaseHandle(handle, type);
|
rpcReleaseHandle(handle, type);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg msg = {.handle = handle, .code = type};
|
SRpcMsg msg = {.handle = handle, .code = type};
|
||||||
while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) {
|
taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
|
||||||
taosMsleep(1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
|
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
|
||||||
SMsgCb msgCb = {
|
SMsgCb msgCb = {
|
||||||
.pWrapper = pWrapper,
|
.pWrapper = pWrapper,
|
||||||
|
.sendReqFp = dndSendReq,
|
||||||
|
.sendRspFp = dndSendRsp,
|
||||||
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
|
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
|
||||||
.releaseHandleFp = dndReleaseHandle,
|
.releaseHandleFp = dndReleaseHandle,
|
||||||
.sendReqFp = dndSendReqToDnode,
|
|
||||||
.sendRspFp = dndSendRsp,
|
|
||||||
};
|
};
|
||||||
return msgCb;
|
return msgCb;
|
||||||
}
|
}
|
|
@ -434,7 +434,9 @@ int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen,
|
||||||
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
|
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
ProcFuncType ftype) {
|
ProcFuncType ftype) {
|
||||||
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype);
|
while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) {
|
||||||
|
taosMsleep(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue