diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 502cc71cf2..994469ba86 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1147,45 +1147,45 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } -static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { - if (transQueuePush(&conn->reqs, pReq) != 0) { - return TSDB_CODE_OUT_OF_MEMORY; - } - return 0; -} +// static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { +// if (transQueuePush(&conn->reqs, pReq) != 0) { +// return TSDB_CODE_OUT_OF_MEMORY; +// } +// return 0; +// } -static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { - // do nothing - SCliReq* pTail = transQueuePop(&conn->reqs); - if (pTail == NULL) { - return TSDB_CODE_INVALID_PARA; - } - if (pReq != NULL) { - *pReq = pTail; - } - return 0; -} -static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) { - int32_t code = 0; - if (pReq->msg.info.handle == 0) { - return 0; - } +// static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { +// // do nothing +// SCliReq* pTail = transQueuePop(&conn->reqs); +// if (pTail == NULL) { +// return TSDB_CODE_INVALID_PARA; +// } +// if (pReq != NULL) { +// *pReq = pTail; +// } +// return 0; +// } +// static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) { +// int32_t code = 0; +// if (pReq->msg.info.handle == 0) { +// return 0; +// } - queue q; - QUEUE_INIT(&q); +// queue q; +// QUEUE_INIT(&q); - queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t)); - if (p == NULL) { - QUEUE_PUSH(&q, &pReq->qlist); - code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue)); - if (code != 0) { - return code; - } - } else { - QUEUE_PUSH(p, &pReq->qlist); - } - return 0; -} +// queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t)); +// if (p == NULL) { +// QUEUE_PUSH(&q, &pReq->qlist); +// code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue)); +// if (code != 0) { +// return code; +// } +// } else { +// QUEUE_PUSH(p, &pReq->qlist); +// } +// return 0; +// } static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { int32_t code = 0; SCliConn* pConn = NULL; @@ -1218,18 +1218,11 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->ipStr = taosStrdup(ip); conn->port = port; - transReqQueueInit(&conn->wreqQueue); QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; conn->broken = false; - - conn->pQueryTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed); - - TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); - - transRefCliHandle(conn); + QUEUE_INIT(&conn->q); transReqQueueInit(&conn->wreqQueue); @@ -1237,9 +1230,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); - QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; - transRefCliHandle(conn); conn->seq = 0; conn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); @@ -1263,10 +1254,11 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_CHECK_GOTO(code, NULL, _failed); } - conn->stream->data = conn; conn->connReq.data = conn; + transRefCliHandle(conn); + *pCliConn = conn; return code; _failed: @@ -1276,6 +1268,9 @@ _failed: (void)transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqs); taosMemoryFree(conn->dstAddr); + + (void)transReleaseExHandle(transGetRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetRefMgt(), conn->refId); } tError("failed to create conn, code:%d", code); taosMemoryFree(conn); @@ -1286,7 +1281,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); conn->broken = true; QUEUE_REMOVE(&conn->q); - QUEUE_INIT(&conn->q); conn->broken = true; if (conn->list == NULL) { @@ -1334,15 +1328,14 @@ static void cliDestroy(uv_handle_t* handle) { delConnFromHeapCache(pThrd->connHeapCache, conn); taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); + taosMemoryFree(conn->ipStr); void* pIter = taosHashIterate(conn->pQTable, NULL); while (pIter) { - int64_t qid = *(int64_t*)pIter; - (void)taosHashRemove(pThrd->pIdConnTable, &qid, sizeof(qid)); - + int64_t* qid = taosHashGetKey(pIter, NULL); + (void)taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid)); pIter = taosHashIterate(conn->pQTable, pIter); - - tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, qid); + tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid); } cliDestroyConnMsgs(conn, true); @@ -2482,10 +2475,10 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } - pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - if (pThrd->failFastCache == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); - } + // pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + // if (pThrd->failFastCache == NULL) { + // TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + // } pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->batchCache == NULL) { @@ -2577,11 +2570,11 @@ static void destroyThrdObj(SCliThrd* pThrd) { } taosHashCleanup(pThrd->batchCache); - void** pIter2 = taosHashIterate(pThrd->connHeapCache, NULL); + void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL); while (pIter2 != NULL) { - SHeap* heap = (SHeap*)(*pIter2); + SHeap* heap = (SHeap*)(pIter2); transHeapDestroy(heap); - pIter2 = (void**)taosHashIterate(pThrd->connHeapCache, pIter2); + pIter2 = (void*)taosHashIterate(pThrd->connHeapCache, pIter2); } taosHashCleanup(pThrd->connHeapCache);