opt parameter
This commit is contained in:
parent
13b1e5ee4e
commit
fadfbf2824
|
@ -94,10 +94,6 @@ typedef void* queue[2];
|
||||||
/* Return the structure holding the given element. */
|
/* Return the structure holding the given element. */
|
||||||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
queue q;
|
|
||||||
} queueWrapper;
|
|
||||||
|
|
||||||
// #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
// #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
||||||
// #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
|
// #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
|
||||||
#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
|
#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
|
||||||
|
|
|
@ -403,16 +403,16 @@ bool filteBySeq(void* key, void* arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
|
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
queueWrapper set;
|
queue set;
|
||||||
QUEUE_INIT(&set.q)
|
QUEUE_INIT(&set)
|
||||||
transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &seq, &set, 1);
|
transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &seq, &set, 1);
|
||||||
|
|
||||||
if (QUEUE_IS_EMPTY(&set.q)) {
|
if (QUEUE_IS_EMPTY(&set)) {
|
||||||
return TSDB_CODE_OUT_OF_RANGE;
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
queue* e = QUEUE_HEAD(&set.q);
|
queue* e = QUEUE_HEAD(&set);
|
||||||
SCliReq* p = QUEUE_DATA(e, SCliReq, q);
|
SCliReq* p = QUEUE_DATA(e, SCliReq, q);
|
||||||
|
|
||||||
*pReq = p;
|
*pReq = p;
|
||||||
|
@ -481,13 +481,13 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
||||||
tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend),
|
tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend),
|
||||||
transQueueSize(&conn->reqsSentOut));
|
transQueueSize(&conn->reqsSentOut));
|
||||||
|
|
||||||
queueWrapper set;
|
queue set;
|
||||||
QUEUE_INIT(&set.q);
|
QUEUE_INIT(&set);
|
||||||
transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1);
|
transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1);
|
||||||
transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
|
transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
|
||||||
|
|
||||||
while (!QUEUE_IS_EMPTY(&set.q)) {
|
while (!QUEUE_IS_EMPTY(&set)) {
|
||||||
queue* el = QUEUE_HEAD(&set.q);
|
queue* el = QUEUE_HEAD(&set);
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
QUEUE_REMOVE(el);
|
QUEUE_REMOVE(el);
|
||||||
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
|
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
|
||||||
|
@ -1317,12 +1317,12 @@ bool fileToRmReq(void* h, void* arg) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
static void cliConnRmReqs(SCliConn* conn) {
|
static void cliConnRmReqs(SCliConn* conn) {
|
||||||
queueWrapper set;
|
queue set;
|
||||||
QUEUE_INIT(&set.q);
|
QUEUE_INIT(&set);
|
||||||
|
|
||||||
transQueueRemoveByFilter(&conn->reqsSentOut, fileToRmReq, NULL, &set, -1);
|
transQueueRemoveByFilter(&conn->reqsSentOut, fileToRmReq, NULL, &set, -1);
|
||||||
while (!QUEUE_IS_EMPTY(&set.q)) {
|
while (!QUEUE_IS_EMPTY(&set)) {
|
||||||
queue* el = QUEUE_HEAD(&set.q);
|
queue* el = QUEUE_HEAD(&set);
|
||||||
QUEUE_REMOVE(el);
|
QUEUE_REMOVE(el);
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
destroyReq(pReq);
|
destroyReq(pReq);
|
||||||
|
|
|
@ -471,14 +471,14 @@ void* transQueueGet(STransQueue* q, int idx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
|
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
|
||||||
queueWrapper* d = dst;
|
queue* d = dst;
|
||||||
queue* node = QUEUE_NEXT(&q->node);
|
queue* node = QUEUE_NEXT(&q->node);
|
||||||
while (node != &q->node) {
|
while (node != &q->node) {
|
||||||
queue* next = QUEUE_NEXT(node);
|
queue* next = QUEUE_NEXT(node);
|
||||||
if (filter(node, arg)) {
|
if (filter(node, arg)) {
|
||||||
QUEUE_REMOVE(node);
|
QUEUE_REMOVE(node);
|
||||||
q->size--;
|
q->size--;
|
||||||
QUEUE_PUSH(&d->q, node);
|
QUEUE_PUSH(d, node);
|
||||||
if (--size == 0) {
|
if (--size == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue