refactor transport
This commit is contained in:
parent
f9290cf13f
commit
ab66eefcb4
|
@ -1064,20 +1064,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
uv_read_stop(conn->stream);
|
||||
|
||||
conn->seq = 0;
|
||||
int32_t code = allocConnRef(conn, true);
|
||||
if (code != 0) {
|
||||
cliDestroyConn(conn, true);
|
||||
return;
|
||||
}
|
||||
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
cliResetConnTimer(conn);
|
||||
if (T_REF_VAL_GET(conn) > 1) {
|
||||
transUnrefCliHandle(conn);
|
||||
}
|
||||
|
||||
cliDestroyConnMsgs(conn, false);
|
||||
conn->seq = 0;
|
||||
|
||||
if (conn->list == NULL && conn->dstAddr != NULL) {
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||
|
@ -1248,7 +1241,6 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
|
|||
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
|
||||
|
||||
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
|
||||
// TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception);
|
||||
|
||||
transQueuePush(&pConn->reqs, pReq);
|
||||
|
||||
|
@ -1332,7 +1324,44 @@ _failed:
|
|||
taosMemoryFree(conn);
|
||||
return code;
|
||||
}
|
||||
static void cliDestroyConn(SCliConn* conn, bool clear) {}
|
||||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||
SCliThrd* pThrd = conn->hostThrd;
|
||||
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
conn->broken = true;
|
||||
QUEUE_REMOVE(&conn->q);
|
||||
QUEUE_INIT(&conn->q);
|
||||
|
||||
conn->broken = true;
|
||||
if (conn->list == NULL) {
|
||||
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||
}
|
||||
|
||||
if (conn->list) {
|
||||
SConnList* list = conn->list;
|
||||
list->list->numOfConn--;
|
||||
if (conn->status == ConnInPool) {
|
||||
list->size--;
|
||||
}
|
||||
}
|
||||
conn->list = NULL;
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
conn->refId = -1;
|
||||
|
||||
if (conn->task != NULL) {
|
||||
transDQCancel(pThrd->timeoutQueue, conn->task);
|
||||
conn->task = NULL;
|
||||
}
|
||||
// cliResetTimer(pThrd, conn);
|
||||
|
||||
if (clear) {
|
||||
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
||||
(void)uv_read_stop(conn->stream);
|
||||
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
||||
}
|
||||
}
|
||||
}
|
||||
static void cliDestroy(uv_handle_t* handle) {
|
||||
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
|
||||
return;
|
||||
|
@ -1350,6 +1379,8 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
|
||||
cliDestroyConnMsgs(conn, true);
|
||||
|
||||
delConnFromHeapCache(pThrd->connHeapCache, conn);
|
||||
|
||||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
transReqQueueClear(&conn->wreqQueue);
|
||||
(void)transDestroyBuffer(&conn->readBuf);
|
||||
|
@ -1435,7 +1466,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
|
|||
|
||||
int32_t totalLen = 0;
|
||||
if (size == 0) {
|
||||
tError("%s conn %p not msg to send", pInst->label, pConn);
|
||||
tDebug("%s conn %p not msg to send", pInst->label, pConn);
|
||||
return;
|
||||
}
|
||||
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
|
||||
|
@ -3590,6 +3621,7 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
|
|||
code = transHeapInsert(p, pConn);
|
||||
tDebug("add conn %p to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
|
||||
pConn->reqRefCnt);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
|
||||
|
@ -3656,7 +3688,7 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
|
|||
// impl later
|
||||
if (p->inHeap == 0) {
|
||||
tDebug("failed to del conn %p since not in heap", p);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
return 0;
|
||||
}
|
||||
p->inHeap = 0;
|
||||
p->reqRefCnt--;
|
||||
|
|
Loading…
Reference in New Issue