diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b9bec12d14..4426eb0d44 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -282,6 +282,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); +static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); // thread obj static int32_t createThrdObj(void* trans, SCliThrd** pThrd); @@ -322,6 +323,7 @@ void transHeapDestroy(SHeap* heap); int32_t transHeapGet(SHeap* heap, SCliConn** p); int32_t transHeapInsert(SHeap* heap, SCliConn* p); int32_t transHeapDelete(SHeap* heap, SCliConn* p); +int32_t transHeapBalance(SHeap* heap, SCliConn* p); #define CLI_RELEASE_UV(loop) \ do { \ @@ -471,6 +473,11 @@ int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, int32_t msgType, SCliReq** p int8_t cliMayRecycleConn(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; + + code = balanceConnHeapCache(pThrd->connHeapCache, conn); + if (code != 0) { + tDebug("%s conn %p failed to balance heap cache", CONN_GET_INST_LABEL(conn), conn); + } if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 && taosHashGetSize(conn->pQTable) == 0) { code = delConnFromHeapCache(pThrd->connHeapCache, conn); @@ -3780,6 +3787,13 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { } return code; } + +static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { + if (pConn->heap != NULL) { + return transHeapBalance(pConn->heap, pConn); + } + return 0; +} // conn heap int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { SCliConn* args1 = container_of(a, SCliConn, node); @@ -3850,3 +3864,15 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) { } return 0; } + +int32_t transHeapBalance(SHeap* heap, SCliConn* p) { + if (p->inHeap == 0) { + return 0; + } + if (heap && heap->heap && heap->heap->nelts >= 64) { + tDebug("conn %p heap busy,heap size:%d", heap->heap->nelts); + } + heapRemove(heap->heap, &p->node); + heapInsert(heap->heap, &p->node); + return 0; +} diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 0d8774ba41..e9b54934a3 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -126,6 +126,13 @@ add_test( COMMAND regexTest ) +add_executable(heapTest "heapTest.cpp") +target_link_libraries(heapTest os util gtest_main ) +add_test( + NAME heapTest + COMMAND heapTest +) + #add_executable(decompressTest "decompressTest.cpp") #target_link_libraries(decompressTest os util common gtest_main) #add_test( @@ -147,4 +154,4 @@ if (${TD_LINUX}) add_custom_command(TARGET terrorTest POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different ${ERR_TBL_FILE} $ ) -endif () \ No newline at end of file +endif () diff --git a/source/util/test/heapTest.cpp b/source/util/test/heapTest.cpp new file mode 100644 index 0000000000..51eeb26ed3 --- /dev/null +++ b/source/util/test/heapTest.cpp @@ -0,0 +1,55 @@ +#include + +#include "taoserror.h" +#include "theap.h" + +using namespace std; + +typedef struct TNode { + int32_t data; + HeapNode node; +} TNodeMem; + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +int32_t heapCompare(const HeapNode* a, const HeapNode* b) { + TNodeMem *ta = container_of(a, TNodeMem, node); + TNodeMem *tb = container_of(b, TNodeMem, node); + if (ta->data > tb->data) { + return 0; + } + return 1; +} + +TEST(TD_UTIL_HEAP_TEST, heapTest) { + Heap* heap = heapCreate(heapCompare); + ASSERT_TRUE(heap != NULL); + ASSERT_EQ(0, heapSize(heap)); + + + int32_t limit = 10; + + TNodeMem **pArr = (TNodeMem **)taosMemoryCalloc(100, sizeof(TNodeMem *)); + for (int i = 0; i < 100; i++) { + TNodeMem *a = (TNodeMem *)taosMemoryCalloc(1, sizeof(TNodeMem)); + a->data = i%limit; + + heapInsert(heap, &a->node); + + pArr[i] = a; + TNodeMem *b = (TNodeMem *)taosMemoryCalloc(1, sizeof(TNodeMem)); + b->data = (limit - i)%limit; + heapInsert(heap, &b->node); + } + for (int i = 98; i < 100; i++) { + TNodeMem *p = pArr[i]; + p->data = -100000; + } + HeapNode *node = heapMin(heap); + while (node != NULL) { + TNodeMem *data = container_of(node, TNodeMem, node); + heapRemove(heap, node); + printf("%d\t", data->data); + node = heapMin(heap); + } + heapDestroy(heap); +}