Merge pull request #12799 from taosdata/fix/valgrind
refactor: free rpcCont on error occured
This commit is contained in:
commit
22bfcc34a5
|
@ -282,8 +282,8 @@ int32_t* taosGetErrno();
|
||||||
// dnode
|
// dnode
|
||||||
#define TSDB_CODE_NODE_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0400)
|
#define TSDB_CODE_NODE_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||||
#define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
|
#define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
|
||||||
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
|
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0402)
|
||||||
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404)
|
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
|
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
|
||||||
|
|
|
@ -142,13 +142,14 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
|
||||||
queue->tail = headLen + bodyLen;
|
queue->tail = headLen + bodyLen;
|
||||||
} else if (remain < 8 + headLen) {
|
} else if (remain < 8 + headLen) {
|
||||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
|
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
|
||||||
memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8));
|
memcpy(queue->pBuffer, (char *)pHead + remain - 8, rawHeadLen - (remain - 8));
|
||||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
||||||
queue->tail = headLen - (remain - 8) + bodyLen;
|
queue->tail = headLen - (remain - 8) + bodyLen;
|
||||||
} else if (remain < 8 + headLen + bodyLen) {
|
} else if (remain < 8 + headLen + bodyLen) {
|
||||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
|
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
|
||||||
if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
if (rawBodyLen > 0)
|
||||||
|
memcpy(queue->pBuffer, (char *)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
||||||
queue->tail = bodyLen - (remain - 8 - headLen);
|
queue->tail = bodyLen - (remain - 8 - headLen);
|
||||||
} else {
|
} else {
|
||||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||||
|
@ -312,12 +313,7 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
code = dmProcessNodeMsg(pWrapper, pMsg);
|
code = dmProcessNodeMsg(pWrapper, pMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
|
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {.code = (terrno != 0 ? terrno : code), .info = pMsg->info};
|
||||||
.code = (terrno != 0 ? terrno : code),
|
|
||||||
.pCont = pMsg->info.rsp,
|
|
||||||
.contLen = pMsg->info.rspLen,
|
|
||||||
.info = pMsg->info,
|
|
||||||
};
|
|
||||||
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
|
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
@ -469,8 +465,18 @@ void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||||
taosMsleep(retry);
|
taosMsleep(retry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
pMsg->contLen = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||||
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
|
int32_t code = dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
|
||||||
|
if (code == 0) {
|
||||||
|
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
|
|
||||||
|
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
||||||
|
static void dmSendRsp(SRpcMsg *pMsg);
|
||||||
|
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
|
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
|
||||||
SRpcConnInfo connInfo = {0};
|
SRpcConnInfo connInfo = {0};
|
||||||
if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) {
|
if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) {
|
||||||
|
@ -122,27 +126,22 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code == 0) {
|
if (code != 0) {
|
||||||
if (pWrapper != NULL && InParentProc(pWrapper)) {
|
|
||||||
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
rpcFreeCont(pRpc->pCont);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
|
|
||||||
if (IsReq(pRpc)) {
|
if (IsReq(pRpc)) {
|
||||||
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
|
SRpcMsg rsp = {.code = code, .info = pRpc->info};
|
||||||
if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) {
|
|
||||||
code = TSDB_CODE_NODE_REDIRECT;
|
if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
|
||||||
}
|
pRpc->msgType < TDMT_VND_MSG) {
|
||||||
|
dmBuildMnodeRedirectRsp(pDnode, &rsp);
|
||||||
}
|
}
|
||||||
SRpcMsg rspMsg = {.code = code, .info = pRpc->info};
|
|
||||||
if (pWrapper != NULL) {
|
if (pWrapper != NULL) {
|
||||||
tmsgSendRsp(&rspMsg);
|
dmSendRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
rpcSendResponse(&rspMsg);
|
rpcSendResponse(&rsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,13 +197,25 @@ static inline void dmSendRsp(SRpcMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
if (InChildProc(pWrapper)) {
|
if (InChildProc(pWrapper)) {
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
rpcSendResponse(pMsg);
|
rpcSendResponse(pMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
|
SMEpSet msg = {0};
|
||||||
|
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &msg.epSet);
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
|
||||||
|
pMsg->pCont = rpcMallocCont(contLen);
|
||||||
|
if (pMsg->pCont == NULL) {
|
||||||
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
tSerializeSMEpSet(pMsg->pCont, contLen, &msg);
|
||||||
|
pMsg->contLen = contLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||||
SMEpSet msg = {.epSet = *pNewEpSet};
|
SMEpSet msg = {.epSet = *pNewEpSet};
|
||||||
|
@ -226,8 +237,6 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
if (InChildProc(pWrapper)) {
|
if (InChildProc(pWrapper)) {
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
|
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
}
|
}
|
||||||
|
@ -322,34 +331,3 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
|
||||||
};
|
};
|
||||||
return msgCb;
|
return msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
|
|
||||||
SDnode *pDnode = dmInstance();
|
|
||||||
SEpSet epSet = {0};
|
|
||||||
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
|
||||||
|
|
||||||
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
|
|
||||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
|
||||||
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
|
||||||
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
|
|
||||||
epSet.inUse = (i + 1) % epSet.numOfEps;
|
|
||||||
}
|
|
||||||
|
|
||||||
epSet.eps[i].port = htons(epSet.eps[i].port);
|
|
||||||
}
|
|
||||||
|
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
|
||||||
SMEpSet msg = {.epSet = epSet};
|
|
||||||
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
|
|
||||||
rsp.pCont = rpcMallocCont(contLen);
|
|
||||||
if (rsp.pCont == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
} else {
|
|
||||||
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
|
|
||||||
rsp.contLen = contLen;
|
|
||||||
}
|
|
||||||
|
|
||||||
dmSendRsp(&rsp);
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
}
|
|
||||||
|
|
|
@ -173,6 +173,7 @@ int32_t dmReadEps(SDnodeData *pData);
|
||||||
int32_t dmWriteEps(SDnodeData *pData);
|
int32_t dmWriteEps(SDnodeData *pData);
|
||||||
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
|
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
|
||||||
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||||
|
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -314,6 +314,17 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
||||||
taosThreadRwlockUnlock(&pData->lock);
|
taosThreadRwlockUnlock(&pData->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
dmGetMnodeEpSet(pData, pEpSet);
|
||||||
|
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, pEpSet->numOfEps, pEpSet->inUse);
|
||||||
|
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||||
|
dDebug("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||||
|
if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) {
|
||||||
|
pEpSet->inUse = (i + 1) % pEpSet->numOfEps;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
||||||
taosThreadRwlockWrlock(&pData->lock);
|
taosThreadRwlockWrlock(&pData->lock);
|
||||||
pData->mnodeEps = *pEpSet;
|
pData->mnodeEps = *pEpSet;
|
||||||
|
|
|
@ -994,9 +994,6 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
||||||
pAction->msgSent = 0;
|
pAction->msgSent = 0;
|
||||||
pAction->msgReceived = 0;
|
pAction->msgReceived = 0;
|
||||||
pAction->errCode = terrno;
|
pAction->errCode = terrno;
|
||||||
if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) {
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
|
||||||
}
|
|
||||||
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue