diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 7053c428e2..19322c1327 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -363,8 +363,9 @@ void transReqQueueClear(queue* q); // queue sending msgs typedef struct { - SArray* q; + queue node; void (*freeFunc)(const void* arg); + int32_t size; } STransQueue; /* @@ -377,7 +378,7 @@ int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); * put arg into queue * if queue'size > 1, return false; else return true */ -bool transQueuePush(STransQueue* queue, void* arg); +int32_t transQueuePush(STransQueue* queue, void* arg); /* * the size of queue */ @@ -390,10 +391,21 @@ void* transQueuePop(STransQueue* queue); * get ith from queue */ void* transQueueGet(STransQueue* queue, int i); +/* + * head elm from queue + */ + +void* tranQueueHead(STransQueue* q); /* * rm ith from queue */ + void* transQueueRm(STransQueue* queue, int i); +/* + * remove el from queue + */ + +void transQueueRemove(STransQueue* q, void* e); /* * queue empty or not */ diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index add463bb45..19196106d3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -437,82 +437,75 @@ void transReqQueueClear(queue* q) { } } -int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { - queue->q = taosArrayInit(2, sizeof(void*)); - if (queue->q == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - queue->freeFunc = (void (*)(const void*))freeFunc; +int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(const void* arg)) { + QUEUE_INIT(&wq->node); + wq->freeFunc = (void (*)(const void*))freeFunc; + wq->size = 0; + return 0; +} +int32_t transQueuePush(STransQueue* q, void* arg) { + queue* node = arg; + QUEUE_PUSH(&q->node, node); + q->size++; return 0; } -bool transQueuePush(STransQueue* queue, void* arg) { - if (queue->q == NULL) { - return true; - } - (void)taosArrayPush(queue->q, &arg); - if (taosArrayGetSize(queue->q) > 1) { - return false; - } - return true; -} -void* transQueuePop(STransQueue* queue) { - if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) { - return NULL; - } - void* ptr = taosArrayGetP(queue->q, 0); - taosArrayRemove(queue->q, 0); - return ptr; -} -int32_t transQueueSize(STransQueue* queue) { - if (queue->q == NULL) { - return 0; - } - return taosArrayGetSize(queue->q); -} -void* transQueueGet(STransQueue* queue, int i) { - if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) { - return NULL; - } - if (i >= taosArrayGetSize(queue->q)) { - return NULL; - } +void* transQueuePop(STransQueue* q) { + if (q->size == 0) return NULL; - void* ptr = taosArrayGetP(queue->q, i); - return ptr; + queue* tail = QUEUE_TAIL(&q->node); + QUEUE_REMOVE(tail); + return tail; +} +int32_t transQueueSize(STransQueue* q) { return q->size; } + +void* transQueueGet(STransQueue* q, int idx) { + if (q->size == 0) return NULL; + + while (idx-- > 0) { + queue* node = QUEUE_NEXT(&q->node); + if (node == &q->node) return NULL; + } + return NULL; } -void* transQueueRm(STransQueue* queue, int i) { - if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) { - return NULL; - } - if (i >= taosArrayGetSize(queue->q)) { - return NULL; - } - void* ptr = taosArrayGetP(queue->q, i); - taosArrayRemove(queue->q, i); - return ptr; +void* tranQueueHead(STransQueue* q) { + if (q->size == 0) return NULL; + + queue* head = QUEUE_HEAD(&q->node); + return head; } -bool transQueueEmpty(STransQueue* queue) { - if (queue->q == NULL) { - return true; +void* transQueueRm(STransQueue* q, int i) { + // if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) { + // return NULL; + // } + // if (i >= taosArrayGetSize(queue->q)) { + // return NULL; + // } + // void* ptr = taosArrayGetP(queue->q, i); + // taosArrayRemove(queue->q, i); + // return ptr; + return NULL; +} + +void transQueueRemove(STransQueue* q, void* e) { + queue* node = e; + QUEUE_REMOVE(node); + q->size--; +} + +bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; } + +void transQueueClear(STransQueue* q) { + while (!QUEUE_IS_EMPTY(q->node)) { + queue* h = QUEUE_HEAD(&q->node); + QUEUE_REMOVE(h); + q->freeFunc(h); + q->size--; } - return taosArrayGetSize(queue->q) == 0; -} -void transQueueClear(STransQueue* queue) { - if (queue->freeFunc != NULL) { - for (int i = 0; i < taosArrayGetSize(queue->q); i++) { - void* p = taosArrayGetP(queue->q, i); - queue->freeFunc(p); - } - } - taosArrayClear(queue->q); -} -void transQueueDestroy(STransQueue* queue) { - transQueueClear(queue); - taosArrayDestroy(queue->q); } +void transQueueDestroy(STransQueue* q) { transQueueClear(q); } static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) { SDelayTask* arg1 = container_of(a, SDelayTask, node); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1ce1cb0596..51b0784f73 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -629,21 +629,30 @@ void uvOnSendCb(uv_write_t* req, int status) { SSvrConn* conn = userReq->conn; queue* src = &userReq->node; - tDebug("%s conn %p send data", transLabel(conn->pInst), conn); - + tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn); if (status == 0) { while (!QUEUE_IS_EMPTY(src)) { queue* head = QUEUE_HEAD(&src); QUEUE_REMOVE(head); - SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); - + SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); STraceId* trace = &smsg->msg.info.traceId; tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId); destroySmsg(smsg); } } else { + while (!QUEUE_IS_EMPTY(src)) { + queue* head = QUEUE_HEAD(&src); + QUEUE_REMOVE(head); + + SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); + STraceId* trace = &smsg->msg.info.traceId; + tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn, + smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status)); + destroySmsg(smsg); + } + if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) { tError("conn %p failed to write data, %s", conn, uv_err_name(status)); conn->broken = true; @@ -766,7 +775,6 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq)); pWreq->conn = pConn; - QUEUE_INIT(&pWreq->q); QUEUE_INIT(&pWreq->node); pWreq->req.data = pWreq;