opt transport

This commit is contained in:
yihaoDeng 2024-09-08 07:30:16 +08:00
parent 488cccd10e
commit 0b440aad0c
1 changed files with 53 additions and 60 deletions

View File

@ -1147,45 +1147,45 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
}
}
static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) {
if (transQueuePush(&conn->reqs, pReq) != 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
// static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) {
// if (transQueuePush(&conn->reqs, pReq) != 0) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// return 0;
// }
static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
// do nothing
SCliReq* pTail = transQueuePop(&conn->reqs);
if (pTail == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReq != NULL) {
*pReq = pTail;
}
return 0;
}
static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0;
if (pReq->msg.info.handle == 0) {
return 0;
}
// static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
// // do nothing
// SCliReq* pTail = transQueuePop(&conn->reqs);
// if (pTail == NULL) {
// return TSDB_CODE_INVALID_PARA;
// }
// if (pReq != NULL) {
// *pReq = pTail;
// }
// return 0;
// }
// static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) {
// int32_t code = 0;
// if (pReq->msg.info.handle == 0) {
// return 0;
// }
queue q;
QUEUE_INIT(&q);
// queue q;
// QUEUE_INIT(&q);
queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t));
if (p == NULL) {
QUEUE_PUSH(&q, &pReq->qlist);
code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue));
if (code != 0) {
return code;
}
} else {
QUEUE_PUSH(p, &pReq->qlist);
}
return 0;
}
// queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t));
// if (p == NULL) {
// QUEUE_PUSH(&q, &pReq->qlist);
// code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue));
// if (code != 0) {
// return code;
// }
// } else {
// QUEUE_PUSH(p, &pReq->qlist);
// }
// return 0;
// }
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
int32_t code = 0;
SCliConn* pConn = NULL;
@ -1218,18 +1218,11 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
conn->ipStr = taosStrdup(ip);
conn->port = port;
transReqQueueInit(&conn->wreqQueue);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
conn->broken = false;
conn->pQueryTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed);
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
transRefCliHandle(conn);
QUEUE_INIT(&conn->q);
transReqQueueInit(&conn->wreqQueue);
@ -1237,9 +1230,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
transRefCliHandle(conn);
conn->seq = 0;
conn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
@ -1263,10 +1254,11 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_CHECK_GOTO(code, NULL, _failed);
}
conn->stream->data = conn;
conn->connReq.data = conn;
transRefCliHandle(conn);
*pCliConn = conn;
return code;
_failed:
@ -1276,6 +1268,9 @@ _failed:
(void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqs);
taosMemoryFree(conn->dstAddr);
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
}
tError("failed to create conn, code:%d", code);
taosMemoryFree(conn);
@ -1286,7 +1281,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
conn->broken = true;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
conn->broken = true;
if (conn->list == NULL) {
@ -1334,15 +1328,14 @@ static void cliDestroy(uv_handle_t* handle) {
delConnFromHeapCache(pThrd->connHeapCache, conn);
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream);
taosMemoryFree(conn->ipStr);
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter) {
int64_t qid = *(int64_t*)pIter;
(void)taosHashRemove(pThrd->pIdConnTable, &qid, sizeof(qid));
int64_t* qid = taosHashGetKey(pIter, NULL);
(void)taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
pIter = taosHashIterate(conn->pQTable, pIter);
tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, qid);
tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid);
}
cliDestroyConnMsgs(conn, true);
@ -2482,10 +2475,10 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->failFastCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
// pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
// if (pThrd->failFastCache == NULL) {
// TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
// }
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->batchCache == NULL) {
@ -2577,11 +2570,11 @@ static void destroyThrdObj(SCliThrd* pThrd) {
}
taosHashCleanup(pThrd->batchCache);
void** pIter2 = taosHashIterate(pThrd->connHeapCache, NULL);
void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL);
while (pIter2 != NULL) {
SHeap* heap = (SHeap*)(*pIter2);
SHeap* heap = (SHeap*)(pIter2);
transHeapDestroy(heap);
pIter2 = (void**)taosHashIterate(pThrd->connHeapCache, pIter2);
pIter2 = (void*)taosHashIterate(pThrd->connHeapCache, pIter2);
}
taosHashCleanup(pThrd->connHeapCache);