This commit is contained in:
Shengliang Guan 2022-03-29 13:39:55 +08:00
parent 182a5ee4b5
commit ac6b121348
7 changed files with 57 additions and 25 deletions

View File

@ -56,13 +56,14 @@ typedef struct {
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
} SMsgCb; } SMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type); void tmsgReleaseHandle(void* handle, int8_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,7 +22,7 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType; typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType;
typedef struct SProcQueue SProcQueue; typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj; typedef struct SProcObj SProcObj;

View File

@ -16,6 +16,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tmsgcb.h" #include "tmsgcb.h"
static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
} }
@ -38,6 +42,6 @@ void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
} }
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) { void tmsgReleaseHandle(void* handle, int8_t type) {
(*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type); (*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type);
} }

View File

@ -33,6 +33,7 @@
#include "tthread.h" #include "tthread.h"
#include "ttime.h" #include "ttime.h"
#include "tworker.h" #include "tworker.h"
#include "tmsgcb.h"
#include "dnode.h" #include "dnode.h"
#include "monitor.h" #include "monitor.h"
@ -140,6 +141,8 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);

View File

@ -77,6 +77,8 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -146,9 +148,13 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
pMsg->ahandle); pMsg->ahandle);
switch (ftype) { switch (ftype) {
case PROC_REGISTER: case PROC_REG:
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
break; break;
case PROC_RELEASE:
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont);
break;
default: default:
dndSendRpcRsp(pWrapper, pMsg); dndSendRpcRsp(pWrapper, pMsg);
break; break;
@ -164,6 +170,9 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); dError("failed to create dir:%s since %s", pWrapper->path, terrstr());

View File

@ -363,29 +363,44 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
} }
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
int32_t code = -1;
do {
code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
} else {
dndSendRpcRsp(pWrapper, pRsp); dndSendRpcRsp(pWrapper, pRsp);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) {
taosMsleep(1);
}
} }
} }
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
int32_t code = -1;
do {
code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
} else {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) {
taosMsleep(1);
}
} }
}
void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
if (pWrapper->procType != PROC_CHILD) {
rpcReleaseHandle(handle, type);
} else {
SRpcMsg msg = {.handle = handle, .code = type};
while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) {
taosMsleep(1);
}
}
}
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.pWrapper = pWrapper,
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
.releaseHandleFp = dndReleaseHandle,
.sendMnodeReqFp = dndSendReqToMnode,
.sendReqFp = dndSendReqToDnode,
.sendRspFp = dndSendRsp,
};
return msgCb;
} }

View File

@ -432,7 +432,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER);
ctx->connInfo.handle = NULL; ctx->connInfo.handle = NULL;
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
@ -1282,7 +1282,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
} }
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));