fix: invalid write while send recv
This commit is contained in:
parent
095f6938b7
commit
d10684a389
|
@ -67,9 +67,9 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
|
||||||
void tmsgSendRsp(const SRpcMsg* pRsp);
|
void tmsgSendRsp(SRpcMsg* pRsp);
|
||||||
void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp);
|
void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp);
|
||||||
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
|
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet);
|
||||||
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
|
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
|
||||||
void tmsgReleaseHandle(void* handle, int8_t type);
|
void tmsgReleaseHandle(void* handle, int8_t type);
|
||||||
void tmsgReportStartup(const char* name, const char* desc);
|
void tmsgReportStartup(const char* name, const char* desc);
|
||||||
|
|
|
@ -55,7 +55,7 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(const SRpcMsg* pRsp) {
|
void tmsgSendRsp(SRpcMsg* pRsp) {
|
||||||
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
||||||
if (fp != NULL) {
|
if (fp != NULL) {
|
||||||
return (*fp)(tsDefaultMsgCb.pWrapper, pRsp);
|
return (*fp)(tsDefaultMsgCb.pWrapper, pRsp);
|
||||||
|
@ -64,7 +64,7 @@ void tmsgSendRsp(const SRpcMsg* pRsp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
|
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
|
||||||
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
||||||
if (fp != NULL) {
|
if (fp != NULL) {
|
||||||
(*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
|
(*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
|
||||||
|
|
|
@ -259,34 +259,29 @@ static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
|
||||||
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
||||||
if (pDnode->status != DND_STAT_RUNNING) {
|
if (pDnode->status != DND_STAT_RUNNING) {
|
||||||
pRsp->code = TSDB_CODE_NODE_OFFLINE;
|
pRsp->code = TSDB_CODE_NODE_OFFLINE;
|
||||||
|
rpcFreeCont(pReq->pCont);
|
||||||
|
pReq->pCont = NULL;
|
||||||
} else {
|
} else {
|
||||||
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
|
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
|
||||||
if (pWrapper->pDnode->status != DND_STAT_RUNNING) {
|
SEpSet epSet = {0};
|
||||||
pRsp->code = TSDB_CODE_NODE_OFFLINE;
|
dmGetMnodeEpSet(pWrapper->pDnode, &epSet);
|
||||||
} else {
|
dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp);
|
||||||
SEpSet epSet = {0};
|
|
||||||
dmGetMnodeEpSet(pWrapper->pDnode, &epSet);
|
|
||||||
dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
|
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
if (pDnode->status != DND_STAT_RUNNING) {
|
if (pDnode->status != DND_STAT_RUNNING || pDnode->trans.clientRpc == NULL) {
|
||||||
|
rpcFreeCont(pReq->pCont);
|
||||||
|
pReq->pCont = NULL;
|
||||||
terrno = TSDB_CODE_NODE_OFFLINE;
|
terrno = TSDB_CODE_NODE_OFFLINE;
|
||||||
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
|
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDnode->trans.clientRpc == NULL) {
|
|
||||||
terrno = TSDB_CODE_NODE_OFFLINE;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
|
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue