refactor transport

This commit is contained in:
yihaoDeng 2024-09-05 11:29:43 +08:00
parent ab66eefcb4
commit 3940997194
1 changed files with 46 additions and 1 deletions

View File

@ -66,6 +66,7 @@ typedef struct SCliConn {
SConnBuffer readBuf;
STransQueue reqs;
SHashObj* pQueryTable;
queue q;
SConnList* list;
@ -109,6 +110,8 @@ typedef struct SCliReq {
int sent; //(0: no send, 1: alread sent)
queue seqq;
int32_t seq;
queue qlist;
} SCliReq;
typedef struct SCliThrd {
@ -1234,6 +1237,27 @@ static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
}
return 0;
}
static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0;
if (pReq->msg.info.handle == 0) {
return 0;
}
queue q;
QUEUE_INIT(&q);
queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t));
if (p == NULL) {
QUEUE_PUSH(&q, &pReq->qlist);
code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue));
if (code != 0) {
return code;
}
} else {
QUEUE_PUSH(p, &pReq->qlist);
}
return 0;
}
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
int32_t code = 0;
SCliConn* pConn = NULL;
@ -1242,6 +1266,12 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
if (pReq->msg.info.handle != 0) {
// SExHandle *p = transAcquireExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle);
// TAOS_CHECK_GOTO(specifyConnRef(pConn, false, pReq->msg.info.handle), NULL, _exception);
// } else {
// TAOS_CHECK_GOTO(allocConnRef(pConn, false), NULL, _exception);
}
transQueuePush(&pConn->reqs, pReq);
return cliDoConn(pThrd, pConn);
@ -1271,6 +1301,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
conn->status = ConnNormal;
conn->broken = false;
conn->pQueryTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed);
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
@ -1701,7 +1732,7 @@ void cliConnCb(uv_connect_t* req, int status) {
// } else if (timeout == true) {
// // already deal by timeout
// }
// return;
return;
}
cliConnSetSockInfo(pConn);
@ -1974,6 +2005,20 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg);
}
int32_t cliConnHandleQueryById(SCliReq* pReq) {
if (pReq->msg.info.handle == 0) {
return 0;
} else {
int64_t queryId = (int64_t)pReq->msg.info.handle;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), queryId);
if (exh->inited == 1) {
} else {
}
transReleaseExHandle(transGetRefMgt(), queryId);
}
return 0;
}
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
int32_t lino = 0;
STransMsg resp = {0};