Revert "fix: avoid invalid read"
This commit is contained in:
parent
b09791d4af
commit
87f28ab358
|
@ -87,8 +87,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
||||||
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
||||||
|
|
||||||
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||||
const void * pHead = pMsg;
|
const void *pHead = pMsg;
|
||||||
const void * pBody = pMsg->pCont;
|
const void *pBody = pMsg->pCont;
|
||||||
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
||||||
const int32_t rawBodyLen = pMsg->contLen;
|
const int32_t rawBodyLen = pMsg->contLen;
|
||||||
const int16_t headLen = CEIL8(rawHeadLen);
|
const int16_t headLen = CEIL8(rawHeadLen);
|
||||||
|
@ -257,7 +257,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
proc->wrapper = pWrapper;
|
proc->wrapper = pWrapper;
|
||||||
proc->name = pWrapper->name;
|
proc->name = pWrapper->name;
|
||||||
|
|
||||||
SShm * shm = &proc->shm;
|
SShm *shm = &proc->shm;
|
||||||
int32_t cstart = 0;
|
int32_t cstart = 0;
|
||||||
int32_t csize = CEIL8(shm->size / 2);
|
int32_t csize = CEIL8(shm->size / 2);
|
||||||
int32_t pstart = csize;
|
int32_t pstart = csize;
|
||||||
|
@ -281,13 +281,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *dmConsumChildQueue(void *param) {
|
static void *dmConsumChildQueue(void *param) {
|
||||||
SProc * proc = param;
|
SProc *proc = param;
|
||||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||||
SProcQueue * queue = proc->cqueue;
|
SProcQueue *queue = proc->cqueue;
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg * pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
@ -324,13 +324,13 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *dmConsumParentQueue(void *param) {
|
static void *dmConsumParentQueue(void *param) {
|
||||||
SProc * proc = param;
|
SProc *proc = param;
|
||||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||||
SProcQueue * queue = proc->pqueue;
|
SProcQueue *queue = proc->pqueue;
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg * pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from pqueue", proc->name);
|
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
@ -353,7 +353,7 @@ static void *dmConsumParentQueue(void *param) {
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
} else if (ftype == DND_FUNC_RELEASE) {
|
} else if (ftype == DND_FUNC_RELEASE) {
|
||||||
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
||||||
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
|
||||||
} else {
|
} else {
|
||||||
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
|
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
|
@ -245,7 +245,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
SRpcMsg msg = {.code = type, .info = *pHandle};
|
SRpcMsg msg = {.code = type, .info = *pHandle};
|
||||||
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
|
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
|
||||||
} else {
|
} else {
|
||||||
rpcReleaseHandle(pHandle, type);
|
rpcReleaseHandle(pHandle->handle, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1029,9 +1029,9 @@ void transUnrefSrvHandle(void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void transReleaseSrvHandle(void* handle) {
|
void transReleaseSrvHandle(void* handle) {
|
||||||
SRpcHandleInfo* info = handle;
|
SExHandle* exh = handle;
|
||||||
SExHandle* exh = info->handle;
|
int64_t refId = exh->refId;
|
||||||
int64_t refId = info->refId;
|
|
||||||
ASYNC_CHECK_HANDLE(exh, refId);
|
ASYNC_CHECK_HANDLE(exh, refId);
|
||||||
|
|
||||||
SWorkThrd* pThrd = exh->pThrd;
|
SWorkThrd* pThrd = exh->pThrd;
|
||||||
|
|
Loading…
Reference in New Issue