From 4977e2e676f7daca2c4aba3ee8f2d88fbd7f6d8a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Sep 2022 18:35:12 +0800 Subject: [PATCH] opt tbname in --- include/libs/index/index.h | 6 ++++ source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 2 ++ source/libs/executor/src/executil.c | 40 ++++++++++++++++++++++-- source/libs/index/src/index.c | 12 +++++-- source/libs/index/src/indexCache.c | 2 +- source/libs/transport/src/transComm.c | 2 +- 6 files changed, 57 insertions(+), 7 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index c1fdc4df52..a6e6373325 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -225,6 +225,12 @@ typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltS SIdxFltStatus idxGetFltStatus(SNode* pFilterNode); int32_t doFilterTag(SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* result, SIdxFltStatus* status); + +/* + * init index env + * + */ +void indexInit(int32_t threads); /* * destory index env * diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 582b16ce99..2e149be3b6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -193,6 +193,8 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { goto _OVER; } + indexInit(tsNumOfCommitThreads); + dmReportStartup("dnode-transport", "initialized"); dDebug("dnode is created, ptr:%p", pDnode); code = 0; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1ece2f902f..9da48dac86 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -27,6 +27,7 @@ #include "tcompression.h" static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond); +static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond); void initResultRowInfo(SResultRowInfo* pResultRowInfo) { pResultRowInfo->size = 0; @@ -750,7 +751,43 @@ end: return code; } -static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) { +static int tableUidCompare(const void* a, const void* b) { + uint64_t u1 = *(uint64_t*)a; + uint64_t u2 = *(uint64_t*)b; + if (u1 == u2) { + return 0; + } + return u1 < u2 ? -1 : 1; +} +static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond) { + if (nodeType(cond) == QUERY_NODE_OPERATOR) { + return optimizeTbnameInCondImpl(metaHandle, suid, list, cond); + } + + if (nodeType(cond) != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) { + return -1; + } + + SLogicConditionNode* pNode = (SLogicConditionNode*)cond; + SNodeListNode* pList = (SNodeListNode*)pNode->pParameterList; + int32_t len = LIST_LENGTH(pList->pNodeList); + + bool hasTbnameCond = false; + if (len <= 0) return -1; + + SListCell* cell = pList->pNodeList->pHead; + for (int i = 0; i < len; i++) { + if (cell && optimizeTbnameInCondImpl(metaHandle, suid, list, cell->pNode) == 0) { + hasTbnameCond = true; + } + cell = cell->pNext; + } + taosArraySort(list, tableUidCompare); + taosArrayRemoveDuplicate(list, tableUidCompare, NULL); + + return hasTbnameCond == true ? 0 : -1; +} +static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) { if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) { return -1; } @@ -777,7 +814,6 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list } char* name = varDataVal(valueNode->datum.p); taosArrayPush(pTbList, &name); - cell = cell->pNext; } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index f507e1b3be..cb671cfff9 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -54,11 +54,17 @@ void* indexQhandle = NULL; int32_t indexRefMgt; +int32_t indexThreads = 5; + static void indexDestroy(void* sIdx); -void indexInit() { +void indexInit(int32_t threadNum) { + indexThreads = threadNum; + if (indexThreads <= 1) indexThreads = INDEX_NUM_OF_THREADS; +} +void indexEnvInit() { // refactor later - indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL); + indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, indexThreads, "index", NULL); indexRefMgt = taosOpenRef(1000, indexDestroy); } void indexCleanup() { @@ -99,7 +105,7 @@ static void indexWait(void* idx) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int ret = TSDB_CODE_SUCCESS; - taosThreadOnce(&isInit, indexInit); + taosThreadOnce(&isInit, indexEnvInit); SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex)); if (idx == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 39bba4e269..72f693f7e5 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -22,7 +22,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later #define MEM_TERM_LIMIT 10 * 10000 -#define MEM_THRESHOLD 512 * 1024 +#define MEM_THRESHOLD 8 * 512 * 1024 // 8M #define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20 #define MEM_ESTIMATE_RADIO 1.5 diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5f3171ee0e..ae9f3155b4 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -193,7 +193,7 @@ bool transReadComplete(SConnBuffer* connBuf) { int transSetConnOption(uv_tcp_t* stream) { uv_tcp_nodelay(stream, 1); - int ret = uv_tcp_keepalive(stream, 5, 5); + int ret = uv_tcp_keepalive(stream, 5, 60); return ret; }