From 41b8693430ec02f49083ab35efdc5e4f0f2fb60e Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Wed, 10 Jul 2024 19:46:09 +0800 Subject: [PATCH 01/17] use regex cache --- include/util/taoserror.h | 1 + include/util/tcompare.h | 3 +- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 3 + source/libs/parser/src/parUtil.c | 4 +- source/libs/scalar/src/sclvector.c | 2 + source/util/src/tcompare.c | 138 +++++++++++++++++++----- source/util/src/terror.c | 1 + source/util/src/tworker.c | 3 - 8 files changed, 125 insertions(+), 30 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 24f9d041fc..8efd67f745 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -836,6 +836,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E) #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) #define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) +#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tcompare.h b/include/util/tcompare.h index 9694bee92d..4f574a9b79 100644 --- a/include/util/tcompare.h +++ b/include/util/tcompare.h @@ -45,6 +45,8 @@ typedef struct SPatternCompareInfo { TdUcs4 umatchOne; // unicode version matchOne } SPatternCompareInfo; +int32_t InitRegexCache(); +void DestroyRegexCache(); int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo); int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo); @@ -83,7 +85,6 @@ int32_t compareLenBinaryVal(const void *pLeft, const void *pRight); int32_t comparestrRegexMatch(const void *pLeft, const void *pRight); int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight); -void DestoryThreadLocalRegComp(); int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight); int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 4be1af30b5..46f9965d1a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -18,6 +18,7 @@ #include "audit.h" #include "libs/function/tudf.h" #include "tgrant.h" +#include "tcompare.h" #define DM_INIT_AUDIT() \ do { \ @@ -163,6 +164,7 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; + if (InitRegexCache() != 0) return -1; #if defined(USE_S3) if (s3Begin() != 0) return -1; #endif @@ -192,6 +194,7 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); + DestroyRegexCache(); #if defined(USE_S3) s3End(); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index d67c7d306f..e6b6bcc903 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -223,7 +223,9 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Tag name:%s duplicated"; case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC: return "Some functions cannot appear in the select list at the same time"; - default: + case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR: + return "Syntax error in regular expression"; + default: return "Unknown error"; } } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index c5789a65ca..673919b2f5 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1667,6 +1667,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } } else { for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) { + if (terrno != TSDB_CODE_SUCCESS) break; int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i; int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i; @@ -1688,6 +1689,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } else { // if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) { for (int32_t i = startIndex; i < numOfRows && i >= startIndex; i += step) { + if (terrno != TSDB_CODE_SUCCESS) break; int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i; int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i; diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 26122a4a29..7f14f3a1e0 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1203,54 +1203,142 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) { return comparestrRegexMatch(pLeft, pRight) ? 0 : 1; } -static threadlocal regex_t pRegex; -static threadlocal char *pOldPattern = NULL; -static regex_t *threadGetRegComp(const char *pPattern) { - if (NULL != pOldPattern) { - if( strcmp(pOldPattern, pPattern) == 0) { - return &pRegex; - } else { - DestoryThreadLocalRegComp(); +typedef struct UsingRegex { + regex_t pRegex; + int32_t usingCount; + int32_t lastUsedTime; +} UsingRegex; + +typedef struct RegexCache { + SHashObj *regexHash; + int32_t regexCaheSize; + TdThreadRwlock regexLock; + int32_t lastClearTime; +} RegexCache; +static RegexCache sRegexCache; +#define MAX_REGEX_CACHE_SIZE 20 +#define REGEX_CACHE_CLEAR_TIME 30 + +int32_t InitRegexCache() { + if (taosThreadRwlockInit(&sRegexCache.regexLock, NULL) != 0) { + uError("failed to create RegexCache lock"); + return -1; + } + sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (sRegexCache.regexHash == NULL) { + uError("failed to create RegexCache"); + return -1; + } + sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE; + sRegexCache.lastClearTime = taosGetTimestampSec(); + return 0; +} + +void DestroyRegexCache(){ + UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); + while ((ppUsingRegex != NULL)) { + regfree(&(*ppUsingRegex)->pRegex); + taosMemoryFree(*ppUsingRegex); + ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); + } + taosHashCleanup(sRegexCache.regexHash); + taosThreadRwlockDestroy(&sRegexCache.regexLock); +} + +static void clearOlderRegex() { + if (taosGetTimestampSec() - sRegexCache.lastClearTime < REGEX_CACHE_CLEAR_TIME || + taosHashGetSize(sRegexCache.regexHash) < sRegexCache.regexCaheSize) { + return; + } + taosThreadRwlockWrlock(&sRegexCache.regexLock); + if (taosHashGetSize(sRegexCache.regexHash) >= sRegexCache.regexCaheSize) { + UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); + while ((ppUsingRegex != NULL)) { + if ((*ppUsingRegex)->usingCount == 0 && + taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { + regfree(&(*ppUsingRegex)->pRegex); + taosMemoryFree(*ppUsingRegex); + taosHashRelease(sRegexCache.regexHash, ppUsingRegex); + sRegexCache.lastClearTime = taosGetTimestampSec(); + } + ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } - pOldPattern = taosMemoryMalloc(strlen(pPattern) + 1); - if (NULL == pOldPattern) { + taosThreadRwlockUnlock(&sRegexCache.regexLock); +} + +static UsingRegex *getRegComp(const char *pPattern) { + taosThreadRwlockRdlock(&sRegexCache.regexLock); + UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex != NULL) { + (*ppUsingRegex)->usingCount++; + taosThreadRwlockUnlock(&sRegexCache.regexLock); + return *ppUsingRegex; + } + taosThreadRwlockUnlock(&sRegexCache.regexLock); + + UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); + if (pUsingRegex == NULL) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); return NULL; } - strcpy(pOldPattern, pPattern); int32_t cflags = REG_EXTENDED; - int32_t ret = regcomp(&pRegex, pPattern, cflags); + int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags); if (ret != 0) { char msgbuf[256] = {0}; - regerror(ret, &pRegex, msgbuf, tListLen(msgbuf)); + regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf)); uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); - DestoryThreadLocalRegComp(); + taosMemoryFree(pUsingRegex); + terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; return NULL; } - return &pRegex; + + taosThreadRwlockWrlock(&sRegexCache.regexLock); + int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); + if (code != 0) { + if( terrno == TSDB_CODE_DUP_KEY) { + regfree(&pUsingRegex->pRegex); + taosMemoryFree(pUsingRegex); + + UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if(ppUsingRegex) { + pUsingRegex = (*ppUsingRegex); + } else { + uError("Failed to get regex pattern %s from cache, exception internal error.", pPattern); + taosThreadRwlockUnlock(&sRegexCache.regexLock); + return NULL; + } + } else { + uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); + taosThreadRwlockUnlock(&sRegexCache.regexLock); + return NULL; + } + } + atomic_add_fetch_32(&pUsingRegex->usingCount, 1); + taosThreadRwlockUnlock(&sRegexCache.regexLock); + + clearOlderRegex(); + return pUsingRegex; } -void DestoryThreadLocalRegComp() { - if (NULL != pOldPattern) { - regfree(&pRegex); - taosMemoryFree(pOldPattern); - pOldPattern = NULL; - } +void recycleRegex(UsingRegex *regex){ + atomic_add_fetch_32(®ex->usingCount, -1); + regex->lastUsedTime = taosGetTimestampSec(); } static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { int32_t ret = 0; char msgbuf[256] = {0}; - regex_t *regex = threadGetRegComp(pPattern); - if (regex == NULL) { + UsingRegex *pUsingRegex = getRegComp(pPattern); + if (pUsingRegex == NULL) { return 1; } regmatch_t pmatch[1]; - ret = regexec(regex, pString, 1, pmatch, 0); + ret = regexec(&pUsingRegex->pRegex, pString, 1, pmatch, 0); + recycleRegex(pUsingRegex); if (ret != 0 && ret != REG_NOMATCH) { - regerror(ret, regex, msgbuf, sizeof(msgbuf)); + regerror(ret, &pUsingRegex->pRegex, msgbuf, sizeof(msgbuf)); uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c7fd6f65c5..c3f3a07b86 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -683,6 +683,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 4a8a0823b7..7a97dc3527 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -104,7 +104,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { } destroyThreadLocalGeosCtx(); - DestoryThreadLocalRegComp(); return NULL; } @@ -224,7 +223,6 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } - DestoryThreadLocalRegComp(); return NULL; } @@ -636,7 +634,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } destroyThreadLocalGeosCtx(); - DestoryThreadLocalRegComp(); return NULL; } From d0ade067b26207aeda72a9ee56c01463b78d9435 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Jul 2024 09:12:41 +0800 Subject: [PATCH 02/17] init regex cache in unit test --- source/libs/scalar/test/scalar/scalarTests.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index fe86e18ce3..dd88344962 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -43,6 +43,7 @@ #include "tglobal.h" #include "tlog.h" #include "tvariant.h" +#include "tcompare.h" #define _DEBUG_PRINT_ 0 @@ -52,6 +53,12 @@ #define PRINTF(...) #endif +class constantTest { + public: + constantTest() { InitRegexCache(); } + ~constantTest() { DestroyRegexCache(); } +}; +static constantTest test; namespace { SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) { From e0f9dc1dce6b5da21f89e1a096462e4a17b40c3f Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Jul 2024 11:23:13 +0800 Subject: [PATCH 03/17] add test case --- tests/parallel_test/cases.task | 4 + tests/system-test/2-query/match.py | 149 +++++++++++++++++++++++++++++ tests/system-test/win-test-file | 4 + 3 files changed, 157 insertions(+) create mode 100644 tests/system-test/2-query/match.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4338187791..e9daf4f0ca 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -192,6 +192,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 3 diff --git a/tests/system-test/2-query/match.py b/tests/system-test/2-query/match.py new file mode 100644 index 0000000000..2e47f079e2 --- /dev/null +++ b/tests/system-test/2-query/match.py @@ -0,0 +1,149 @@ +import taos +import sys +import datetime +import inspect +import threading +import time + +from util.log import * +from util.sql import * +from util.cases import * +from util.common import tdCom + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def initConnection(self): + self.records = 10000000 + self.numOfTherads = 50 + self.ts = 1537146000000 + self.host = "127.0.0.1" + self.user = "root" + self.password = "taosdata" + self.config = "/home/xp/git/TDengine/sim/dnode1/cfg" + self.conn = taos.connect( + self.host, + self.user, + self.password, + self.config) + + def initDB(self): + tdSql.execute("drop database if exists db") + tdSql.execute("create database if not exists db") + + def stopTest(self): + tdSql.execute("drop database if exists db") + + def insertData(self, threadID): + cursor = self.conn.cursor() + print("Thread %d: starting" % threadID) + base = 200000 * threadID + for i in range(200): + query = "insert into tb values" + for j in range(1000): + query += "(%d, %d, 'test')" % (self.ts + base + i * 1000 + j, base + i * 1000 + j) + cursor.execute(query) + cursor.close() + print("Thread %d: finishing" % threadID) + + def threadTest(self, threadID): + print(f"Thread {threadID} starting...") + tdsqln = tdCom.newTdSql() + for i in range(2, 50): + tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}dx'") + tdsqln.checkRows(0) + for i in range(100): + tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdsqln.checkRows(2) + + tdsqln.query("select * from db.t1x") + tdsqln.checkRows(5) + + tdsqln.query("select * from db.t1x where c1 match '_c'") + tdsqln.checkRows(2) + + tdsqln.query("select * from db.t1x where c1 match '%__c'") + tdsqln.checkRows(0) + + print(f"Thread {threadID} finished.") + + def match_test(self): + tdSql.execute("create table db.t1x (ts timestamp, c1 varchar(100))") + tdSql.execute("create table db.t_1x (ts timestamp, c1 varchar(100))") + + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdSql.checkRows(2) + for i in range(2, 50): + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") + tdSql.checkRows(0) + + tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')") + + tdSql.query("select * from db.t1x") + tdSql.checkRows(5) + + tdSql.query("select * from db.t1x where c1 match '_c'") + tdSql.checkRows(2) + + tdSql.query("select * from db.t1x where c1 match '%__c'") + tdSql.checkRows(0) + + threads = [] + for i in range(10): + t = threading.Thread(target=self.threadTest, args=(i,)) + threads.append(t) + t.start() + + time.sleep(31) + + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdSql.checkRows(2) + for i in range(2, 50): + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") + tdSql.checkRows(0) + + tdSql.query("select * from db.t1x") + tdSql.checkRows(5) + + tdSql.query("select * from db.t1x where c1 match '_c'") + tdSql.checkRows(2) + + tdSql.query("select * from db.t1x where c1 match '%__c'") + tdSql.checkRows(0) + + tdSql.execute("create table db.t3x (ts timestamp, c1 varchar(100))") + + tdSql.execute("insert into db.t3x values(now, '我是中文'), (now+1s, '我是_中文'), (now+2s, '我是%中文'), (now+3s, '%中文'),(now+4s, '_中文')") + tdSql.query("select * from db.t3x where c1 match '%中文'") + tdSql.checkRows(2) + tdSql.query("select * from db.t3x where c1 match '中文'") + tdSql.checkRows(5) + + for thread in threads: + print(f"Thread waitting for finish...") + thread.join() + + print(f"Mutithread test finished.") + + def run(self): + tdLog.printNoPrefix("==========start match_test run ...............") + tdSql.prepare(replica = self.replicaVar) + + self.initConnection() + + self.initDB() + self.match_test() + + self.stopTest() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 96f9452827..cdc4e27f20 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -104,6 +104,10 @@ python3 ./test.py -f 2-query/like.py python3 ./test.py -f 2-query/like.py -Q 2 python3 ./test.py -f 2-query/like.py -Q 3 python3 ./test.py -f 2-query/like.py -Q 4 +python3 ./test.py -f 2-query/match.py +python3 ./test.py -f 2-query/match.py -Q 2 +python3 ./test.py -f 2-query/match.py -Q 3 +python3 ./test.py -f 2-query/match.py -Q 4 python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False From c135ee835867a1ab226a5e66ea664fb32589b72f Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Jul 2024 11:37:16 +0800 Subject: [PATCH 04/17] fix: set terrno --- source/util/src/tcompare.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 7f14f3a1e0..fde07b4ba8 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1280,6 +1280,7 @@ static UsingRegex *getRegComp(const char *pPattern) { UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } int32_t cflags = REG_EXTENDED; @@ -1306,11 +1307,13 @@ static UsingRegex *getRegComp(const char *pPattern) { } else { uError("Failed to get regex pattern %s from cache, exception internal error.", pPattern); taosThreadRwlockUnlock(&sRegexCache.regexLock); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return NULL; } } else { uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); taosThreadRwlockUnlock(&sRegexCache.regexLock); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return NULL; } } From cfda747613ec9f50e18bf29c96a32d81da893242 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Jul 2024 21:00:08 +0800 Subject: [PATCH 05/17] fix: match cache release --- source/libs/scalar/src/filter.c | 6 +- source/libs/scalar/src/scalar.c | 4 +- source/libs/scalar/src/sclvector.c | 9 +- source/util/src/tcompare.c | 150 +++++++++++++++-------------- tests/system-test/2-query/match.py | 17 +--- 5 files changed, 93 insertions(+), 93 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 72e38c7a0d..0a59cd2219 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3217,6 +3217,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes continue; } + terrno = TSDB_CODE_SUCCESS; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); // match/nmatch for nchar type need convert from ucs4 to mbs if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && @@ -3235,6 +3236,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData); } + if (terrno != TSDB_CODE_SUCCESS) break; if (p[i] == 0) { all = false; @@ -3358,8 +3360,9 @@ int32_t filterSetExecFunc(SFilterInfo *info) { return TSDB_CODE_SUCCESS; } + terrno = TSDB_CODE_SUCCESS; info->func = filterExecuteImplMisc; - return TSDB_CODE_SUCCESS; + return terrno; } int32_t filterPreprocess(SFilterInfo *info) { @@ -4744,6 +4747,7 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, } bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); + if (terrno != TSDB_CODE_SUCCESS) return terrno; // todo this should be return during filter procedure if (keepAll) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 50de5e760d..f9b9f3bcdb 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1650,8 +1650,8 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { return TSDB_CODE_TSC_INVALID_OPERATION; } SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; - if (ldt.type == TSDB_DATA_TYPE_VARBINARY || !IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || - (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { + if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || + (rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR && rdt.type != TSDB_DATA_TYPE_NULL)) { return TSDB_CODE_TSC_INVALID_OPERATION; } } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 673919b2f5..39c6a0cd67 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1667,7 +1667,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } } else { for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) { - if (terrno != TSDB_CODE_SUCCESS) break; int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i; int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i; @@ -1676,11 +1675,11 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa pRes[i] = false; continue; } - char *pLeftData = colDataGetData(pLeft->columnData, leftIndex); char *pRightData = colDataGetData(pRight->columnData, rightIndex); - + terrno = TSDB_CODE_SUCCESS; pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData); + if (terrno != TSDB_CODE_SUCCESS) break; if (pRes[i]) { ++num; } @@ -1689,7 +1688,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } else { // if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) { for (int32_t i = startIndex; i < numOfRows && i >= startIndex; i += step) { - if (terrno != TSDB_CODE_SUCCESS) break; int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i; int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i; @@ -1716,7 +1714,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa if (!pLeftData || !pRightData) { result = false; } - + terrno = TSDB_CODE_SUCCESS; if (!result) { colDataSetInt8(pOut->columnData, i, (int8_t *)&result); } else { @@ -1726,6 +1724,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa ++num; } } + if (terrno != TSDB_CODE_SUCCESS) break; if (freeLeft) { taosMemoryFreeClear(pLeftData); diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index fde07b4ba8..ebc6f860a4 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -24,6 +24,7 @@ #include "tutil.h" #include "types.h" #include "osString.h" +#include "ttimer.h" int32_t setChkInBytes1(const void *pLeft, const void *pRight) { return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0; @@ -1205,77 +1206,87 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) { typedef struct UsingRegex { regex_t pRegex; - int32_t usingCount; int32_t lastUsedTime; } UsingRegex; typedef struct RegexCache { SHashObj *regexHash; int32_t regexCaheSize; - TdThreadRwlock regexLock; + void *regexCacheTimer; + void *timer; int32_t lastClearTime; } RegexCache; static RegexCache sRegexCache; #define MAX_REGEX_CACHE_SIZE 20 #define REGEX_CACHE_CLEAR_TIME 30 -int32_t InitRegexCache() { - if (taosThreadRwlockInit(&sRegexCache.regexLock, NULL) != 0) { - uError("failed to create RegexCache lock"); - return -1; - } - sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (sRegexCache.regexHash == NULL) { - uError("failed to create RegexCache"); - return -1; - } - sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE; - sRegexCache.lastClearTime = taosGetTimestampSec(); - return 0; -} - -void DestroyRegexCache(){ - UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); - while ((ppUsingRegex != NULL)) { - regfree(&(*ppUsingRegex)->pRegex); - taosMemoryFree(*ppUsingRegex); - ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); - } - taosHashCleanup(sRegexCache.regexHash); - taosThreadRwlockDestroy(&sRegexCache.regexLock); -} - -static void clearOlderRegex() { +static void checkRegexCache(void* param, void* tmrId) { if (taosGetTimestampSec() - sRegexCache.lastClearTime < REGEX_CACHE_CLEAR_TIME || taosHashGetSize(sRegexCache.regexHash) < sRegexCache.regexCaheSize) { return; } - taosThreadRwlockWrlock(&sRegexCache.regexLock); + if (taosHashGetSize(sRegexCache.regexHash) >= sRegexCache.regexCaheSize) { UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); while ((ppUsingRegex != NULL)) { - if ((*ppUsingRegex)->usingCount == 0 && - taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { - regfree(&(*ppUsingRegex)->pRegex); - taosMemoryFree(*ppUsingRegex); + if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { taosHashRelease(sRegexCache.regexHash, ppUsingRegex); sRegexCache.lastClearTime = taosGetTimestampSec(); } ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } - taosThreadRwlockUnlock(&sRegexCache.regexLock); + + taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, NULL, &tmrId); } -static UsingRegex *getRegComp(const char *pPattern) { - taosThreadRwlockRdlock(&sRegexCache.regexLock); - UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern)); - if (ppUsingRegex != NULL) { - (*ppUsingRegex)->usingCount++; - taosThreadRwlockUnlock(&sRegexCache.regexLock); - return *ppUsingRegex; +void regexCacheFree(void *ppUsingRegex) { + regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex); + taosMemoryFree(*(UsingRegex **)ppUsingRegex); +} + +int32_t InitRegexCache() { + sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (sRegexCache.regexHash == NULL) { + uError("failed to create RegexCache"); + return -1; + } + taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); + sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE; + sRegexCache.lastClearTime = taosGetTimestampSec(); + + sRegexCache.regexCacheTimer = taosTmrInit(0, 0, 0, "REGEXCACHE"); + if (sRegexCache.regexCacheTimer == NULL) { + uError("failed to create regex cache check timer"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTimer); + if (sRegexCache.timer == NULL) { + uError("failed to start regex cache timer"); + return -1; + } + + return 0; +} + +void DestroyRegexCache(){ + UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); + taosTmrStopA(&sRegexCache.timer); + while ((ppUsingRegex != NULL)) { + regexCacheFree(ppUsingRegex); + ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); + } + taosHashCleanup(sRegexCache.regexHash); +} + +static UsingRegex **getRegComp(const char *pPattern) { + UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex != NULL) { + (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; } - taosThreadRwlockUnlock(&sRegexCache.regexLock); UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { @@ -1294,54 +1305,49 @@ static UsingRegex *getRegComp(const char *pPattern) { return NULL; } - taosThreadRwlockWrlock(&sRegexCache.regexLock); - int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); - if (code != 0) { - if( terrno == TSDB_CODE_DUP_KEY) { - regfree(&pUsingRegex->pRegex); - taosMemoryFree(pUsingRegex); - - UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern)); - if(ppUsingRegex) { - pUsingRegex = (*ppUsingRegex); + while (true) { + int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); + if (code != 0) { + if (terrno == TSDB_CODE_DUP_KEY) { + ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex) { + if (*ppUsingRegex != pUsingRegex) { + regexCacheFree(&pUsingRegex); + } + pUsingRegex = (*ppUsingRegex); + break; + } else { + continue; + } } else { - uError("Failed to get regex pattern %s from cache, exception internal error.", pPattern); - taosThreadRwlockUnlock(&sRegexCache.regexLock); - terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - return NULL; - } - } else { + regexCacheFree(&pUsingRegex); uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); - taosThreadRwlockUnlock(&sRegexCache.regexLock); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return NULL; + } } } - atomic_add_fetch_32(&pUsingRegex->usingCount, 1); - taosThreadRwlockUnlock(&sRegexCache.regexLock); - - clearOlderRegex(); - return pUsingRegex; + pUsingRegex->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; } -void recycleRegex(UsingRegex *regex){ - atomic_add_fetch_32(®ex->usingCount, -1); - regex->lastUsedTime = taosGetTimestampSec(); +void releaseRegComp(UsingRegex **regex){ + taosHashRelease(sRegexCache.regexHash, regex); } static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { int32_t ret = 0; char msgbuf[256] = {0}; - UsingRegex *pUsingRegex = getRegComp(pPattern); + UsingRegex **pUsingRegex = getRegComp(pPattern); if (pUsingRegex == NULL) { return 1; } regmatch_t pmatch[1]; - ret = regexec(&pUsingRegex->pRegex, pString, 1, pmatch, 0); - recycleRegex(pUsingRegex); + ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0); + releaseRegComp(pUsingRegex); if (ret != 0 && ret != REG_NOMATCH) { - regerror(ret, &pUsingRegex->pRegex, msgbuf, sizeof(msgbuf)); + regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf)); uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) } diff --git a/tests/system-test/2-query/match.py b/tests/system-test/2-query/match.py index 2e47f079e2..62de846cb7 100644 --- a/tests/system-test/2-query/match.py +++ b/tests/system-test/2-query/match.py @@ -38,18 +38,6 @@ class TDTestCase: def stopTest(self): tdSql.execute("drop database if exists db") - def insertData(self, threadID): - cursor = self.conn.cursor() - print("Thread %d: starting" % threadID) - base = 200000 * threadID - for i in range(200): - query = "insert into tb values" - for j in range(1000): - query += "(%d, %d, 'test')" % (self.ts + base + i * 1000 + j, base + i * 1000 + j) - cursor.execute(query) - cursor.close() - print("Thread %d: finishing" % threadID) - def threadTest(self, threadID): print(f"Thread {threadID} starting...") tdsqln = tdCom.newTdSql() @@ -68,6 +56,8 @@ class TDTestCase: tdsqln.query("select * from db.t1x where c1 match '%__c'") tdsqln.checkRows(0) + + tdsqln.error("select * from db.t1x where c1 match '*d'") print(f"Thread {threadID} finished.") @@ -91,7 +81,7 @@ class TDTestCase: tdSql.query("select * from db.t1x where c1 match '%__c'") tdSql.checkRows(0) - + tdSql.error("select * from db.t1x where c1 match '*d'") threads = [] for i in range(10): t = threading.Thread(target=self.threadTest, args=(i,)) @@ -122,6 +112,7 @@ class TDTestCase: tdSql.checkRows(2) tdSql.query("select * from db.t3x where c1 match '中文'") tdSql.checkRows(5) + tdSql.error("select * from db.t1x where c1 match '*d'") for thread in threads: print(f"Thread waitting for finish...") From 57f826608a5cf2848cb57e6554b0d4ddf542b78e Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Fri, 12 Jul 2024 11:48:48 +0800 Subject: [PATCH 06/17] checkout regex pattern at client --- include/libs/nodes/querynodes.h | 1 + include/util/tcompare.h | 1 + source/libs/nodes/src/nodesUtilFuncs.c | 11 ++++++++ source/libs/scalar/src/filter.c | 6 +--- source/libs/scalar/src/scalar.c | 6 ++++ source/libs/scalar/src/sclvector.c | 4 --- source/util/src/tcompare.c | 38 ++++++++++++++------------ tests/system-test/2-query/match.py | 1 + 8 files changed, 42 insertions(+), 26 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 34b42fd9e1..0c4c7819ab 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -618,6 +618,7 @@ bool nodesIsArithmeticOp(const SOperatorNode* pOp); bool nodesIsComparisonOp(const SOperatorNode* pOp); bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsRegularOp(const SOperatorNode* pOp); +bool nodesIsMatchRegularOp(const SOperatorNode* pOp); bool nodesIsBitwiseOp(const SOperatorNode* pOp); bool nodesExprHasColumn(SNode* pNode); diff --git a/include/util/tcompare.h b/include/util/tcompare.h index 4f574a9b79..80f992f646 100644 --- a/include/util/tcompare.h +++ b/include/util/tcompare.h @@ -48,6 +48,7 @@ typedef struct SPatternCompareInfo { int32_t InitRegexCache(); void DestroyRegexCache(); int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo); +int32_t checkRegexPattern(const char *pPattern); int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f118c15b7a..5ec8adafe9 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2205,6 +2205,17 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) { return false; } +bool nodesIsMatchRegularOp(const SOperatorNode* pOp) { + switch (pOp->opType) { + case OP_TYPE_MATCH: + case OP_TYPE_NMATCH: + return true; + default: + break; + } + return false; +} + bool nodesIsBitwiseOp(const SOperatorNode* pOp) { switch (pOp->opType) { case OP_TYPE_BIT_AND: diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 0a59cd2219..72e38c7a0d 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3217,7 +3217,6 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes continue; } - terrno = TSDB_CODE_SUCCESS; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); // match/nmatch for nchar type need convert from ucs4 to mbs if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && @@ -3236,7 +3235,6 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData); } - if (terrno != TSDB_CODE_SUCCESS) break; if (p[i] == 0) { all = false; @@ -3360,9 +3358,8 @@ int32_t filterSetExecFunc(SFilterInfo *info) { return TSDB_CODE_SUCCESS; } - terrno = TSDB_CODE_SUCCESS; info->func = filterExecuteImplMisc; - return terrno; + return TSDB_CODE_SUCCESS; } int32_t filterPreprocess(SFilterInfo *info) { @@ -4747,7 +4744,6 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, } bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); - if (terrno != TSDB_CODE_SUCCESS) return terrno; // todo this should be return during filter procedure if (keepAll) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index f9b9f3bcdb..8d1f8dd9b1 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1654,6 +1654,12 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { (rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR && rdt.type != TSDB_DATA_TYPE_NULL)) { return TSDB_CODE_TSC_INVALID_OPERATION; } + if (nodesIsMatchRegularOp(pOp)) { + SValueNode* node = (SValueNode*)(pOp->pRight); + if(checkRegexPattern(node->literal) != TSDB_CODE_SUCCESS){ + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + } } pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 39c6a0cd67..16c06c2452 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1677,9 +1677,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } char *pLeftData = colDataGetData(pLeft->columnData, leftIndex); char *pRightData = colDataGetData(pRight->columnData, rightIndex); - terrno = TSDB_CODE_SUCCESS; pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData); - if (terrno != TSDB_CODE_SUCCESS) break; if (pRes[i]) { ++num; } @@ -1714,7 +1712,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa if (!pLeftData || !pRightData) { result = false; } - terrno = TSDB_CODE_SUCCESS; if (!result) { colDataSetInt8(pOut->columnData, i, (int8_t *)&result); } else { @@ -1724,7 +1721,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa ++num; } } - if (terrno != TSDB_CODE_SUCCESS) break; if (freeLeft) { taosMemoryFreeClear(pLeftData); diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index ebc6f860a4..4c4e859480 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1211,27 +1211,23 @@ typedef struct UsingRegex { typedef struct RegexCache { SHashObj *regexHash; - int32_t regexCaheSize; void *regexCacheTimer; void *timer; - int32_t lastClearTime; } RegexCache; static RegexCache sRegexCache; #define MAX_REGEX_CACHE_SIZE 20 #define REGEX_CACHE_CLEAR_TIME 30 static void checkRegexCache(void* param, void* tmrId) { - if (taosGetTimestampSec() - sRegexCache.lastClearTime < REGEX_CACHE_CLEAR_TIME || - taosHashGetSize(sRegexCache.regexHash) < sRegexCache.regexCaheSize) { + if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { return; } - if (taosHashGetSize(sRegexCache.regexHash) >= sRegexCache.regexCaheSize) { + if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) { UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); while ((ppUsingRegex != NULL)) { if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { taosHashRelease(sRegexCache.regexHash, ppUsingRegex); - sRegexCache.lastClearTime = taosGetTimestampSec(); } ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } @@ -1252,9 +1248,6 @@ int32_t InitRegexCache() { return -1; } taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); - sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE; - sRegexCache.lastClearTime = taosGetTimestampSec(); - sRegexCache.regexCacheTimer = taosTmrInit(0, 0, 0, "REGEXCACHE"); if (sRegexCache.regexCacheTimer == NULL) { uError("failed to create regex cache check timer"); @@ -1272,15 +1265,28 @@ int32_t InitRegexCache() { } void DestroyRegexCache(){ - UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); taosTmrStopA(&sRegexCache.timer); - while ((ppUsingRegex != NULL)) { - regexCacheFree(ppUsingRegex); - ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); - } taosHashCleanup(sRegexCache.regexHash); } +int32_t checkRegexPattern(const char *pPattern) { + if (pPattern == NULL) { + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + + regex_t regex; + int32_t cflags = REG_EXTENDED; + int32_t ret = regcomp(®ex, pPattern, cflags); + if (ret != 0) { + char msgbuf[256] = {0}; + regerror(ret, ®ex, msgbuf, tListLen(msgbuf)); + uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + regfree(®ex); + return TSDB_CODE_SUCCESS; +} + static UsingRegex **getRegComp(const char *pPattern) { UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); if (ppUsingRegex != NULL) { @@ -1291,7 +1297,6 @@ static UsingRegex **getRegComp(const char *pPattern) { UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } int32_t cflags = REG_EXTENDED; @@ -1301,7 +1306,6 @@ static UsingRegex **getRegComp(const char *pPattern) { regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf)); uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); taosMemoryFree(pUsingRegex); - terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; return NULL; } @@ -1309,6 +1313,7 @@ static UsingRegex **getRegComp(const char *pPattern) { int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); if (code != 0) { if (terrno == TSDB_CODE_DUP_KEY) { + terrno = TSDB_CODE_SUCCESS; ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); if (ppUsingRegex) { if (*ppUsingRegex != pUsingRegex) { @@ -1322,7 +1327,6 @@ static UsingRegex **getRegComp(const char *pPattern) { } else { regexCacheFree(&pUsingRegex); uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); - terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return NULL; } } diff --git a/tests/system-test/2-query/match.py b/tests/system-test/2-query/match.py index 62de846cb7..cd2ed5d96b 100644 --- a/tests/system-test/2-query/match.py +++ b/tests/system-test/2-query/match.py @@ -71,6 +71,7 @@ class TDTestCase: tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") tdSql.checkRows(0) + tdSql.error("select * from db.t1x where c1 match '*d'") tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')") tdSql.query("select * from db.t1x") From 2604a17fef44165ec56b2ab83c1425abc8619059 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Fri, 12 Jul 2024 12:04:23 +0800 Subject: [PATCH 07/17] fix: invalid operation --- source/libs/scalar/src/scalar.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 8d1f8dd9b1..a9ee40f8b0 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1650,8 +1650,8 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { return TSDB_CODE_TSC_INVALID_OPERATION; } SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; - if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || - (rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR && rdt.type != TSDB_DATA_TYPE_NULL)) { + if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || + (rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR)) { return TSDB_CODE_TSC_INVALID_OPERATION; } if (nodesIsMatchRegularOp(pOp)) { From eed684e0723582540056f36e486978ef8a2c4a80 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Fri, 12 Jul 2024 14:16:55 +0800 Subject: [PATCH 08/17] fix: match check --- source/libs/scalar/src/scalar.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index a9ee40f8b0..8f9ea4d36c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1650,8 +1650,8 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { return TSDB_CODE_TSC_INVALID_OPERATION; } SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; - if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || - (rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR)) { + if (ldt.type == TSDB_DATA_TYPE_VARBINARY || !IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || + (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { return TSDB_CODE_TSC_INVALID_OPERATION; } if (nodesIsMatchRegularOp(pOp)) { From 3e85d0c54626ec9519060cd5044294d36b341e8c Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Sat, 13 Jul 2024 11:39:25 +0800 Subject: [PATCH 09/17] fix: timer --- source/util/src/tcompare.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 4c4e859480..6433018293 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1219,6 +1219,7 @@ static RegexCache sRegexCache; #define REGEX_CACHE_CLEAR_TIME 30 static void checkRegexCache(void* param, void* tmrId) { + taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, NULL, &tmrId); if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { return; } @@ -1232,8 +1233,6 @@ static void checkRegexCache(void* param, void* tmrId) { ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } - - taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, NULL, &tmrId); } void regexCacheFree(void *ppUsingRegex) { From 22f69837b34feab600de869b20cb367c17b6f4aa Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Sat, 13 Jul 2024 15:22:16 +0800 Subject: [PATCH 10/17] clear timer --- source/util/src/tcompare.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 6433018293..b138028519 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1211,7 +1211,7 @@ typedef struct UsingRegex { typedef struct RegexCache { SHashObj *regexHash; - void *regexCacheTimer; + void *regexCacheTmr; void *timer; } RegexCache; static RegexCache sRegexCache; @@ -1219,7 +1219,7 @@ static RegexCache sRegexCache; #define REGEX_CACHE_CLEAR_TIME 30 static void checkRegexCache(void* param, void* tmrId) { - taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, NULL, &tmrId); + taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId); if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { return; } @@ -1236,6 +1236,7 @@ static void checkRegexCache(void* param, void* tmrId) { } void regexCacheFree(void *ppUsingRegex) { + uInfo("[regex cache] regexCacheFree %p", ppUsingRegex); regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex); taosMemoryFree(*(UsingRegex **)ppUsingRegex); } @@ -1247,14 +1248,14 @@ int32_t InitRegexCache() { return -1; } taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); - sRegexCache.regexCacheTimer = taosTmrInit(0, 0, 0, "REGEXCACHE"); - if (sRegexCache.regexCacheTimer == NULL) { + sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE"); + if (sRegexCache.regexCacheTmr == NULL) { uError("failed to create regex cache check timer"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTimer); + sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr); if (sRegexCache.timer == NULL) { uError("failed to start regex cache timer"); return -1; @@ -1264,8 +1265,10 @@ int32_t InitRegexCache() { } void DestroyRegexCache(){ + uInfo("[regex cache] destory regex cache"); taosTmrStopA(&sRegexCache.timer); taosHashCleanup(sRegexCache.regexHash); + taosTmrCleanUp(sRegexCache.regexCacheTmr); } int32_t checkRegexPattern(const char *pPattern) { From e88277592a1dcfec14918d72c4bace8b3dfecf9e Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Sat, 13 Jul 2024 18:13:21 +0800 Subject: [PATCH 11/17] log level --- source/util/src/tcompare.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index b138028519..c7b6d78868 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1236,7 +1236,6 @@ static void checkRegexCache(void* param, void* tmrId) { } void regexCacheFree(void *ppUsingRegex) { - uInfo("[regex cache] regexCacheFree %p", ppUsingRegex); regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex); taosMemoryFree(*(UsingRegex **)ppUsingRegex); } From a8c39cdd1139c23958132b7ca19bbb06e7dbf99a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 15 Jul 2024 14:48:21 +0800 Subject: [PATCH 12/17] enh: add return code processing --- source/libs/qworker/src/qwDbg.c | 21 +++++++++++--- source/libs/qworker/src/qwMsg.c | 48 +++++++++++++++++++------------ source/libs/qworker/src/qwUtil.c | 43 +++++++++++++++++++++------ source/libs/qworker/src/qworker.c | 2 +- 4 files changed, 82 insertions(+), 32 deletions(-) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 17b2849684..b7a4b718e2 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int if (pEpSet) { contLen = tSerializeSEpSet(NULL, 0, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet failed, code:%x", terrno); + return terrno; + } rsp = rpcMallocCont(contLen); - tSerializeSEpSet(rsp, contLen, pEpSet); + if (NULL == rsp) { + qError("rpcMallocCont %d failed, code:%x", contLen, terrno); + return terrno; + } + + contLen = tSerializeSEpSet(rsp, contLen, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet second failed, code:%x", terrno); + return terrno; + } } SRpcMsg rpcRsp = { @@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { epSet.eps[2].port = 7300; ctx->phase = QW_PHASE_POST_QUERY; - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error *rsped = true; return; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 473dd41228..9f4a540f3c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -166,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } memset(pRsp, 0, sizeof(SRetrieveTableRsp)); dataLength = 0; } @@ -187,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve #if 0 int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -203,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -428,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SSubQueryMsg msg = {0}; if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { @@ -442,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t eId = msg.execId; QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); - qwAbortPrerocessQuery(QW_FPARAMS()); - QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + code = qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code); tFreeSSubQueryMsg(&msg); @@ -458,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); SSubQueryMsg msg = {0}; @@ -500,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in SQWTaskCtx *handles = NULL; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -533,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SResFetchReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) { @@ -551,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); + int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processFetch end, node:%p", node); + QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -561,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (mgmt) { - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); } @@ -580,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -621,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); STaskDropReq msg = {0}; @@ -644,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); + code = qwProcessDrop(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processDrop end, node:%p", node); + QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -659,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1); STaskNotifyReq msg = {0}; @@ -678,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg)); + code = qwProcessNotify(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processNotify end, node:%p", node); + QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -695,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ SSchedulerHbReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); if (NULL == pMsg->pCont) { @@ -717,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); + code = qwProcessHb(mgmt, &qwMsg, &req); - QW_SCH_DLOG("processHb end, node:%p", node); + QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -735,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); - tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); + QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req)); uint64_t sId = req.sId; uint64_t qId = req.queryId; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 06559093a8..0451532cbe 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) { int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + int32_t code = TSDB_CODE_SUCCESS; qTaskInfo_t taskHandle = ctx->taskHandle; ctx->explainRsped = true; SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); - QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); + if (NULL == execInfoList) { + QW_ERR_JRET(terrno); + } + + QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList)); if (ctx->localExec) { SExplainLocalRsp localRsp = {0}; localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + if (NULL == pExec) { + QW_ERR_JRET(terrno); + } memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); localRsp.rsp.subplanInfo = pExec; localRsp.qId = qId; localRsp.tId = tId; localRsp.rId = rId; localRsp.eId = eId; - taosArrayPush(ctx->explainRes, &localRsp); + if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) { + QW_ERR_JRET(terrno); + } + taosArrayDestroy(execInfoList); + execInfoList = NULL; } else { SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); taosArrayDestroyEx(execInfoList, freeExplainExecItem); + execInfoList = NULL; + QW_ERR_RET(code); } - return TSDB_CODE_SUCCESS; +_return: + + taosArrayDestroyEx(execInfoList, freeExplainExecItem); + + return code; } @@ -544,7 +562,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { - taosCloseRef(gQwMgmt.qwRef); + (void)taosCloseRef(gQwMgmt.qwRef); // ignore error gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); @@ -561,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) { int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); - taosTmrStop(mgmt->hbTimer); + (void)taosTmrStop(mgmt->hbTimer); //ignore error mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); @@ -652,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return pStat->num ? (pStat->total / pStat->num) : 0; default: qError("unsupported queue type %d", type); + break; } return -1; } void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { + int32_t code = TSDB_CODE_SUCCESS; int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { uint64_t *sId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; - if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { + if (NULL == sId) { + qError("get the %dth sch failed, code:%x", i, terrno); + break; + } + + code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch); + if (TSDB_CODE_SUCCESS != code) { + qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code); continue; } if (taosHashGetSize(pSch->tasksHash) <= 0) { qwDestroySchStatus(pSch); - taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); - qDebug("sch %" PRIx64 " destroyed", *sId); + code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); + qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code); } qwReleaseScheduler(QW_WRITE, mgmt); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 084ee7efe3..5840cc0245 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1372,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S _return: if (mgmt->refId >= 0) { - qwRelease(mgmt->refId); // ignore error + (void)qwRelease(mgmt->refId); // ignore error } else { taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->ctxHash); From a5073355bbf8be73bb46d48494ad97dcab7aefa1 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 15 Jul 2024 14:56:23 +0800 Subject: [PATCH 13/17] fix: regex hash --- source/util/src/tcompare.c | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index c7b6d78868..94c4e27487 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1228,7 +1228,7 @@ static void checkRegexCache(void* param, void* tmrId) { UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); while ((ppUsingRegex != NULL)) { if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { - taosHashRelease(sRegexCache.regexHash, ppUsingRegex); + taosHashRemove(sRegexCache.regexHash, ppUsingRegex); } ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } @@ -1312,24 +1312,20 @@ static UsingRegex **getRegComp(const char *pPattern) { while (true) { int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); - if (code != 0) { - if (terrno == TSDB_CODE_DUP_KEY) { - terrno = TSDB_CODE_SUCCESS; - ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); - if (ppUsingRegex) { - if (*ppUsingRegex != pUsingRegex) { - regexCacheFree(&pUsingRegex); - } - pUsingRegex = (*ppUsingRegex); - break; - } else { - continue; - } - } else { + if (code != 0 && code != TSDB_CODE_DUP_KEY) { + regexCacheFree(&pUsingRegex); + uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); + return NULL; + } + ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex) { + if (*ppUsingRegex != pUsingRegex) { regexCacheFree(&pUsingRegex); - uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); - return NULL; } + pUsingRegex = (*ppUsingRegex); + break; + } else { + continue; } } pUsingRegex->lastUsedTime = taosGetTimestampSec(); From 50d7a458e524b00e3c2e70b6240aaf2d207b55ac Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 15 Jul 2024 15:40:39 +0800 Subject: [PATCH 14/17] fix: qworker ut cases --- source/libs/qworker/test/qworkerTests.cpp | 167 ++++++++++++++-------- 1 file changed, 105 insertions(+), 62 deletions(-) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 4a0d74a6e3..b60e285cb4 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -130,7 +130,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { fetchRpc->contLen = sizeof(SResFetchReq); } -void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { +int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropMsg->sId = 1; dropMsg->queryId = atomic_load_64(&qwtTestQueryId); dropMsg->taskId = 1; @@ -164,6 +164,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestFetchQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { @@ -178,7 +182,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestFetchQueueLock); - tsem_post(&qwtTestFetchSem); + if (tsem_post(&qwtTestFetchSem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } @@ -186,6 +193,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestQueryQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { @@ -200,22 +211,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestQueryQueueLock); - tsem_post(&qwtTestQuerySem); + if (tsem_post(&qwtTestQuerySem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {} -void qwtRpcSendResponse(const SRpcMsg *pRsp) { +int qwtRpcSendResponse(const SRpcMsg *pRsp) { + int32_t code = 0; switch (pRsp->msgType) { case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; if (pRsp->code) { - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } } rpcFreeCont(rsp); @@ -227,13 +250,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); return; } - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); break; @@ -245,9 +280,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtTestCaseFinished = true; break; } + default: + break; } - return; + return code; } int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo, @@ -292,6 +329,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { if (endExec) { *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (NULL == *pRes) { + return terrno; + } (*pRes)->info.rows = taosRand() % 1000 + 1; } else { *pRes = NULL; @@ -631,7 +671,7 @@ void *queryThread(void *param) { while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -653,7 +693,7 @@ void *fetchThread(void *param) { while (!qwtTestStop) { qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -674,8 +714,11 @@ void *dropThread(void *param) { STaskDropReq dropMsg = {0}; while (!qwtTestStop) { - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) { + break; + } + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error + if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -700,7 +743,7 @@ void *qwtclientThread(void *param) { qwtTestCaseFinished = false; qwtBuildQueryReqMsg(&queryRpc); - qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); + (void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error while (!qwtTestCaseFinished) { taosUsleep(1); @@ -752,9 +795,9 @@ void *queryQueueThread(void *param) { } if (TDMT_SCH_QUERY == queryRpc->msgType) { - qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) { - qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); @@ -810,16 +853,16 @@ void *fetchQueueThread(void *param) { switch (fetchRpc->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: - qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_CANCEL_TASK: //qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_SCH_DROP_TASK: - qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_TASK_NOTIFY: - qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); @@ -853,7 +896,7 @@ TEST(seqTest, normalCase) { qwtBuildQueryReqMsg(&queryRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -898,7 +941,7 @@ TEST(seqTest, cancelFirst) { qwtInitLogFile(); qwtBuildQueryReqMsg(&queryRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -954,7 +997,7 @@ TEST(seqTest, randCase) { if (r >= 0 && r < maxr / 5) { printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error } else if (r >= maxr / 5 && r < maxr * 2 / 5) { // printf("Ready,%d\n", t++); // qwtBuildReadyReqMsg(&readyMsg, &readyRpc); @@ -965,14 +1008,14 @@ TEST(seqTest, randCase) { } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) { printf("Drop,%d\n", t++); - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + (void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } @@ -1018,14 +1061,14 @@ TEST(seqTest, multithreadRand) { ASSERT_EQ(code, 0); TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5, t6; - taosThreadCreate(&(t1), &thattr, queryThread, mgmt); - // taosThreadCreate(&(t2), &thattr, readyThread, NULL); - taosThreadCreate(&(t3), &thattr, fetchThread, NULL); - taosThreadCreate(&(t4), &thattr, dropThread, NULL); - taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error + // (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error + (void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error + (void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1083,16 +1126,16 @@ TEST(rcTest, shortExecshortDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1114,8 +1157,8 @@ TEST(rcTest, shortExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1166,16 +1209,16 @@ TEST(rcTest, longExecshortDelay) { qwtTestMaxExecTaskUsec = 1000000; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1197,8 +1240,8 @@ TEST(rcTest, longExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1249,16 +1292,16 @@ TEST(rcTest, shortExeclongDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 1000000; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1280,8 +1323,8 @@ TEST(rcTest, shortExeclongDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1327,16 +1370,16 @@ TEST(rcTest, dropTest) { code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { From ae67829b674d4bdfb29a3e10fc344b7a382071bd Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 15 Jul 2024 16:32:32 +0800 Subject: [PATCH 15/17] fix: hash remove --- source/util/src/tcompare.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 94c4e27487..09599cead4 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1228,7 +1228,9 @@ static void checkRegexCache(void* param, void* tmrId) { UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); while ((ppUsingRegex != NULL)) { if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { - taosHashRemove(sRegexCache.regexHash, ppUsingRegex); + size_t len = 0; + char* key = (char*)taosHashGetKey(ppUsingRegex, &len); + taosHashRemove(sRegexCache.regexHash, key, len); } ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } From b81493b7b426a1328b9971d781c5eacd115a4364 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 15 Jul 2024 17:32:36 +0800 Subject: [PATCH 16/17] fix: compile issues --- source/libs/qworker/test/qworkerTests.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index b60e285cb4..d292b271c5 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -137,23 +137,25 @@ int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg); if (msgSize < 0) { - return; + return terrno; } char *msg = (char*)taosMemoryCalloc(1, msgSize); if (NULL == msg) { - return; + return terrno; } if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) { taosMemoryFree(msg); - return; + return terrno; } dropRpc->msgType = TDMT_SCH_DROP_TASK; dropRpc->pCont = msg; dropRpc->contLen = msgSize; + + return TSDB_CODE_SUCCESS; } int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { @@ -256,7 +258,7 @@ int qwtRpcSendResponse(const SRpcMsg *pRsp) { return code; } rpcFreeCont(rsp); - return; + return code; } code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); From f3e2ba31dd767fa66f3d3f614693ff7cad5886a5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Jul 2024 19:29:27 +0800 Subject: [PATCH 17/17] change the macro --- include/util/tutil.h | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index d3003c27ba..d1fc11d0fe 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -160,14 +160,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) -#define TAOS_CHECK_GOTO(CODE, LINO, LABEL) \ - do { \ - if ((CODE) != TSDB_CODE_SUCCESS) { \ - if (LINO) { \ - *((int32_t *)(LINO)) = __LINE__; \ - } \ - goto LABEL; \ - } \ +#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ + do { \ + code = (CMD); \ + if (code != TSDB_CODE_SUCCESS) { \ + if (LINO) { \ + *((int32_t *)(LINO)) = __LINE__; \ + } \ + goto LABEL; \ + } \ } while (0) #ifdef __cplusplus