opt transport

This commit is contained in:
yihaoDeng 2024-09-11 08:23:02 +08:00
parent 56793dd89b
commit 60cb1cbf6b
3 changed files with 86 additions and 73 deletions

View File

@ -363,8 +363,9 @@ void transReqQueueClear(queue* q);
// queue sending msgs // queue sending msgs
typedef struct { typedef struct {
SArray* q; queue node;
void (*freeFunc)(const void* arg); void (*freeFunc)(const void* arg);
int32_t size;
} STransQueue; } STransQueue;
/* /*
@ -377,7 +378,7 @@ int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
* put arg into queue * put arg into queue
* if queue'size > 1, return false; else return true * if queue'size > 1, return false; else return true
*/ */
bool transQueuePush(STransQueue* queue, void* arg); int32_t transQueuePush(STransQueue* queue, void* arg);
/* /*
* the size of queue * the size of queue
*/ */
@ -390,10 +391,21 @@ void* transQueuePop(STransQueue* queue);
* get ith from queue * get ith from queue
*/ */
void* transQueueGet(STransQueue* queue, int i); void* transQueueGet(STransQueue* queue, int i);
/*
* head elm from queue
*/
void* tranQueueHead(STransQueue* q);
/* /*
* rm ith from queue * rm ith from queue
*/ */
void* transQueueRm(STransQueue* queue, int i); void* transQueueRm(STransQueue* queue, int i);
/*
* remove el from queue
*/
void transQueueRemove(STransQueue* q, void* e);
/* /*
* queue empty or not * queue empty or not
*/ */

View File

@ -437,82 +437,75 @@ void transReqQueueClear(queue* q) {
} }
} }
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(const void* arg)) {
queue->q = taosArrayInit(2, sizeof(void*)); QUEUE_INIT(&wq->node);
if (queue->q == NULL) { wq->freeFunc = (void (*)(const void*))freeFunc;
return TSDB_CODE_OUT_OF_MEMORY; wq->size = 0;
} return 0;
queue->freeFunc = (void (*)(const void*))freeFunc; }
int32_t transQueuePush(STransQueue* q, void* arg) {
queue* node = arg;
QUEUE_PUSH(&q->node, node);
q->size++;
return 0; return 0;
} }
bool transQueuePush(STransQueue* queue, void* arg) { void* transQueuePop(STransQueue* q) {
if (queue->q == NULL) { if (q->size == 0) return NULL;
return true;
}
(void)taosArrayPush(queue->q, &arg);
if (taosArrayGetSize(queue->q) > 1) {
return false;
}
return true;
}
void* transQueuePop(STransQueue* queue) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
taosArrayRemove(queue->q, 0);
return ptr;
}
int32_t transQueueSize(STransQueue* queue) {
if (queue->q == NULL) {
return 0;
}
return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i); queue* tail = QUEUE_TAIL(&q->node);
return ptr; QUEUE_REMOVE(tail);
return tail;
}
int32_t transQueueSize(STransQueue* q) { return q->size; }
void* transQueueGet(STransQueue* q, int idx) {
if (q->size == 0) return NULL;
while (idx-- > 0) {
queue* node = QUEUE_NEXT(&q->node);
if (node == &q->node) return NULL;
}
return NULL;
} }
void* transQueueRm(STransQueue* queue, int i) { void* tranQueueHead(STransQueue* q) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) { if (q->size == 0) return NULL;
return NULL;
} queue* head = QUEUE_HEAD(&q->node);
if (i >= taosArrayGetSize(queue->q)) { return head;
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i);
taosArrayRemove(queue->q, i);
return ptr;
} }
bool transQueueEmpty(STransQueue* queue) { void* transQueueRm(STransQueue* q, int i) {
if (queue->q == NULL) { // if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return true; // return NULL;
// }
// if (i >= taosArrayGetSize(queue->q)) {
// return NULL;
// }
// void* ptr = taosArrayGetP(queue->q, i);
// taosArrayRemove(queue->q, i);
// return ptr;
return NULL;
}
void transQueueRemove(STransQueue* q, void* e) {
queue* node = e;
QUEUE_REMOVE(node);
q->size--;
}
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
void transQueueClear(STransQueue* q) {
while (!QUEUE_IS_EMPTY(q->node)) {
queue* h = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(h);
q->freeFunc(h);
q->size--;
} }
return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
if (queue->freeFunc != NULL) {
for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
void* p = taosArrayGetP(queue->q, i);
queue->freeFunc(p);
}
}
taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue);
taosArrayDestroy(queue->q);
} }
void transQueueDestroy(STransQueue* q) { transQueueClear(q); }
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) { static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
SDelayTask* arg1 = container_of(a, SDelayTask, node); SDelayTask* arg1 = container_of(a, SDelayTask, node);

View File

@ -629,21 +629,30 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrConn* conn = userReq->conn; SSvrConn* conn = userReq->conn;
queue* src = &userReq->node; queue* src = &userReq->node;
tDebug("%s conn %p send data", transLabel(conn->pInst), conn); tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn);
if (status == 0) { if (status == 0) {
while (!QUEUE_IS_EMPTY(src)) { while (!QUEUE_IS_EMPTY(src)) {
queue* head = QUEUE_HEAD(&src); queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq);
STraceId* trace = &smsg->msg.info.traceId; STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn, tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId); smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg); destroySmsg(smsg);
} }
} else { } else {
while (!QUEUE_IS_EMPTY(src)) {
queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head);
SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
destroySmsg(smsg);
}
if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) { if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) {
tError("conn %p failed to write data, %s", conn, uv_err_name(status)); tError("conn %p failed to write data, %s", conn, uv_err_name(status));
conn->broken = true; conn->broken = true;
@ -766,7 +775,6 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq)); SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq));
pWreq->conn = pConn; pWreq->conn = pConn;
QUEUE_INIT(&pWreq->q);
QUEUE_INIT(&pWreq->node); QUEUE_INIT(&pWreq->node);
pWreq->req.data = pWreq; pWreq->req.data = pWreq;