opt parameter

This commit is contained in:
yihaoDeng 2024-09-11 22:17:38 +08:00
parent 65a5dad3cb
commit 13b1e5ee4e
3 changed files with 113 additions and 74 deletions

View File

@ -94,6 +94,10 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
typedef struct {
queue q;
} queueWrapper;
// #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
// #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
@ -364,7 +368,7 @@ void transReqQueueClear(queue* q);
// queue sending msgs
typedef struct {
queue node;
void (*freeFunc)(const void* arg);
void (*freeFunc)(void* arg);
int32_t size;
} STransQueue;

View File

@ -274,7 +274,7 @@ static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq) { return; }
static void cliDoReq(queue* h, SCliThrd* pThrd);
static void cliDoBatchReq(queue* h, SCliThrd* pThrd);
@ -393,17 +393,30 @@ void cliResetConnTimer(SCliConn* conn) {
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int32_t code = 0;
for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) {
SCliReq* p = transQueueGet(&conn->reqsSentOut, i);
if (p->seq == seq) {
transQueueRm(&conn->reqsSentOut, i);
*pReq = p;
return 0;
}
bool filteBySeq(void* key, void* arg) {
int32_t* seq = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->seq == *seq) {
return true;
} else {
return false;
}
return TSDB_CODE_OUT_OF_RANGE;
}
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int32_t code = 0;
queueWrapper set;
QUEUE_INIT(&set.q)
transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &seq, &set, 1);
if (QUEUE_IS_EMPTY(&set.q)) {
return TSDB_CODE_OUT_OF_RANGE;
}
queue* e = QUEUE_HEAD(&set.q);
SCliReq* p = QUEUE_DATA(e, SCliReq, q);
*pReq = p;
return 0;
}
int8_t cliMayRecycleConn(SCliConn* conn) {
@ -468,13 +481,13 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend),
transQueueSize(&conn->reqsSentOut));
queue set;
QUEUE_INIT(&set);
queueWrapper set;
QUEUE_INIT(&set.q);
transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1);
transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
while (!QUEUE_IS_EMPTY(&set.q)) {
queue* el = QUEUE_HEAD(&set.q);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
QUEUE_REMOVE(el);
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
@ -1105,7 +1118,14 @@ _exception:
// free conn
return code;
}
void cliDestroyMsg(void* arg) {
queue* e = arg;
SCliReq* pReq = QUEUE_DATA(e, SCliReq, q);
if (pReq->msg.info.notFreeAhandle == 0) {
taosMemoryFree(pReq->ctx->ahandle);
}
destroyReq(pReq);
}
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int32_t port) {
int32_t code = 0;
int32_t lino = 0;
@ -1129,7 +1149,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
transReqQueueInit(&conn->wreqQueue);
TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, NULL), NULL, _failed);
TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, cliDestroyMsg), NULL, _failed);
TAOS_CHECK_GOTO(transQueueInit(&conn->reqsSentOut, cliDestroyMsg), NULL, _failed);
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
@ -1287,15 +1308,25 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
transUnrefCliHandle(conn);
}
bool fileToRmReq(void* h, void* arg) {
queue* el = h;
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
return true;
}
return false;
}
static void cliConnRmReqs(SCliConn* conn) {
// for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) {
// SCliReq* pReq = transQueueGet(&conn->reqsSentOut, i);
// if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
// transQueueRm(&conn->reqsToSend, i);
// destroyReq(pReq);
// i--;
// }
// }
queueWrapper set;
QUEUE_INIT(&set.q);
transQueueRemoveByFilter(&conn->reqsSentOut, fileToRmReq, NULL, &set, -1);
while (!QUEUE_IS_EMPTY(&set.q)) {
queue* el = QUEUE_HEAD(&set.q);
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
destroyReq(pReq);
}
return;
}
@ -1382,7 +1413,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, pCliMsg->q);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
}
if (j == 0) {
taosMemoryFree(wb);
@ -1399,7 +1431,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
transQueuePush(&pConn->reqsToSend, pCliMsg);
transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
if (pConn->connnected) {
code = cliSend2(pConn);
} else {
@ -1496,11 +1528,11 @@ _exception2:
static void cliHandleFastFail_resp(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
SCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0);
// //kSCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0);
STraceId* trace = &pReq->msg.info.traceId;
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
// STraceId* trace = &pReq->msg.info.traceId;
// tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
// TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
}
static void cliHandleFastFail_noresp(SCliConn* pConn, int status) {
@ -1616,31 +1648,32 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) {
(void)uv_walk(pThrd->loop, cliWalkCb, NULL);
}
static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
int64_t refId = (int64_t)(pReq->msg.info.handle);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tDebug("%" PRId64 " already released", refId);
destroyReq(pReq);
return;
}
return;
// int64_t refId = (int64_t)(pReq->msg.info.handle);
// SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
// if (exh == NULL) {
// tDebug("%" PRId64 " already released", refId);
// destroyReq(pReq);
// return;
// }
taosRLockLatch(&exh->latch);
SCliConn* conn = exh->handle;
taosRUnLockLatch(&exh->latch);
// taosRLockLatch(&exh->latch);
// SCliConn* conn = exh->handle;
// taosRUnLockLatch(&exh->latch);
(void)transReleaseExHandle(transGetRefMgt(), refId);
tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
// (void)transReleaseExHandle(transGetRefMgt(), refId);
// tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
if (TRANS_CONN_REF_GET(conn) == 2) {
transUnrefCliHandle(conn);
if (!transQueuePush(&conn->reqsToSend, pReq)) {
return;
}
(void)cliSend2(conn);
} else {
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
destroyReq(pReq);
}
// if (TRANS_CONN_REF_GET(conn) == 2) {
// transUnrefCliHandle(conn);
// if (!transQueuePush(&conn->reqsToSend, &pReq->q)) {
// return;
// }
// (void)cliSend2(conn);
// } else {
// tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
// destroyReq(pReq);
// }
}
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; }
@ -2111,31 +2144,32 @@ static void cliAsyncCb(uv_async_t* handle) {
}
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
transCtxCleanup(&conn->ctx);
// cliReleaseUnfinishedMsg(conn);
// transCtxCleanup(&conn->ctx);
// cliReleaseUnfinishedMsg(conn);
if (destroy == 1) {
transQueueDestroy(&conn->reqsToSend);
} else {
transQueueClear(&conn->reqsToSend);
}
transQueueDestroy(&conn->reqsSentOut);
}
void cliConnFreeMsgs(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) {
SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i);
if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
continue;
}
// for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) {
// SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i);
// if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
// continue;
// }
if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) {
continue;
}
// if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) {
// continue;
// }
cmsg->ctx->ahandle = NULL;
}
// cmsg->ctx->ahandle = NULL;
// }
}
static FORCE_INLINE void destroyReq(void* arg) {

View File

@ -439,7 +439,7 @@ void transReqQueueClear(queue* q) {
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
QUEUE_INIT(&wq->node);
wq->freeFunc = (void (*)(const void*))freeFunc;
wq->freeFunc = (void (*)(void*))freeFunc;
wq->size = 0;
return 0;
}
@ -453,9 +453,10 @@ int32_t transQueuePush(STransQueue* q, void* arg) {
void* transQueuePop(STransQueue* q) {
if (q->size == 0) return NULL;
queue* tail = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(tail);
return tail;
queue* head = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(head);
q->size--;
return head;
}
int32_t transQueueSize(STransQueue* q) { return q->size; }
@ -470,14 +471,14 @@ void* transQueueGet(STransQueue* q, int idx) {
}
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
queue* d = dst;
queue* node = QUEUE_NEXT(&q->node);
queueWrapper* d = dst;
queue* node = QUEUE_NEXT(&q->node);
while (node != &q->node) {
queue* next = QUEUE_NEXT(node);
if (filter(node, arg)) {
QUEUE_REMOVE(node);
q->size--;
QUEUE_PUSH(d, node);
QUEUE_PUSH(&d->q, node);
if (--size == 0) {
break;
}
@ -516,7 +517,7 @@ void transQueueRemove(STransQueue* q, void* e) {
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
void transQueueClear(STransQueue* q) {
while (!QUEUE_IS_EMPTY(q->node)) {
while (!QUEUE_IS_EMPTY(&q->node)) {
queue* h = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(h);
if (q->freeFunc != NULL) (q->freeFunc)(h);