diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index 7964a9479d..ea79e3582f 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -496,7 +496,9 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
 
 enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING };
 
-#define BUFFER_LIMIT 4
+#define BUFFER_LIMIT        4
+#define HEAP_MISS_HIT_LIMIT 100000
+#define READ_TIMEOUT        100000  // in ms: handed to uv_timer_start and compared against req age in ms
 
 typedef struct {
   queue node;  // queue for write
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index c2ce95b651..10df19d3c8 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -100,7 +100,8 @@ typedef struct SCliConn {
   int8_t userInited;
   void*  pInitUserReq;
 
-  void* heap;  // point to req conn heap
+  void*   heap;  // point to req conn heap
+  int32_t heapMissHit;
 
   uv_buf_t* buf;
   int32_t   bufSize;
@@ -198,6 +199,8 @@ static int32_t cliBatchSend(SCliConn* conn);
 bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead);
 // register conn timer
 static void cliConnTimeout(uv_timer_t* handle);
+
+void cliConnTimeout__checkReq(uv_timer_t* handle);
 // register timer for read
 static void cliReadTimeoutCb(uv_timer_t* handle);
 // register timer in each thread to clear expire conn
@@ -361,6 +364,17 @@ void cliResetConnTimer(SCliConn* conn) {
   }
 }
 
+// Arm conn's timer to call cliConnTimeout__checkReq once, `timeout` ms from now (repeat is 0).
+// If the conn has no timer yet, one is fetched via cliGetConnTimer; on failure nothing is armed.
+void cliConnSetReadTimeout(SCliConn* conn, int timeout) {
+  if (conn->timer == NULL) {
+    if (cliGetConnTimer(conn->hostThrd, conn) != 0) {
+      return;
+    }
+  }
+  (void)uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0);
+}
+
 void cliHandleBatchResp(SCliConn* conn) { return; }
 
 void destroyCliConnQTable(SCliConn* conn) {
@@ -517,9 +531,9 @@ void cliHandleResp(SCliConn* conn) {
   SCliThrd* pThrd = conn->hostThrd;
   STrans*   pInst = pThrd->pInst;
 
-  cliConnClearInitUserMsg(conn);
-
   cliResetConnTimer(conn);
+
+  cliConnClearInitUserMsg(conn);
 
   SCliReq*       pReq = NULL;
   STransMsgHead* pHead = NULL;
@@ -562,9 +576,9 @@ void cliHandleResp(SCliConn* conn) {
            CONN_GET_INST_LABEL(conn), conn, seq, qId, tstrerror(code));
   }
   if (code != 0) {
-    tDebug("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64
-           ", the sever sends repeated response,reason:%s",
-           CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
+    tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64
+          ", the sever may sends repeated response,reason:%s",
+          CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
     // TODO: notify cb
     if (cliMayRecycleConn(conn)) {
       return;
@@ -575,16 +589,11 @@ void cliHandleResp(SCliConn* conn) {
   code = cliBuildRespFromCont(pReq, &resp, pHead);
   STraceId* trace = &resp.info.traceId;
 
-  if (code != 0) {
-    tGDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
-  } else {
-    tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(conn),
-            conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
-  }
+  tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(conn),
+          conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
 
   code = cliNotifyCb(conn, pReq, &resp);
   if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
-    STraceId* trace = &resp.info.traceId;
     tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn));
   } else {
     destroyReq(pReq);
@@ -592,6 +601,8 @@ void cliHandleResp(SCliConn* conn) {
   if (cliMayRecycleConn(conn)) {
     return;
   }
+  if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT);
+
   (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
 }
 
@@ -607,6 +618,43 @@ void cliConnTimeout(uv_timer_t* handle) {
   tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn), conn);
 }
 
+// Filter for transQueueRemoveByFilter: selects a req with qId == 0 and REQUEST_NO_RESP set
+// whose age (now - pReq->st, converted from us to ms) exceeds READ_TIMEOUT.
+bool filterToRmTimoutReq(void* key, void* arg) {
+  SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
+  if (pReq->msg.info.qId == 0 && REQUEST_NO_RESP(&pReq->msg)) {
+    int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000);
+    if (elapse > READ_TIMEOUT) {
+      return true;
+    }
+    return false;  // not timed out yet: keep the req queued
+  }
+  return false;
+}
+// Timer callback: moves every req matched by filterToRmTimoutReq out of conn->reqsSentOut
+// and releases each one with destroyReqWrapper. The timer is not re-armed here.
+void cliConnTimeout__checkReq(uv_timer_t* handle) {
+  queue set;
+  QUEUE_INIT(&set);
+
+  SCliConn* conn = handle->data;
+  SCliThrd* pThrd = conn->hostThrd;
+  if (transQueueSize(&conn->reqsSentOut) == 0) {
+    return;
+  }
+
+  transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, NULL, &set, -1);
+
+  while (!QUEUE_IS_EMPTY(&set)) {
+    queue* el = QUEUE_HEAD(&set);
+    QUEUE_REMOVE(el);
+    SCliReq*  pReq = QUEUE_DATA(el, SCliReq, q);
+    STraceId* trace = &pReq->msg.info.traceId;
+    tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pReq->msg.msgType));
+    destroyReqWrapper(pReq, pThrd);
+  }
+}
+
 void* createConnPool(int size) {
   // thread local, no lock
   return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@@ -729,6 +777,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
   conn->list->size += 1;
   tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
 
+  conn->heapMissHit = 0;
+
   if (conn->list->size >= 10) {
     STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
     if (arg == NULL) return;
@@ -980,6 +1030,7 @@ static void cliHandleException(SCliConn* conn) {
   SCliThrd* pThrd = conn->hostThrd;
   STrans*   pInst = pThrd->pInst;
 
+  cliResetConnTimer(conn);
   queue set;
   QUEUE_INIT(&set);
   // TODO
@@ -1028,7 +1079,7 @@ static void cliHandleException(SCliConn* conn) {
   }
 }
 
-bool fileToRmReq(void* h, void* arg) {
+bool filterToRmReq(void* h, void* arg) {
   queue*   el = h;
   SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
   if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
@@ -1040,7 +1091,7 @@ static void cliConnRmReqs(SCliConn* conn) {
   queue set;
   QUEUE_INIT(&set);
 
-  transQueueRemoveByFilter(&conn->reqsSentOut, fileToRmReq, NULL, &set, -1);
+  transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmReq, NULL, &set, -1);
   while (!QUEUE_IS_EMPTY(&set)) {
     queue* el = QUEUE_HEAD(&set);
     QUEUE_REMOVE(el);
@@ -1070,6 +1121,10 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
     return;
   }
 
+  cliResetConnTimer(conn);
+
+  if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT);
+
   (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
 
   if (!cliMayRecycleConn(conn)) {
@@ -1150,7 +1205,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
       pHead->version = TRANS_VER;
       pHead->compatibilityVer = htonl(pInst->compatibilityVer);
     }
-    pHead->timestamp = taosHton64(taosGetTimestampUs());
+    pHead->timestamp = taosHton64(pCliMsg->st);
     pHead->seqNum = taosHton64(pConn->seq);
     pHead->qid = taosHton64(pReq->info.qId);
 
@@ -3084,6 +3139,7 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea
     if (code != 0) {
       transHeapDestroy(&heap);
       tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(code));
+      return code;
     }
     p = taosHashGet(pConnHeapCache, key, klen);
     if (p == NULL) {
@@ -3103,6 +3159,28 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(int32_t reqNum, int32_t sentN
   return 0;
 }
 
+// Filter for transQueueRemoveByFilter that only logs the visited req; it always returns
+// false, so the req stays in its queue.
+static FORCE_INLINE bool filterToDebug(void* e, void* arg) {
+  SCliReq*  pReq = QUEUE_DATA(e, SCliReq, q);
+  STraceId* trace = &pReq->msg.info.traceId;
+  tGWarn("%s is sent to, and no resp from server", TMSG_INFO(pReq->msg.msgType));
+  return false;
+}
+// Bump pConn->heapMissHit and log the conn's queue sizes; when reqsSentOut has reached
+// BUFFER_LIMIT, also run filterToDebug over reqsSentOut to log its reqs. Always returns 0.
+static FORCE_INLINE int32_t logConnMissHit(SCliConn* pConn) {
+  queue set;
+  QUEUE_INIT(&set);
+  pConn->heapMissHit++;
+  tDebug("conn %p has %d reqs, %d sentout and %d status in process, total limit:%d, switch to other conn", pConn,
+         transQueueSize(&pConn->reqsToSend), transQueueSize(&pConn->reqsSentOut), taosHashGetSize(pConn->pQTable),
+         BUFFER_LIMIT);
+  if (transQueueSize(&pConn->reqsSentOut) >= BUFFER_LIMIT) {
+    transQueueRemoveByFilter(&pConn->reqsSentOut, filterToDebug, NULL, &set, 1);
+  }
+  return 0;
+}
 static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
   int      code = 0;
   SHeap*   pHeap = NULL;
@@ -3123,11 +3201,11 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
     int32_t stateNum = taosHashGetSize(pConn->pQTable);
 
     if (shouldSWitchToOtherConn(reqsNum, reqsSentOut, stateNum)) {
-      tDebug("conn %p has %d reqs, %d sentout and %d status in process, switch to other conn", pConn, reqsNum,
-             reqsSentOut, stateNum);
+      logConnMissHit(pConn);
       return NULL;
     }
   }
+
   return pConn;
 }
 static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c
index 929f645e7e..060ab2edae 100644
--- a/source/libs/transport/src/transComm.c
+++ b/source/libs/transport/src/transComm.c
@@ -482,7 +482,7 @@ void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg)
   queue* node = QUEUE_NEXT(&q->node);
   while (node != &q->node) {
     queue* next = QUEUE_NEXT(node);
-    if (filter(node, arg)) {
+    if (filter && filter(node, arg)) {
       QUEUE_REMOVE(node);
       q->size--;
       QUEUE_PUSH(d, node);