add config

This commit is contained in:
yihaoDeng 2024-10-06 09:49:56 +08:00
parent fd6e1c5aab
commit 9d31855ccf
1 changed files with 54 additions and 6 deletions

View File

@ -107,6 +107,10 @@ typedef struct SCliConn {
int32_t readerStart; int32_t readerStart;
queue wq; // uv_write_t queue queue wq; // uv_write_t queue
queue batchSendq;
int8_t inThreadSendq;
} SCliConn; } SCliConn;
typedef struct { typedef struct {
@ -126,6 +130,7 @@ typedef struct SCliReq {
int64_t seq; int64_t seq;
int32_t sent; //(0: no send, 1: alread sent) int32_t sent; //(0: no send, 1: alread sent)
STransMsg msg; STransMsg msg;
int8_t inRetry;
} SCliReq; } SCliReq;
@ -164,6 +169,7 @@ typedef struct SCliThrd {
SHashObj* pIdConnTable; // <qid, conn> SHashObj* pIdConnTable; // <qid, conn>
SArray* pQIdBuf; // tmp buf to avoid alloc buf; SArray* pQIdBuf; // tmp buf to avoid alloc buf;
queue batchSendSet;
} SCliThrd; } SCliThrd;
typedef struct SCliObj { typedef struct SCliObj {
@ -192,7 +198,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn);
static void cliBatchSendCb(uv_write_t* req, int status); static void cliBatchSendCb(uv_write_t* req, int status);
void cliBatchSendImpl(SCliConn* pConn); void cliBatchSendImpl(SCliConn* pConn);
static int32_t cliBatchSend(SCliConn* conn); static int32_t cliBatchSend(SCliConn* conn, int8_t direct);
void cliConnCheckTimoutMsg(SCliConn* conn); void cliConnCheckTimoutMsg(SCliConn* conn);
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead);
// register conn timer // register conn timer
@ -1039,6 +1045,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
TAOS_CHECK_GOTO(initWQ(&conn->wq), NULL, _failed); TAOS_CHECK_GOTO(initWQ(&conn->wq), NULL, _failed);
QUEUE_INIT(&conn->batchSendq);
conn->stream->data = conn; conn->stream->data = conn;
conn->connReq.data = conn; conn->connReq.data = conn;
@ -1278,7 +1286,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
} }
if (!cliMayRecycleConn(conn)) { if (!cliMayRecycleConn(conn)) {
code = cliBatchSend(conn); code = cliBatchSend(conn, 1);
if (code != 0) { if (code != 0) {
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(conn)); TAOS_UNUSED(transUnrefCliHandle(conn));
@ -1307,7 +1315,7 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg
pConn->userInited = 1; pConn->userInited = 1;
return true; return true;
} }
int32_t cliBatchSend(SCliConn* pConn) { int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
@ -1319,6 +1327,17 @@ int32_t cliBatchSend(SCliConn* pConn) {
if (pConn->connnected != 1) { if (pConn->connnected != 1) {
return 0; return 0;
} }
if (!direct) {
if (pConn->inThreadSendq) {
return 0;
}
QUEUE_PUSH(&pThrd->batchSendSet, &pConn->batchSendq);
pConn->inThreadSendq = 1;
tDebug("%s conn %p batch send later", pInst->label, pConn);
return 0;
}
int32_t size = transQueueSize(&pConn->reqsToSend); int32_t size = transQueueSize(&pConn->reqsToSend);
int32_t totalLen = 0; int32_t totalLen = 0;
@ -1420,7 +1439,20 @@ int32_t cliBatchSend(SCliConn* pConn) {
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
transQueuePush(&pConn->reqsToSend, &pCliMsg->q); transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
return cliBatchSend(pConn); return cliBatchSend(pConn, pCliMsg->inRetry);
}
int32_t cliSendReqPrepare(SCliConn* pConn, SCliReq* pCliMsg) {
transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
if (pConn->broken) {
return 0;
}
if (pConn->connnected != 1) {
return 0;
}
// return cliBatchSend(pConn);
return 0;
} }
static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDestroyBatch(SCliBatch* pBatch) {
@ -1577,7 +1609,7 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
code = cliBatchSend(pConn); code = cliBatchSend(pConn, 1);
if (code != 0) { if (code != 0) {
tDebug("%s conn %p failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); tDebug("%s conn %p failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn)); TAOS_UNUSED(transUnrefCliHandle(pConn));
@ -1906,13 +1938,13 @@ void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleBatchReq(pTh
static void cliDoReq(queue* wq, SCliThrd* pThrd) { static void cliDoReq(queue* wq, SCliThrd* pThrd) {
int count = 0; int count = 0;
QUEUE_INIT(&pThrd->batchSendSet);
while (!QUEUE_IS_EMPTY(wq)) { while (!QUEUE_IS_EMPTY(wq)) {
queue* h = QUEUE_HEAD(wq); queue* h = QUEUE_HEAD(wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
if (pReq->type == Quit) { if (pReq->type == Quit) {
pThrd->stopMsg = pReq; pThrd->stopMsg = pReq;
continue; continue;
@ -1920,6 +1952,17 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) {
(*cliAsyncHandle[pReq->type])(pThrd, pReq); (*cliAsyncHandle[pReq->type])(pThrd, pReq);
count++; count++;
} }
while (!QUEUE_IS_EMPTY(&pThrd->batchSendSet)) {
queue* el = QUEUE_HEAD(&pThrd->batchSendSet);
QUEUE_REMOVE(el);
SCliConn* conn = QUEUE_DATA(el, SCliConn, batchSendq);
conn->inThreadSendq = 0;
QUEUE_INIT(&conn->batchSendq);
cliBatchSend(conn, 1);
}
QUEUE_INIT(&pThrd->batchSendSet);
if (count >= 2) { if (count >= 2) {
tTrace("cli process batch size:%d", count); tTrace("cli process batch size:%d", count);
} }
@ -2467,6 +2510,11 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pInst) {
} }
static FORCE_INLINE void doDelayTask(void* param) { static FORCE_INLINE void doDelayTask(void* param) {
STaskArg* arg = param; STaskArg* arg = param;
if (arg && arg->param1) {
SCliReq* pReq = arg->param1;
pReq->inRetry = 1;
}
cliHandleReq((SCliThrd*)arg->param2, (SCliReq*)arg->param1); cliHandleReq((SCliThrd*)arg->param2, (SCliReq*)arg->param1);
taosMemoryFree(arg); taosMemoryFree(arg);
} }