Merge pull request #14495 from taosdata/fix/invaldRead
fix: invalid read/write
This commit is contained in:
commit
8d8df859cc
|
@ -87,8 +87,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
||||||
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
||||||
|
|
||||||
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||||
const void *pHead = pMsg;
|
const void * pHead = pMsg;
|
||||||
const void *pBody = pMsg->pCont;
|
const void * pBody = pMsg->pCont;
|
||||||
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
||||||
const int32_t rawBodyLen = pMsg->contLen;
|
const int32_t rawBodyLen = pMsg->contLen;
|
||||||
const int16_t headLen = CEIL8(rawHeadLen);
|
const int16_t headLen = CEIL8(rawHeadLen);
|
||||||
|
@ -257,7 +257,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
proc->wrapper = pWrapper;
|
proc->wrapper = pWrapper;
|
||||||
proc->name = pWrapper->name;
|
proc->name = pWrapper->name;
|
||||||
|
|
||||||
SShm *shm = &proc->shm;
|
SShm * shm = &proc->shm;
|
||||||
int32_t cstart = 0;
|
int32_t cstart = 0;
|
||||||
int32_t csize = CEIL8(shm->size / 2);
|
int32_t csize = CEIL8(shm->size / 2);
|
||||||
int32_t pstart = csize;
|
int32_t pstart = csize;
|
||||||
|
@ -281,13 +281,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *dmConsumChildQueue(void *param) {
|
static void *dmConsumChildQueue(void *param) {
|
||||||
SProc *proc = param;
|
SProc * proc = param;
|
||||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||||
SProcQueue *queue = proc->cqueue;
|
SProcQueue * queue = proc->cqueue;
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg * pMsg = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
@ -324,13 +324,13 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *dmConsumParentQueue(void *param) {
|
static void *dmConsumParentQueue(void *param) {
|
||||||
SProc *proc = param;
|
SProc * proc = param;
|
||||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||||
SProcQueue *queue = proc->pqueue;
|
SProcQueue * queue = proc->pqueue;
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg * pMsg = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from pqueue", proc->name);
|
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
@ -353,7 +353,7 @@ static void *dmConsumParentQueue(void *param) {
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
} else if (ftype == DND_FUNC_RELEASE) {
|
} else if (ftype == DND_FUNC_RELEASE) {
|
||||||
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
||||||
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
|
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||||
} else {
|
} else {
|
||||||
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
|
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
|
@ -245,7 +245,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
SRpcMsg msg = {.code = type, .info = *pHandle};
|
SRpcMsg msg = {.code = type, .info = *pHandle};
|
||||||
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
|
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
|
||||||
} else {
|
} else {
|
||||||
rpcReleaseHandle(pHandle->handle, type);
|
rpcReleaseHandle(pHandle, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -262,13 +262,17 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
|
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
|
||||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||||
|
|
||||||
|
#define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps != 0)
|
||||||
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
|
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
|
||||||
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
||||||
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
||||||
#define EPSET_FORWARD_INUSE(epSet) \
|
#define EPSET_FORWARD_INUSE(epSet) \
|
||||||
do { \
|
do { \
|
||||||
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
if ((epSet)->numOfEps != 0) { \
|
||||||
|
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
||||||
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
||||||
do { \
|
do { \
|
||||||
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
|
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
|
||||||
|
@ -512,7 +516,6 @@ static void allocConnRef(SCliConn* conn, bool update) {
|
||||||
}
|
}
|
||||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
if (conn->status == ConnInPool) {
|
if (conn->status == ConnInPool) {
|
||||||
// assert(0);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SCliThrd* thrd = conn->hostThrd;
|
SCliThrd* thrd = conn->hostThrd;
|
||||||
|
@ -668,7 +671,6 @@ static void cliSendCb(uv_write_t* req, int status) {
|
||||||
void cliSend(SCliConn* pConn) {
|
void cliSend(SCliConn* pConn) {
|
||||||
CONN_HANDLE_BROKEN(pConn);
|
CONN_HANDLE_BROKEN(pConn);
|
||||||
|
|
||||||
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
|
|
||||||
assert(!transQueueEmpty(&pConn->cliMsgs));
|
assert(!transQueueEmpty(&pConn->cliMsgs));
|
||||||
|
|
||||||
SCliMsg* pCliMsg = NULL;
|
SCliMsg* pCliMsg = NULL;
|
||||||
|
@ -810,6 +812,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||||
|
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
tError("invalid epset");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
bool ignore = false;
|
bool ignore = false;
|
||||||
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
||||||
|
@ -1077,12 +1084,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
} else {
|
} else {
|
||||||
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
addConnToPool(pThrd->pool, pConn);
|
|
||||||
if (pResp->contLen == 0) {
|
if (pResp->contLen == 0) {
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
} else {
|
} else {
|
||||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet);
|
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) {
|
||||||
|
tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
addConnToPool(pThrd->pool, pConn);
|
||||||
transFreeMsg(pResp->pCont);
|
transFreeMsg(pResp->pCont);
|
||||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -1029,8 +1029,9 @@ void transUnrefSrvHandle(void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void transReleaseSrvHandle(void* handle) {
|
void transReleaseSrvHandle(void* handle) {
|
||||||
SExHandle* exh = handle;
|
SRpcHandleInfo* info = handle;
|
||||||
int64_t refId = exh->refId;
|
SExHandle* exh = info->handle;
|
||||||
|
int64_t refId = info->refId;
|
||||||
|
|
||||||
ASYNC_CHECK_HANDLE(exh, refId);
|
ASYNC_CHECK_HANDLE(exh, refId);
|
||||||
|
|
||||||
|
|
|
@ -175,7 +175,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)
|
||||||
rpcMsg.code = 0;
|
rpcMsg.code = 0;
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
rpcReleaseHandle(pMsg->info.handle, TAOS_CONN_SERVER);
|
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||||
}
|
}
|
||||||
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue