fix: fix release/redirect problem

This commit is contained in:
Shengliang Guan 2022-06-21 16:24:58 +08:00
parent e87baa8df7
commit 8d0e6fa68f
2 changed files with 19 additions and 8 deletions

View File

@ -175,6 +175,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
int connStatus = conn->status; \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
conn->status = ConnRelease; \
@ -186,7 +187,9 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} \
destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
if (connStatus != ConnInPool) { \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
} \
return; \
} \
} while (0)
@ -450,7 +453,7 @@ void* destroyConnPool(void* pool) {
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn);
QUEUE_REMOVE(h);
//QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
cliDestroyConn(c, true);
}
@ -476,11 +479,15 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
if (QUEUE_IS_EMPTY(&plist->conn)) {
return NULL;
}
queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h);
//QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
conn->status = ConnNormal;
QUEUE_INIT(&conn->conn);
QUEUE_REMOVE(&conn->conn);
tTrace("conn %p conn key: %s: ",conn, key);
assert(h == &conn->conn);
//QUEUE_INIT(&conn->conn);
return conn;
}
static void addConnToPool(void* pool, SCliConn* conn) {
@ -500,6 +507,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before
assert(plist != NULL);
tTrace("conn %p conn key: %s: ", conn, key);
//QUEUE_INIT(&conn->conn);
QUEUE_PUSH(&plist->conn, &conn->conn);
assert(!QUEUE_IS_EMPTY(&plist->conn));
}
@ -561,8 +570,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
}
static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn);
//QUEUE_INIT(&conn->conn);
if (clear) {
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
@ -1006,7 +1015,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
}
addConnToPool(pThrd->pool, pConn);
if (pConn->status != ConnInPool) {
addConnToPool(pThrd->pool, pConn);
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;

View File

@ -737,7 +737,7 @@ static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
// conn set
QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;