Merge remote-tracking branch 'origin/3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-10-11 14:35:46 +08:00
parent dc9027bb90
commit a1a421b41b
3 changed files with 89 additions and 1 deletions

View File

@ -282,6 +282,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
// thread obj
static int32_t createThrdObj(void* trans, SCliThrd** pThrd);
@ -322,6 +323,7 @@ void transHeapDestroy(SHeap* heap);
int32_t transHeapGet(SHeap* heap, SCliConn** p);
int32_t transHeapInsert(SHeap* heap, SCliConn* p);
int32_t transHeapDelete(SHeap* heap, SCliConn* p);
int32_t transHeapBalance(SHeap* heap, SCliConn* p);
#define CLI_RELEASE_UV(loop) \
do { \
@ -471,6 +473,11 @@ int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, int32_t msgType, SCliReq** p
int8_t cliMayRecycleConn(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
code = balanceConnHeapCache(pThrd->connHeapCache, conn);
if (code != 0) {
tDebug("%s conn %p failed to balance heap cache", CONN_GET_INST_LABEL(conn), conn);
}
if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 &&
taosHashGetSize(conn->pQTable) == 0) {
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
@ -3780,6 +3787,13 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
}
return code;
}
static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
if (pConn->heap != NULL) {
return transHeapBalance(pConn->heap, pConn);
}
return 0;
}
// conn heap
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
SCliConn* args1 = container_of(a, SCliConn, node);
@ -3850,3 +3864,15 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
}
return 0;
}
int32_t transHeapBalance(SHeap* heap, SCliConn* p) {
if (p->inHeap == 0) {
return 0;
}
if (heap && heap->heap && heap->heap->nelts >= 64) {
tDebug("conn %p heap busy,heap size:%d", heap->heap->nelts);
}
heapRemove(heap->heap, &p->node);
heapInsert(heap->heap, &p->node);
return 0;
}

View File

@ -126,6 +126,13 @@ add_test(
COMMAND regexTest
)
add_executable(heapTest "heapTest.cpp")
target_link_libraries(heapTest os util gtest_main )
add_test(
NAME heapTest
COMMAND heapTest
)
#add_executable(decompressTest "decompressTest.cpp")
#target_link_libraries(decompressTest os util common gtest_main)
#add_test(
@ -147,4 +154,4 @@ if (${TD_LINUX})
add_custom_command(TARGET terrorTest POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different ${ERR_TBL_FILE} $<TARGET_FILE_DIR:terrorTest>
)
endif ()
endif ()

View File

@ -0,0 +1,55 @@
#include <gtest/gtest.h>
#include "taoserror.h"
#include "theap.h"
using namespace std;
typedef struct TNode {
int32_t data;
HeapNode node;
} TNodeMem;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
int32_t heapCompare(const HeapNode* a, const HeapNode* b) {
TNodeMem *ta = container_of(a, TNodeMem, node);
TNodeMem *tb = container_of(b, TNodeMem, node);
if (ta->data > tb->data) {
return 0;
}
return 1;
}
TEST(TD_UTIL_HEAP_TEST, heapTest) {
Heap* heap = heapCreate(heapCompare);
ASSERT_TRUE(heap != NULL);
ASSERT_EQ(0, heapSize(heap));
int32_t limit = 10;
TNodeMem **pArr = (TNodeMem **)taosMemoryCalloc(100, sizeof(TNodeMem *));
for (int i = 0; i < 100; i++) {
TNodeMem *a = (TNodeMem *)taosMemoryCalloc(1, sizeof(TNodeMem));
a->data = i%limit;
heapInsert(heap, &a->node);
pArr[i] = a;
TNodeMem *b = (TNodeMem *)taosMemoryCalloc(1, sizeof(TNodeMem));
b->data = (limit - i)%limit;
heapInsert(heap, &b->node);
}
for (int i = 98; i < 100; i++) {
TNodeMem *p = pArr[i];
p->data = -100000;
}
HeapNode *node = heapMin(heap);
while (node != NULL) {
TNodeMem *data = container_of(node, TNodeMem, node);
heapRemove(heap, node);
printf("%d\t", data->data);
node = heapMin(heap);
}
heapDestroy(heap);
}