Merge pull request #20058 from taosdata/fix/invalidRead
fix: fix invalid read-write
This commit is contained in:
commit
243dfc42aa
|
@ -727,7 +727,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
conn->broken = 0;
|
conn->broken = false;
|
||||||
transRefCliHandle(conn);
|
transRefCliHandle(conn);
|
||||||
|
|
||||||
atomic_add_fetch_32(&pThrd->connCount, 1);
|
atomic_add_fetch_32(&pThrd->connCount, 1);
|
||||||
|
@ -997,6 +997,11 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
|
||||||
taosMemoryFree(pBatch);
|
taosMemoryFree(pBatch);
|
||||||
}
|
}
|
||||||
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
|
if (pThrd->quit == true) {
|
||||||
|
cliDestroyBatch(pBatch);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
|
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1082,18 +1087,24 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
|
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
|
||||||
p->wLen, p->batchSize, uv_err_name(status));
|
p->wLen, p->batchSize, uv_err_name(status));
|
||||||
cliHandleExcept(conn);
|
|
||||||
|
if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);
|
||||||
|
|
||||||
cliHandleBatchReq(nxtBatch, thrd);
|
cliHandleBatchReq(nxtBatch, thrd);
|
||||||
} else {
|
} else {
|
||||||
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
|
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
|
||||||
p->batchSize);
|
p->batchSize);
|
||||||
|
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
|
||||||
if (nxtBatch != NULL) {
|
if (nxtBatch != NULL) {
|
||||||
conn->pBatch = nxtBatch;
|
conn->pBatch = nxtBatch;
|
||||||
cliSendBatch(conn);
|
cliSendBatch(conn);
|
||||||
} else {
|
} else {
|
||||||
addConnToPool(thrd->pool, conn);
|
addConnToPool(thrd->pool, conn);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
cliDestroyBatch(nxtBatch);
|
||||||
|
// conn release by other callback
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cliDestroyBatch(p);
|
cliDestroyBatch(p);
|
||||||
|
@ -1454,6 +1465,11 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
|
|
||||||
|
if (pMsg->type == Quit) {
|
||||||
|
pThrd->stopMsg = pMsg;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||||
|
|
||||||
count++;
|
count++;
|
||||||
|
@ -1485,6 +1501,12 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
|
|
||||||
|
if (pMsg->type == Quit) {
|
||||||
|
pThrd->stopMsg = pMsg;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
|
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
|
@ -1582,7 +1604,6 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
SCliThrd* pThrd = item->pThrd;
|
SCliThrd* pThrd = item->pThrd;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
SCliMsg* pMsg = NULL;
|
|
||||||
// batch process to avoid to lock/unlock frequently
|
// batch process to avoid to lock/unlock frequently
|
||||||
queue wq;
|
queue wq;
|
||||||
taosThreadMutexLock(&item->mtx);
|
taosThreadMutexLock(&item->mtx);
|
||||||
|
@ -2285,22 +2306,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||||
}
|
}
|
||||||
/*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
|
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
|
||||||
char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
|
|
||||||
uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
|
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
|
||||||
|
|
||||||
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
|
|
||||||
if (val != NULL && *val >= pTransInst->connLimitNum) {
|
|
||||||
transFreeMsg(pReq->pCont);
|
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
|
||||||
return TSDB_CODE_RPC_MAX_SESSIONS;
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
|
|
||||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||||
|
|
||||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->epSet = *pEpSet;
|
pCtx->epSet = *pEpSet;
|
||||||
pCtx->ahandle = pReq->info.ahandle;
|
pCtx->ahandle = pReq->info.ahandle;
|
||||||
|
|
Loading…
Reference in New Issue