Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-16 13:17:39 +08:00
parent b5c1b672a4
commit 9ecb9b23e7
3 changed files with 90 additions and 20 deletions

View File

@ -496,7 +496,9 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING };
#define BUFFER_LIMIT 4
#define BUFFER_LIMIT 4
#define HEAP_MISS_HIT_LIMIT 100000
#define READ_TIMEOUT 100000
typedef struct {
queue node; // queue for write

View File

@ -100,7 +100,8 @@ typedef struct SCliConn {
int8_t userInited;
void* pInitUserReq;
void* heap; // point to req conn heap
void* heap; // point to req conn heap
int32_t heapMissHit;
uv_buf_t* buf;
int32_t bufSize;
@ -198,6 +199,8 @@ static int32_t cliBatchSend(SCliConn* conn);
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead);
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
void cliConnTimeout__checkReq(uv_timer_t* handle);
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn
@ -361,6 +364,15 @@ void cliResetConnTimer(SCliConn* conn) {
}
}
void cliConnSetReadTimeout(SCliConn* conn, int timeout) {
if (conn->timer == NULL) {
if (cliGetConnTimer(conn->hostThrd, conn) != 0) {
return;
}
}
uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0);
}
void cliHandleBatchResp(SCliConn* conn) { return; }
void destroyCliConnQTable(SCliConn* conn) {
@ -517,9 +529,9 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
cliConnClearInitUserMsg(conn);
cliResetConnTimer(conn);
cliConnClearInitUserMsg(conn);
SCliReq* pReq = NULL;
STransMsgHead* pHead = NULL;
@ -562,9 +574,9 @@ void cliHandleResp(SCliConn* conn) {
CONN_GET_INST_LABEL(conn), conn, seq, qId, tstrerror(code));
}
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64
", the sever sends repeated response,reason:%s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64
", the sever may sends repeated response,reason:%s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
// TODO: notify cb
if (cliMayRecycleConn(conn)) {
return;
@ -575,16 +587,11 @@ void cliHandleResp(SCliConn* conn) {
code = cliBuildRespFromCont(pReq, &resp, pHead);
STraceId* trace = &resp.info.traceId;
if (code != 0) {
tGDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(conn),
conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
}
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(conn),
conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
code = cliNotifyCb(conn, pReq, &resp);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
STraceId* trace = &resp.info.traceId;
tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn));
} else {
destroyReq(pReq);
@ -592,6 +599,8 @@ void cliHandleResp(SCliConn* conn) {
if (cliMayRecycleConn(conn)) {
return;
}
if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
@ -607,6 +616,39 @@ void cliConnTimeout(uv_timer_t* handle) {
tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn), conn);
}
bool filterToRmTimoutReq(void* key, void* arg) {
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == 0 && REQUEST_NO_RESP(&pReq->msg)) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000);
if (elapse > READ_TIMEOUT) {
return true;
}
return true;
}
return false;
}
void cliConnTimeout__checkReq(uv_timer_t* handle) {
queue set;
QUEUE_INIT(&set);
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
if (transQueueSize(&conn->reqsSentOut) == 0) {
return;
}
transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, NULL, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
STraceId* trace = &pReq->msg.info.traceId;
tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, pReq->msg.msgType);
destroyReqWrapper(pReq, pThrd);
}
}
void* createConnPool(int size) {
// thread local, no lock
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@ -729,6 +771,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->list->size += 1;
tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
conn->heapMissHit = 0;
if (conn->list->size >= 10) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
if (arg == NULL) return;
@ -980,6 +1024,7 @@ static void cliHandleException(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
cliResetConnTimer(conn);
queue set;
QUEUE_INIT(&set);
// TODO
@ -1028,7 +1073,7 @@ static void cliHandleException(SCliConn* conn) {
}
}
bool fileToRmReq(void* h, void* arg) {
bool filterToRmReq(void* h, void* arg) {
queue* el = h;
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
@ -1040,7 +1085,7 @@ static void cliConnRmReqs(SCliConn* conn) {
queue set;
QUEUE_INIT(&set);
transQueueRemoveByFilter(&conn->reqsSentOut, fileToRmReq, NULL, &set, -1);
transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmReq, NULL, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
@ -1070,6 +1115,10 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
return;
}
cliResetConnTimer(conn);
if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
if (!cliMayRecycleConn(conn)) {
@ -1150,7 +1199,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->timestamp = taosHton64(pCliMsg->st);
pHead->seqNum = taosHton64(pConn->seq);
pHead->qid = taosHton64(pReq->info.qId);
@ -3084,6 +3133,7 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea
if (code != 0) {
transHeapDestroy(&heap);
tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(code));
return code;
}
p = taosHashGet(pConnHeapCache, key, klen);
if (p == NULL) {
@ -3103,6 +3153,24 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(int32_t reqNum, int32_t sentN
return 0;
}
static FORCE_INLINE bool filterToDebug(void* e, void* arg) {
SCliReq* pReq = QUEUE_DATA(e, SCliReq, q);
STraceId* trace = &pReq->msg.info.traceId;
tGWarn("%s is sent to, and no resp from server", pReq->msg.msgType);
return false;
}
static FORCE_INLINE int32_t logConnMissHit(SCliConn* pConn) {
queue set;
QUEUE_INIT(&set);
pConn->heapMissHit++;
tDebug("conn %p has %d reqs, %d sentout and %d status in process, total limit:%d, switch to other conn", pConn,
transQueueSize(&pConn->reqsToSend), transQueueSize(&pConn->reqsSentOut), taosHashGetSize(pConn->pQTable),
BUFFER_LIMIT);
if (transQueueSize(&pConn->reqsSentOut) >= BUFFER_LIMIT) {
transQueueRemoveByFilter(&pConn->reqsSentOut, filterToDebug, NULL, &set, 1);
}
return 0;
}
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
int code = 0;
SHeap* pHeap = NULL;
@ -3123,11 +3191,11 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
int32_t stateNum = taosHashGetSize(pConn->pQTable);
if (shouldSWitchToOtherConn(reqsNum, reqsSentOut, stateNum)) {
tDebug("conn %p has %d reqs, %d sentout and %d status in process, switch to other conn", pConn, reqsNum,
reqsSentOut, stateNum);
logConnMissHit(pConn);
return NULL;
}
}
return pConn;
}
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {

View File

@ -482,7 +482,7 @@ void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg)
queue* node = QUEUE_NEXT(&q->node);
while (node != &q->node) {
queue* next = QUEUE_NEXT(node);
if (filter(node, arg)) {
if (filter && filter(node, arg)) {
QUEUE_REMOVE(node);
q->size--;
QUEUE_PUSH(d, node);