diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 70d4d43f09..c3808fba58 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -66,6 +66,7 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue reqs; + SHashObj* pQueryTable; queue q; SConnList* list; @@ -109,6 +110,8 @@ typedef struct SCliReq { int sent; //(0: no send, 1: alread sent) queue seqq; int32_t seq; + + queue qlist; } SCliReq; typedef struct SCliThrd { @@ -1234,6 +1237,27 @@ static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { } return 0; } +static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) { + int32_t code = 0; + if (pReq->msg.info.handle == 0) { + return 0; + } + + queue q; + QUEUE_INIT(&q); + + queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t)); + if (p == NULL) { + QUEUE_PUSH(&q, &pReq->qlist); + code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue)); + if (code != 0) { + return code; + } + } else { + QUEUE_PUSH(p, &pReq->qlist); + } + return 0; +} static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { int32_t code = 0; SCliConn* pConn = NULL; @@ -1242,6 +1266,12 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); + if (pReq->msg.info.handle != 0) { + // SExHandle *p = transAcquireExHandle(transGetRefMgt(), (int64_t)pReq->msg.info.handle); + // TAOS_CHECK_GOTO(specifyConnRef(pConn, false, pReq->msg.info.handle), NULL, _exception); + // } else { + // TAOS_CHECK_GOTO(allocConnRef(pConn, false), NULL, _exception); + } transQueuePush(&pConn->reqs, pReq); return cliDoConn(pThrd, pConn); @@ -1271,6 +1301,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->status = ConnNormal; conn->broken = false; + conn->pQueryTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed); TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); @@ -1701,7 +1732,7 @@ void cliConnCb(uv_connect_t* req, int status) { // } else if (timeout == true) { // // already deal by timeout // } - // return; + return; } cliConnSetSockInfo(pConn); @@ -1974,6 +2005,20 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } +int32_t cliConnHandleQueryById(SCliReq* pReq) { + if (pReq->msg.info.handle == 0) { + return 0; + } else { + int64_t queryId = (int64_t)pReq->msg.info.handle; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), queryId); + if (exh->inited == 1) { + + } else { + } + transReleaseExHandle(transGetRefMgt(), queryId); + } + return 0; +} void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { int32_t lino = 0; STransMsg resp = {0};