diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index febc349e41..4bf62f2067 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -349,10 +349,8 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); // request list typedef struct SWriteReq { - queue q; // gloabl queue node - queue node; // req queue node - void* conn; - uv_write_t req; + queue node; // req queue node + void* conn; } SWriteReq; void transReqQueueInit(queue* q); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8d76a419ef..f8328c5b69 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -57,7 +57,6 @@ typedef struct SCliConn { int32_t ref; uv_connect_t connReq; uv_stream_t* stream; - queue wreqQueue; uv_timer_t* timer; // read timer, forbidden @@ -909,8 +908,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->broken = false; QUEUE_INIT(&conn->q); - transReqQueueInit(&conn->wreqQueue); - TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, cliDestroyMsg), NULL, _failed); TAOS_CHECK_GOTO(transQueueInit(&conn->reqsSentOut, cliDestroyMsg), NULL, _failed); @@ -1007,7 +1004,6 @@ static void cliDestroy(uv_handle_t* handle) { destroyWQ(&conn->wq); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); - transReqQueueClear(&conn->wreqQueue); (void)transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 951b651ee1..7814ccafac 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -412,29 +412,32 @@ void transReqQueueInit(queue* q) { QUEUE_INIT(q); } void* transReqQueuePush(queue* q, SWriteReq* userReq) { - uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); - req->data = userReq; + return NULL; + // uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + // req->data = userReq; - QUEUE_PUSH(q, &userReq->q); - return req; + // QUEUE_PUSH(q, &userReq->q); + // return req; } void* transReqQueueRemove(void* arg) { - void* ret = NULL; - uv_write_t* req = arg; + return NULL; + // void* ret = NULL; + // uv_write_t* req = arg; - SWriteReq* userReq = req ? req->data : NULL; - if (req == NULL) return NULL; - QUEUE_REMOVE(&userReq->q); + // SWriteReq* userReq = req ? req->data : NULL; + // if (req == NULL) return NULL; + // QUEUE_REMOVE(&userReq->q); - return userReq; + // return userReq; } void transReqQueueClear(queue* q) { - while (!QUEUE_IS_EMPTY(q)) { - queue* h = QUEUE_HEAD(q); - QUEUE_REMOVE(h); - SWriteReq* req = QUEUE_DATA(h, SWriteReq, q); - taosMemoryFree(req); - } + return; + // while (!QUEUE_IS_EMPTY(q)) { + // queue* h = QUEUE_HEAD(q); + // QUEUE_REMOVE(h); + // SWriteReq* req = QUEUE_DATA(h, SWriteReq, q); + // taosMemoryFree(req); + // } } int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) { @@ -883,3 +886,39 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { // STUB_RAND_NETWORK_ERR(status) // return status; // } + +int32_t initWQ(queue* wq) { + QUEUE_INIT(wq); + for (int i = 0; i < 4; i++) { + SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); + w->wreq.data = w; + w->arg = NULL; + QUEUE_PUSH(wq, &w->q); + } + return 0; +} +void destroyWQ(queue* wq) { + while (!QUEUE_IS_EMPTY(wq)) { + queue* h = QUEUE_HEAD(wq); + QUEUE_REMOVE(h); + SWReqsWrapper* w = QUEUE_DATA(h, SWReqsWrapper, q); + taosMemoryFree(w); + } +} + +uv_write_t* allocWReqFromWQ(queue* wq, void* arg) { + if (!QUEUE_IS_EMPTY(wq)) { + queue* node = QUEUE_HEAD(wq); + QUEUE_REMOVE(node); + SWReqsWrapper* w = QUEUE_DATA(node, SWReqsWrapper, q); + w->arg = arg; + return &w->wreq; + } else { + SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); + w->wreq.data = w; + w->arg = arg; + return &w->wreq; + } +} + +void freeWReqToWQ(queue* wq, SWReqsWrapper* w) { QUEUE_PUSH(wq, &w->q); } \ No newline at end of file diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a4f5dd17bf..03fad980b5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -27,7 +27,6 @@ typedef struct { typedef struct SSvrConn { int32_t ref; uv_tcp_t* pTcp; - queue wreqQueue; uv_timer_t pTimer; queue queue; @@ -1227,7 +1226,6 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } - transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); if ((code = transQueueInit(&pConn->resps, uvDestroyResp)) != 0) { @@ -1359,7 +1357,6 @@ static void uvDestroyConn(uv_handle_t* handle) { tDebug("%s conn %p destroy", transLabel(pInst), conn); transQueueDestroy(&conn->resps); - transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue);