From 8fcf57c0771c05799767112cfb46908e3b117276 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 5 Aug 2024 12:28:49 +0800 Subject: [PATCH 01/13] l2 cache --- include/util/tcompare.h | 1 + source/util/src/tcompare.c | 51 +++++++++++++++++++++++++++------ source/util/src/tworker.c | 2 ++ source/util/test/CMakeLists.txt | 7 +++++ 4 files changed, 53 insertions(+), 8 deletions(-) diff --git a/include/util/tcompare.h b/include/util/tcompare.h index 80f992f646..c7a29cad57 100644 --- a/include/util/tcompare.h +++ b/include/util/tcompare.h @@ -49,6 +49,7 @@ int32_t InitRegexCache(); void DestroyRegexCache(); int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo); int32_t checkRegexPattern(const char *pPattern); +void DestoryThreadLocalRegComp(); int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo); diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 4cb48bffe5..90c1441af5 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1290,13 +1290,12 @@ int32_t checkRegexPattern(const char *pPattern) { return TSDB_CODE_SUCCESS; } -static UsingRegex **getRegComp(const char *pPattern) { +UsingRegex **getRegComp(const char *pPattern) { UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); if (ppUsingRegex != NULL) { (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); return ppUsingRegex; } - UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1341,20 +1340,56 @@ void releaseRegComp(UsingRegex **regex){ taosHashRelease(sRegexCache.regexHash, regex); } -static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { - int32_t ret = 0; - char msgbuf[256] = {0}; +static threadlocal UsingRegex ** ppRegex; +static threadlocal char *pOldPattern = NULL; +void DestoryThreadLocalRegComp() { + if (NULL != pOldPattern) { + releaseRegComp(ppRegex); + taosMemoryFree(pOldPattern); + pOldPattern = NULL; + } +} + +int32_t threadGetRegComp(regex_t **regex, const char *pPattern) { + if (NULL != pOldPattern) { + if (strcmp(pOldPattern, pPattern) == 0) { + *regex = &(*ppRegex)->pRegex; + return 0; + } else { + DestoryThreadLocalRegComp(); + } + } + UsingRegex **pUsingRegex = getRegComp(pPattern); if (pUsingRegex == NULL) { return 1; } + pOldPattern = (char *)taosMemoryMalloc(strlen(pPattern) + 1); + if (NULL == pOldPattern) { + uError("Failed to Malloc when compile regex pattern %s.", pPattern); + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(pOldPattern, pPattern); + ppRegex = pUsingRegex; + *regex = &(*pUsingRegex)->pRegex; + return 0; +} + +static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { + int32_t ret = 0; + char msgbuf[256] = {0}; + + regex_t *regex = NULL; + ret = threadGetRegComp(®ex, pPattern); + if (ret != 0) { + return ret; + } regmatch_t pmatch[1]; - ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0); - releaseRegComp(pUsingRegex); + ret = regexec(regex, pString, 1, pmatch, 0); if (ret != 0 && ret != REG_NOMATCH) { terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; - (void)regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf)); + (void)regerror(ret, regex, msgbuf, sizeof(msgbuf)); uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index d66628d46d..1eab9365f5 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -106,6 +106,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { } destroyThreadLocalGeosCtx(); + DestoryThreadLocalRegComp(); return NULL; } @@ -237,6 +238,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } + DestoryThreadLocalRegComp(); return NULL; } diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index e8aabfe338..0d8774ba41 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -119,6 +119,13 @@ add_test( COMMAND bufferTest ) +add_executable(regexTest "regexTest.cpp") +target_link_libraries(regexTest os util gtest_main ) +add_test( + NAME regexTest + COMMAND regexTest +) + #add_executable(decompressTest "decompressTest.cpp") #target_link_libraries(decompressTest os util common gtest_main) #add_test( From 7d44466603ac26dd8cd532cf4bf1c751f65b2f4f Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 5 Aug 2024 12:30:05 +0800 Subject: [PATCH 02/13] utest --- source/util/test/regexTest.cpp | 257 +++++++++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 source/util/test/regexTest.cpp diff --git a/source/util/test/regexTest.cpp b/source/util/test/regexTest.cpp new file mode 100644 index 0000000000..26857bce88 --- /dev/null +++ b/source/util/test/regexTest.cpp @@ -0,0 +1,257 @@ + +#include +#include +#include +#include +#include +#include "os.h" +#include "tutil.h" +#include "regex.h" +#include "osDef.h" +#include "tcompare.h" + +extern "C" { + typedef struct UsingRegex UsingRegex; + UsingRegex** getRegComp(const char *pPattern); + int32_t threadGetRegComp(regex_t **regex, const char *pPattern); +} + +class regexTest { + public: + regexTest() { (void)InitRegexCache(); } + ~regexTest() { (void)DestroyRegexCache(); } +}; +static regexTest test; + +static threadlocal regex_t pRegex; +static threadlocal char *pOldPattern = NULL; + +void DestoryThreadLocalRegComp1() { + if (NULL != pOldPattern) { + regfree(&pRegex); + taosMemoryFree(pOldPattern); + pOldPattern = NULL; + } +} + +static regex_t *threadGetRegComp1(const char *pPattern) { + if (NULL != pOldPattern) { + if( strcmp(pOldPattern, pPattern) == 0) { + return &pRegex; + } else { + DestoryThreadLocalRegComp1(); + } + } + pOldPattern = (char*)taosMemoryMalloc(strlen(pPattern) + 1); + if (NULL == pOldPattern) { + uError("Failed to Malloc when compile regex pattern %s.", pPattern); + return NULL; + } + strcpy(pOldPattern, pPattern); + int32_t cflags = REG_EXTENDED; + int32_t ret = regcomp(&pRegex, pPattern, cflags); + if (ret != 0) { + char msgbuf[256] = {0}; + regerror(ret, &pRegex, msgbuf, tListLen(msgbuf)); + uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); + DestoryThreadLocalRegComp1(); + return NULL; + } + return &pRegex; +} + +TEST(testCase, regexCacheTest1) { + int times = 100000; + char s1[] = "abc"; + auto start = std::chrono::high_resolution_clock::now(); + + uint64_t t0 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + UsingRegex** rex = getRegComp(s1); + } + uint64_t t1 = taosGetTimestampUs(); + + printf("%s regex(current) %d times:%" PRIu64 " us.\n", s1, times, t1 - t0); + + uint64_t t2 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + regex_t* rex = threadGetRegComp1(s1); + } + uint64_t t3 = taosGetTimestampUs(); + + printf("%s regex(before) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2); + + t2 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s1); + } + t3 = taosGetTimestampUs(); + + printf("%s regex(new) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2); +} + +TEST(testCase, regexCacheTest2) { + int times = 100000; + char s1[] = "abc%*"; + auto start = std::chrono::high_resolution_clock::now(); + + uint64_t t0 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + UsingRegex** rex = getRegComp(s1); + } + uint64_t t1 = taosGetTimestampUs(); + + printf("%s regex(current) %d times:%" PRIu64 " us.\n", s1, times, t1 - t0); + + uint64_t t2 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + regex_t* rex = threadGetRegComp1(s1); + } + uint64_t t3 = taosGetTimestampUs(); + + printf("%s regex(before) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2); + + t2 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s1); + } + t3 = taosGetTimestampUs(); + + printf("%s regex(new) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2); +} + +TEST(testCase, regexCacheTest3) { + int times = 100000; + char s1[] = "abc%*"; + char s2[] = "abc"; + auto start = std::chrono::high_resolution_clock::now(); + + uint64_t t0 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + UsingRegex** rex = getRegComp(s1); + rex = getRegComp(s2); + } + uint64_t t1 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn regex(current) %d times:%" PRIu64 " us.\n", s1, s2, times, t1 - t0); + + uint64_t t2 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + regex_t* rex = threadGetRegComp1(s1); + rex = threadGetRegComp1(s2); + } + uint64_t t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn regex(before) %d times:%" PRIu64 " us.\n", s1, s2, times, t3 - t2); + + t2 = taosGetTimestampUs(); + for(int i = 0; i < times; i++) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s1); + (void)threadGetRegComp(&rex, s2); + } + t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn regex(new) %d times:%" PRIu64 " us.\n", s1, s2, times, t3 - t2); +} + +TEST(testCase, regexCacheTest4) { + int times = 100; + int count = 1000; + char s1[] = "abc%*"; + char s2[] = "abc"; + auto start = std::chrono::high_resolution_clock::now(); + + uint64_t t0 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + UsingRegex** rex = getRegComp(s1); + } + for (int j = 0; j < count; ++j) { + UsingRegex** rex = getRegComp(s2); + } + } + uint64_t t1 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(current) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t1 - t0); + + uint64_t t2 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + regex_t* rex = threadGetRegComp1(s1); + } + for (int j = 0; j < count; ++j) { + regex_t* rex = threadGetRegComp1(s2); + } + } + uint64_t t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(before) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); + + t2 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s1); + } + for (int j = 0; j < count; ++j) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s2); + } + } + t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); +} +/* +TEST(testCase, regexCacheTest5) { + int times = 10000; + int count = 10000; + char s1[] = "abc%*"; + char s2[] = "abc"; + auto start = std::chrono::high_resolution_clock::now(); + + uint64_t t0 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + UsingRegex** rex = getRegComp(s1); + } + for (int j = 0; j < count; ++j) { + UsingRegex** rex = getRegComp(s2); + } + } + uint64_t t1 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(current) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t1 - t0); + + uint64_t t2 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + regex_t* rex = threadGetRegComp1(s1); + } + for (int j = 0; j < count; ++j) { + regex_t* rex = threadGetRegComp1(s2); + } + } + uint64_t t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(before) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); + + t2 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s1); + } + for (int j = 0; j < count; ++j) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s2); + } + } + t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); +} +*/ From cc4dccfd914d903ccfa8f251648dcc703af3f9f7 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 5 Aug 2024 13:42:04 +0800 Subject: [PATCH 03/13] mem leak --- source/util/src/tworker.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 1eab9365f5..27e5ea290f 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -666,6 +666,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } destroyThreadLocalGeosCtx(); + DestoryThreadLocalRegComp(); return NULL; } From a61149047f9fc0a8ddc665817a14eb3fdc635dcc Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 5 Aug 2024 14:42:30 +0800 Subject: [PATCH 04/13] use direct pointer --- source/util/src/tcompare.c | 19 ++++++++----- source/util/test/regexTest.cpp | 52 +++++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 90c1441af5..af2205c590 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1296,6 +1296,7 @@ UsingRegex **getRegComp(const char *pPattern) { (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); return ppUsingRegex; } + printf("getRegComp , ..."); UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1340,12 +1341,15 @@ void releaseRegComp(UsingRegex **regex){ taosHashRelease(sRegexCache.regexHash, regex); } -static threadlocal UsingRegex ** ppRegex; +static threadlocal UsingRegex ** ppUsingRegex; +static threadlocal regex_t * pRegex; static threadlocal char *pOldPattern = NULL; void DestoryThreadLocalRegComp() { if (NULL != pOldPattern) { - releaseRegComp(ppRegex); + releaseRegComp(ppUsingRegex); taosMemoryFree(pOldPattern); + ppUsingRegex = NULL; + pRegex = NULL; pOldPattern = NULL; } } @@ -1353,15 +1357,15 @@ void DestoryThreadLocalRegComp() { int32_t threadGetRegComp(regex_t **regex, const char *pPattern) { if (NULL != pOldPattern) { if (strcmp(pOldPattern, pPattern) == 0) { - *regex = &(*ppRegex)->pRegex; + *regex = pRegex; return 0; } else { DestoryThreadLocalRegComp(); } } - UsingRegex **pUsingRegex = getRegComp(pPattern); - if (pUsingRegex == NULL) { + UsingRegex **ppRegex = getRegComp(pPattern); + if (ppRegex == NULL) { return 1; } pOldPattern = (char *)taosMemoryMalloc(strlen(pPattern) + 1); @@ -1370,8 +1374,9 @@ int32_t threadGetRegComp(regex_t **regex, const char *pPattern) { return TSDB_CODE_OUT_OF_MEMORY; } strcpy(pOldPattern, pPattern); - ppRegex = pUsingRegex; - *regex = &(*pUsingRegex)->pRegex; + ppUsingRegex = ppRegex; + pRegex = &((*ppUsingRegex)->pRegex); + *regex = &(*ppRegex)->pRegex; return 0; } diff --git a/source/util/test/regexTest.cpp b/source/util/test/regexTest.cpp index 26857bce88..3cd8f5c88c 100644 --- a/source/util/test/regexTest.cpp +++ b/source/util/test/regexTest.cpp @@ -205,7 +205,8 @@ TEST(testCase, regexCacheTest4) { printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); } -/* + +/* It is not a good idea to test this case, because it will take a long time. TEST(testCase, regexCacheTest5) { int times = 10000; int count = 10000; @@ -254,4 +255,53 @@ TEST(testCase, regexCacheTest5) { printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); } + +TEST(testCase, regexCacheTest6) { + int times = 10000; + int count = 1000; + char s1[] = "abc%*"; + char s2[] = "abc"; + auto start = std::chrono::high_resolution_clock::now(); + + uint64_t t0 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + UsingRegex** rex = getRegComp(s1); + } + for (int j = 0; j < count; ++j) { + UsingRegex** rex = getRegComp(s2); + } + } + uint64_t t1 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(current) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t1 - t0); + + uint64_t t2 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + regex_t* rex = threadGetRegComp1(s1); + } + for (int j = 0; j < count; ++j) { + regex_t* rex = threadGetRegComp1(s2); + } + } + uint64_t t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(before) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); + + t2 = taosGetTimestampUs(); + for (int i = 0; i < times; i++) { + for (int j = 0; j < count; ++j) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s1); + } + for (int j = 0; j < count; ++j) { + regex_t* rex = NULL; + (void)threadGetRegComp(&rex, s2); + } + } + t3 = taosGetTimestampUs(); + + printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); +} */ From 53d115f74adad6ee71560a3633042369f91bf83c Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 5 Aug 2024 19:30:22 +0800 Subject: [PATCH 05/13] fix: skip dub key error --- source/util/src/tcompare.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index af2205c590..d949bdcc07 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1321,6 +1321,8 @@ UsingRegex **getRegComp(const char *pPattern) { uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); terrno = code; return NULL; + } else if (code == TSDB_CODE_DUP_KEY) { + terrno = 0; } ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); if (ppUsingRegex) { From 3f25f121873eb3efefbdafd44dab972f1e6fea49 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Tue, 6 Aug 2024 13:35:39 +0800 Subject: [PATCH 06/13] fix: cache free --- source/util/src/tcompare.c | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index d949bdcc07..a870aa1e29 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1213,14 +1213,30 @@ typedef struct RegexCache { SHashObj *regexHash; void *regexCacheTmr; void *timer; + TdThreadMutex mutex; + bool exit; } RegexCache; static RegexCache sRegexCache; #define MAX_REGEX_CACHE_SIZE 20 #define REGEX_CACHE_CLEAR_TIME 30 static void checkRegexCache(void* param, void* tmrId) { + int32_t code = 0; + code = taosThreadMutexLock(&sRegexCache.mutex); + if (code != 0) { + uError("[regex cache] checkRegexCache, Failed to lock mutex"); + return; + } + if(sRegexCache.exit) { + code = taosThreadMutexUnlock(&sRegexCache.mutex); + if(code != 0) { + uError("[regex cache] checkRegexCache, Failed to unlock mutex"); + } + return; + } (void)taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId); if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { + taosThreadMutexUnlock(&sRegexCache.mutex); return; } @@ -1235,6 +1251,10 @@ static void checkRegexCache(void* param, void* tmrId) { ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } + code = taosThreadMutexUnlock(&sRegexCache.mutex); + if(code != 0) { + uError("[regex cache] checkRegexCache, Failed to unlock mutex"); + } } void regexCacheFree(void *ppUsingRegex) { @@ -1262,14 +1282,31 @@ int32_t InitRegexCache() { return -1; } + if (taosThreadMutexInit(&sRegexCache.mutex, NULL) != 0) { + uError("failed to init mutex"); + return -1; + } + sRegexCache.exit = false; + return 0; } void DestroyRegexCache(){ + int32_t code = 0; uInfo("[regex cache] destory regex cache"); + code = taosThreadMutexLock(&sRegexCache.mutex); + if (code != 0) { + uError("[regex cache] Failed to lock mutex"); + return; + } (void)taosTmrStopA(&sRegexCache.timer); + sRegexCache.exit = true; taosHashCleanup(sRegexCache.regexHash); taosTmrCleanUp(sRegexCache.regexCacheTmr); + code = taosThreadMutexUnlock(&sRegexCache.mutex); + if (code != 0) { + uError("[regex cache] Failed to unlock mutex"); + } } int32_t checkRegexPattern(const char *pPattern) { @@ -1296,7 +1333,6 @@ UsingRegex **getRegComp(const char *pPattern) { (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); return ppUsingRegex; } - printf("getRegComp , ..."); UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; From 8989a3986504d847ba18569027d5046d56a14bfb Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Tue, 6 Aug 2024 13:44:42 +0800 Subject: [PATCH 07/13] fix: free cache --- source/util/src/tcompare.c | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index a870aa1e29..f6878ee7c2 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1228,16 +1228,11 @@ static void checkRegexCache(void* param, void* tmrId) { return; } if(sRegexCache.exit) { - code = taosThreadMutexUnlock(&sRegexCache.mutex); - if(code != 0) { - uError("[regex cache] checkRegexCache, Failed to unlock mutex"); - } - return; + goto _exit; } (void)taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId); if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { - taosThreadMutexUnlock(&sRegexCache.mutex); - return; + goto _exit; } if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) { @@ -1251,6 +1246,7 @@ static void checkRegexCache(void* param, void* tmrId) { ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } +_exit: code = taosThreadMutexUnlock(&sRegexCache.mutex); if(code != 0) { uError("[regex cache] checkRegexCache, Failed to unlock mutex"); @@ -1276,30 +1272,29 @@ int32_t InitRegexCache() { return -1; } + sRegexCache.exit = false; + if (taosThreadMutexInit(&sRegexCache.mutex, NULL) != 0) { + uError("failed to init mutex"); + return -1; + } sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr); if (sRegexCache.timer == NULL) { uError("failed to start regex cache timer"); return -1; } - if (taosThreadMutexInit(&sRegexCache.mutex, NULL) != 0) { - uError("failed to init mutex"); - return -1; - } - sRegexCache.exit = false; - return 0; } void DestroyRegexCache(){ int32_t code = 0; uInfo("[regex cache] destory regex cache"); + (void)taosTmrStopA(&sRegexCache.timer); code = taosThreadMutexLock(&sRegexCache.mutex); if (code != 0) { uError("[regex cache] Failed to lock mutex"); return; } - (void)taosTmrStopA(&sRegexCache.timer); sRegexCache.exit = true; taosHashCleanup(sRegexCache.regexHash); taosTmrCleanUp(sRegexCache.regexCacheTmr); From 3adaebd2a07209c9507e93568ef05dc6528c7cc9 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Tue, 6 Aug 2024 16:20:29 +0800 Subject: [PATCH 08/13] return value --- source/libs/scalar/src/scalar.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index d08f358ce0..9428f051aa 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -904,9 +904,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp terrno = TSDB_CODE_SUCCESS; SCL_ERR_JRET(OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC)); - SCL_ERR_JRET(terrno); -_return: +_return: sclFreeParamList(params, paramNum); SCL_RET(code); } From 152758b72d40dbeaabeb7b39f3e85137f0c89bdb Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Tue, 6 Aug 2024 16:28:18 +0800 Subject: [PATCH 09/13] use taosStrdup --- source/util/src/tcompare.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index f6878ee7c2..f9bea3b947 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1401,12 +1401,11 @@ int32_t threadGetRegComp(regex_t **regex, const char *pPattern) { if (ppRegex == NULL) { return 1; } - pOldPattern = (char *)taosMemoryMalloc(strlen(pPattern) + 1); + pOldPattern = taosStrdup(pPattern); if (NULL == pOldPattern) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); return TSDB_CODE_OUT_OF_MEMORY; } - strcpy(pOldPattern, pPattern); ppUsingRegex = ppRegex; pRegex = &((*ppUsingRegex)->pRegex); *regex = &(*ppRegex)->pRegex; From 07cbdee93c6b7d79e9bb8accc976fa6724cadfff Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Wed, 7 Aug 2024 10:43:04 +0800 Subject: [PATCH 10/13] fix: mutex and return value --- source/util/src/tcompare.c | 58 ++++++++++---------------- source/util/test/regexTest.cpp | 75 +++++++++++++++++++++++++--------- 2 files changed, 77 insertions(+), 56 deletions(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index f9bea3b947..6971b91ff1 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1208,12 +1208,13 @@ typedef struct UsingRegex { regex_t pRegex; int32_t lastUsedTime; } UsingRegex; +typedef UsingRegex* HashRegexPtr; typedef struct RegexCache { SHashObj *regexHash; void *regexCacheTmr; void *timer; - TdThreadMutex mutex; + SRWLatch mutex; bool exit; } RegexCache; static RegexCache sRegexCache; @@ -1222,11 +1223,7 @@ static RegexCache sRegexCache; static void checkRegexCache(void* param, void* tmrId) { int32_t code = 0; - code = taosThreadMutexLock(&sRegexCache.mutex); - if (code != 0) { - uError("[regex cache] checkRegexCache, Failed to lock mutex"); - return; - } + taosRLockLatch(&sRegexCache.mutex); if(sRegexCache.exit) { goto _exit; } @@ -1247,10 +1244,7 @@ static void checkRegexCache(void* param, void* tmrId) { } } _exit: - code = taosThreadMutexUnlock(&sRegexCache.mutex); - if(code != 0) { - uError("[regex cache] checkRegexCache, Failed to unlock mutex"); - } + taosRUnLockLatch(&sRegexCache.mutex); } void regexCacheFree(void *ppUsingRegex) { @@ -1273,10 +1267,7 @@ int32_t InitRegexCache() { } sRegexCache.exit = false; - if (taosThreadMutexInit(&sRegexCache.mutex, NULL) != 0) { - uError("failed to init mutex"); - return -1; - } + taosInitRWLatch(&sRegexCache.mutex); sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr); if (sRegexCache.timer == NULL) { uError("failed to start regex cache timer"); @@ -1290,18 +1281,11 @@ void DestroyRegexCache(){ int32_t code = 0; uInfo("[regex cache] destory regex cache"); (void)taosTmrStopA(&sRegexCache.timer); - code = taosThreadMutexLock(&sRegexCache.mutex); - if (code != 0) { - uError("[regex cache] Failed to lock mutex"); - return; - } + taosWLockLatch(&sRegexCache.mutex); sRegexCache.exit = true; taosHashCleanup(sRegexCache.regexHash); taosTmrCleanUp(sRegexCache.regexCacheTmr); - code = taosThreadMutexUnlock(&sRegexCache.mutex); - if (code != 0) { - uError("[regex cache] Failed to unlock mutex"); - } + taosWUnLockLatch(&sRegexCache.mutex); } int32_t checkRegexPattern(const char *pPattern) { @@ -1322,17 +1306,17 @@ int32_t checkRegexPattern(const char *pPattern) { return TSDB_CODE_SUCCESS; } -UsingRegex **getRegComp(const char *pPattern) { - UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); +int32_t getRegComp(const char *pPattern, HashRegexPtr **regexRet) { + HashRegexPtr* ppUsingRegex = (HashRegexPtr*)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); if (ppUsingRegex != NULL) { (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); - return ppUsingRegex; + *regexRet = ppUsingRegex; + return TSDB_CODE_SUCCESS; } UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; uError("Failed to Malloc when compile regex pattern %s.", pPattern); - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t cflags = REG_EXTENDED; int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags); @@ -1341,8 +1325,7 @@ UsingRegex **getRegComp(const char *pPattern) { (void)regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf)); uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); taosMemoryFree(pUsingRegex); - terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; - return NULL; + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; } while (true) { @@ -1350,8 +1333,7 @@ UsingRegex **getRegComp(const char *pPattern) { if (code != 0 && code != TSDB_CODE_DUP_KEY) { regexCacheFree(&pUsingRegex); uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); - terrno = code; - return NULL; + return code; } else if (code == TSDB_CODE_DUP_KEY) { terrno = 0; } @@ -1367,7 +1349,8 @@ UsingRegex **getRegComp(const char *pPattern) { } } pUsingRegex->lastUsedTime = taosGetTimestampSec(); - return ppUsingRegex; + *regexRet = ppUsingRegex; + return TSDB_CODE_SUCCESS; } void releaseRegComp(UsingRegex **regex){ @@ -1397,14 +1380,15 @@ int32_t threadGetRegComp(regex_t **regex, const char *pPattern) { } } - UsingRegex **ppRegex = getRegComp(pPattern); - if (ppRegex == NULL) { - return 1; + HashRegexPtr *ppRegex = NULL; + int32_t code = getRegComp(pPattern, &ppRegex); + if (code != TSDB_CODE_SUCCESS) { + return code; } pOldPattern = taosStrdup(pPattern); if (NULL == pOldPattern) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } ppUsingRegex = ppRegex; pRegex = &((*ppUsingRegex)->pRegex); diff --git a/source/util/test/regexTest.cpp b/source/util/test/regexTest.cpp index 3cd8f5c88c..5fe3701700 100644 --- a/source/util/test/regexTest.cpp +++ b/source/util/test/regexTest.cpp @@ -12,7 +12,8 @@ extern "C" { typedef struct UsingRegex UsingRegex; - UsingRegex** getRegComp(const char *pPattern); + typedef struct HashRegexPtr HashRegexPtr; + int32_t getRegComp(const char *pPattern, HashRegexPtr **regexRet); int32_t threadGetRegComp(regex_t **regex, const char *pPattern); } @@ -66,8 +67,12 @@ TEST(testCase, regexCacheTest1) { auto start = std::chrono::high_resolution_clock::now(); uint64_t t0 = taosGetTimestampUs(); - for(int i = 0; i < times; i++) { - UsingRegex** rex = getRegComp(s1); + for (int i = 0; i < times; i++) { + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s1, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s1; + } } uint64_t t1 = taosGetTimestampUs(); @@ -97,8 +102,12 @@ TEST(testCase, regexCacheTest2) { auto start = std::chrono::high_resolution_clock::now(); uint64_t t0 = taosGetTimestampUs(); - for(int i = 0; i < times; i++) { - UsingRegex** rex = getRegComp(s1); + for (int i = 0; i < times; i++) { + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s1, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s1; + } } uint64_t t1 = taosGetTimestampUs(); @@ -129,9 +138,12 @@ TEST(testCase, regexCacheTest3) { auto start = std::chrono::high_resolution_clock::now(); uint64_t t0 = taosGetTimestampUs(); - for(int i = 0; i < times; i++) { - UsingRegex** rex = getRegComp(s1); - rex = getRegComp(s2); + for (int i = 0; i < times; i++) { + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s1, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s1; + } } uint64_t t1 = taosGetTimestampUs(); @@ -167,10 +179,18 @@ TEST(testCase, regexCacheTest4) { uint64_t t0 = taosGetTimestampUs(); for (int i = 0; i < times; i++) { for (int j = 0; j < count; ++j) { - UsingRegex** rex = getRegComp(s1); + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s1, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s1; + } } for (int j = 0; j < count; ++j) { - UsingRegex** rex = getRegComp(s2); + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s2, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s2; + } } } uint64_t t1 = taosGetTimestampUs(); @@ -206,7 +226,8 @@ TEST(testCase, regexCacheTest4) { printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2); } -/* It is not a good idea to test this case, because it will take a long time. +// It is not a good idea to test this case, because it will take a long time. +/* TEST(testCase, regexCacheTest5) { int times = 10000; int count = 10000; @@ -217,10 +238,18 @@ TEST(testCase, regexCacheTest5) { uint64_t t0 = taosGetTimestampUs(); for (int i = 0; i < times; i++) { for (int j = 0; j < count; ++j) { - UsingRegex** rex = getRegComp(s1); + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s1, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s1; + } } for (int j = 0; j < count; ++j) { - UsingRegex** rex = getRegComp(s2); + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s2, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s2; + } } } uint64_t t1 = taosGetTimestampUs(); @@ -257,19 +286,27 @@ TEST(testCase, regexCacheTest5) { } TEST(testCase, regexCacheTest6) { - int times = 10000; - int count = 1000; - char s1[] = "abc%*"; - char s2[] = "abc"; + int times = 10000; + int count = 1000; + char s1[] = "abc%*"; + char s2[] = "abc"; auto start = std::chrono::high_resolution_clock::now(); uint64_t t0 = taosGetTimestampUs(); for (int i = 0; i < times; i++) { for (int j = 0; j < count; ++j) { - UsingRegex** rex = getRegComp(s1); + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s1, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s1; + } } for (int j = 0; j < count; ++j) { - UsingRegex** rex = getRegComp(s2); + HashRegexPtr* ret = NULL; + int32_t code = getRegComp(s2, &ret); + if (code != 0) { + FAIL() << "Failed to compile regex pattern " << s2; + } } } uint64_t t1 = taosGetTimestampUs(); From bec6dfe16b11c5ddfaf3065ecd732de2fa3070bc Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Wed, 7 Aug 2024 16:51:39 +0800 Subject: [PATCH 11/13] return value --- source/libs/command/src/command.c | 6 +- source/libs/command/src/explain.c | 6 +- source/libs/function/src/tudf.c | 27 +++++---- source/libs/function/src/udfd.c | 94 +++++++++++++++++++++++++------ source/libs/qcom/src/querymsg.c | 8 +-- source/util/src/tcompare.c | 27 ++++----- 6 files changed, 112 insertions(+), 56 deletions(-) diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 2811402ea1..11ddc89d4c 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -38,7 +38,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; *pRsp = taosMemoryCalloc(1, rspSize); if (NULL == *pRsp) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } (*pRsp)->useconds = 0; @@ -289,7 +289,7 @@ static int32_t buildRetension(SArray* pRetension, char **ppRetentions ) { char* p1 = taosMemoryCalloc(1, 100); if(NULL == p1) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t len = 0; @@ -849,7 +849,7 @@ _return: static int32_t buildLocalVariablesResultDataBlock(SSDataBlock** pOutput) { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pBlock->info.hasVarCol = true; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 8d9f1fb9cc..2bfed58dc8 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -123,7 +123,7 @@ int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, d SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx)); if (NULL == ctx) { qError("calloc SExplainCtx failed"); - QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + goto _return; } SArray *rows = taosArrayInit(10, sizeof(SQueryExplainRowInfo)); @@ -227,7 +227,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai SExplainResNode *resNode = taosMemoryCalloc(1, sizeof(SExplainResNode)); if (NULL == resNode) { qError("calloc SPhysiNodeExplainRes failed"); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t code = 0; @@ -1971,7 +1971,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); if (NULL == rsp) { qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); - QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + goto _return; } rsp->completed = 1; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index ad9e5ce7d4..9a751db801 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -162,7 +162,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn); } else { fnError("[UDFD]Failed to allocate memory for TAOS_FQDN"); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } @@ -837,10 +837,13 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfBlock->numOfRows = block->info.rows; udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock); udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); + if((udfBlock->udfCols) == NULL) { + return terrno; + } for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); if(udfBlock->udfCols[i] == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i); SUdfColumn *udfCol = udfBlock->udfCols[i]; @@ -854,13 +857,13 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); if(udfCol->colData.varLenCol.varOffsets == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows); udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen); if(udfCol->colData.varLenCol.payload == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (col->reassigned) { for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) { @@ -882,7 +885,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); if(udfCol->colData.fixLenCol.nullBitmap == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } char *bitmap = udfCol->colData.fixLenCol.nullBitmap; memcpy(bitmap, col->nullbitmap, bitmapLen); @@ -985,7 +988,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData)); if(output->columnData == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); output->colAlloced = true; @@ -1724,7 +1727,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { if(conn == NULL) { fnError("udfc event loop start connect task malloc conn failed."); taosMemoryFree(pipe); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } conn->pipe = pipe; conn->readBuf.len = 0; @@ -1954,7 +1957,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); if(uvTask == NULL) { fnError("udfc client task: %p failed to allocate memory for uvTask", task); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe); @@ -1986,13 +1989,13 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); if(task == NULL) { fnError("doSetupUdf, failed to allocate memory for task"); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); if(task->session == NULL) { fnError("doSetupUdf, failed to allocate memory for session"); taosMemoryFree(task); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } task->session->udfc = &gUdfcProxy; task->type = UDF_TASK_SETUP; @@ -2037,7 +2040,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); if(task == NULL) { fnError("udfc call udf. failed to allocate memory for task"); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } task->session = (SUdfcUvSession *)handle; task->type = UDF_TASK_CALL; @@ -2169,7 +2172,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { if(task == NULL) { fnError("doTeardownUdf, failed to allocate memory for task"); taosMemoryFree(session); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } task->session = session; task->type = UDF_TASK_TEARDOWN; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 7339f115a3..25adfd11d8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -409,6 +409,10 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath char *pythonPath = taosMemoryMalloc(lenPythonPath); + if(pythonPath == NULL) { + uv_dlclose(&plugin->lib); + return terrno; + } #ifdef WINDOWS snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath); #else @@ -705,6 +709,10 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_unlock(&udf->lock); } SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); + if(handle == NULL) { + fnError("udfdProcessSetupRequest: malloc failed."); + code = terrno; + } handle->udf = udf; _send: @@ -775,7 +783,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (outBuf.buf != NULL) { code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); } else { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } subRsp->resultBuf = outBuf; break; @@ -784,9 +792,13 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; if (convertDataBlockToUdfDataBlock(&call->block, &input) == TSDB_CODE_SUCCESS) { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); - subRsp->resultBuf = outBuf; + if (outBuf.buf != NULL) { + code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); + freeUdfInterBuf(&call->interBuf); + subRsp->resultBuf = outBuf; + } else { + code = terrno; + } } freeUdfDataDataBlock(&input); @@ -794,18 +806,27 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_MERGE: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); - freeUdfInterBuf(&call->interBuf2); - subRsp->resultBuf = outBuf; + if (outBuf.buf != NULL) { + code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); + freeUdfInterBuf(&call->interBuf); + freeUdfInterBuf(&call->interBuf2); + subRsp->resultBuf = outBuf; + } else { + code = terrno; + } break; } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); - subRsp->resultBuf = outBuf; + if (outBuf.buf != NULL) { + code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx); + freeUdfInterBuf(&call->interBuf); + subRsp->resultBuf = outBuf; + } else { + code = terrno; + } + break; } default: @@ -820,19 +841,24 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { int32_t len = encodeUdfResponse(NULL, rsp); if(len < 0) { fnError("udfdProcessCallRequest: encode udf response failed. len %d", len); - return; + goto _exit; } rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); + if (bufBegin == NULL) { + fnError("udfdProcessCallRequest: malloc failed. len %d", len); + goto _exit; + } void *buf = bufBegin; if(encodeUdfResponse(&buf, rsp) < 0) { fnError("udfdProcessCallRequest: encode udf response failed. len %d", len); taosMemoryFree(bufBegin); - return; + goto _exit; } uvUdf->output = uv_buf_init(bufBegin, len); +_exit: switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { blockDataFreeRes(&call->block); @@ -906,6 +932,10 @@ _send: } rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); + if(bufBegin == NULL) { + fnError("udfdProcessTeardownRequest: malloc failed. len %d", len); + return; + } void *buf = bufBegin; if (encodeUdfResponse(&buf, rsp) < 0) { fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len); @@ -1210,6 +1240,11 @@ void udfdSendResponse(uv_work_t *work, int status) { if (udfWork->conn != NULL) { uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); + if(write_req == NULL) { + fnError("udfd send response error, malloc failed"); + taosMemoryFree(work); + return; + } write_req->data = udfWork; int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); if (code != 0) { @@ -1269,7 +1304,16 @@ void udfdHandleRequest(SUdfdUvConn *conn) { int32_t inputLen = conn->inputLen; uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); + if(work == NULL) { + fnError("udfd malloc work failed"); + return; + } SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); + if(udfWork == NULL) { + fnError("udfd malloc udf work failed"); + taosMemoryFree(work); + return; + } udfWork->conn = conn; udfWork->pWorkNext = conn->pWorkList; conn->pWorkList = udfWork; @@ -1334,6 +1378,10 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { int32_t code = 0; uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); + if(client == NULL) { + fnError("udfd pipe malloc failed"); + return; + } code = uv_pipe_init(global.loop, client, 0); if (code) { fnError("udfd pipe init error %s", uv_strerror(code)); @@ -1342,6 +1390,10 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { } if (uv_accept(server, (uv_stream_t *)client) == 0) { SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); + if(ctx == NULL) { + fnError("udfd conn malloc failed"); + goto _exit; + } ctx->pWorkList = NULL; ctx->client = (uv_stream_t *)client; ctx->inputBuf = 0; @@ -1356,9 +1408,11 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { taosMemoryFree(ctx); taosMemoryFree(client); } - } else { - uv_close((uv_handle_t *)client, NULL); + return; } +_exit: + uv_close((uv_handle_t *)client, NULL); + taosMemoryFree(client); } void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { @@ -1411,6 +1465,10 @@ static int32_t udfdInitLog() { void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->base = taosMemoryMalloc(suggested_size); + if (buf->base == NULL) { + fnError("udfd ctrl pipe alloc buffer failed"); + return; + } buf->len = suggested_size; } @@ -1477,13 +1535,13 @@ static int32_t udfdGlobalDataInit() { uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop == NULL) { fnError("udfd init uv loop failed, mem overflow"); - return -1; + return terrno; } global.loop = loop; if (uv_mutex_init(&global.scriptPluginsMutex) != 0) { fnError("udfd init script plugins mutex failed"); - return -1; + return TSDB_CODE_UDF_UV_EXEC_FAILURE; } global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1494,7 +1552,7 @@ static int32_t udfdGlobalDataInit() { if (uv_mutex_init(&global.udfsMutex) != 0) { fnError("udfd init udfs mutex failed"); - return -2; + return TSDB_CODE_UDF_UV_EXEC_FAILURE; } return 0; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e8deed1df9..207bd91bd9 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -34,7 +34,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { pOut->dbVgroup = taosMemoryCalloc(1, sizeof(SDBVgInfo)); if (NULL == pOut->dbVgroup) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pOut->dbVgroup->vgVersion = usedbRsp->vgVersion; @@ -509,7 +509,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta * STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize); if (NULL == pTableMeta) { qError("calloc size[%d] failed", metaSize); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize); @@ -764,7 +764,7 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) { STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp)); if(out == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) { qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize); @@ -785,7 +785,7 @@ int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) { SViewMetaRsp *out = taosMemoryCalloc(1, sizeof(SViewMetaRsp)); if (out == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (tDeserializeSViewMetaRsp(msg, msgSize, out) != 0) { qError("tDeserializeSViewMetaRsp failed, msgSize:%d", msgSize); diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 6971b91ff1..670a70a309 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1256,14 +1256,13 @@ int32_t InitRegexCache() { sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (sRegexCache.regexHash == NULL) { uError("failed to create RegexCache"); - return -1; + return terrno; } taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE"); if (sRegexCache.regexCacheTmr == NULL) { uError("failed to create regex cache check timer"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } sRegexCache.exit = false; @@ -1271,10 +1270,10 @@ int32_t InitRegexCache() { sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr); if (sRegexCache.timer == NULL) { uError("failed to start regex cache timer"); - return -1; + return terrno; } - return 0; + return TSDB_CODE_SUCCESS; } void DestroyRegexCache(){ @@ -1316,7 +1315,7 @@ int32_t getRegComp(const char *pPattern, HashRegexPtr **regexRet) { UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t cflags = REG_EXTENDED; int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags); @@ -1421,8 +1420,7 @@ int32_t comparestrRegexMatch(const void *pLeft, const void *pRight) { size_t sz = varDataLen(pRight); char *pattern = taosMemoryMalloc(sz + 1); if (NULL == pattern) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return 1; + return 1; // terrno has been set } (void)memcpy(pattern, varDataVal(pRight), varDataLen(pRight)); @@ -1432,8 +1430,7 @@ int32_t comparestrRegexMatch(const void *pLeft, const void *pRight) { char *str = taosMemoryMalloc(sz + 1); if (NULL == str) { taosMemoryFree(pattern); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return 1; + return 1; // terrno has been set } (void)memcpy(str, varDataVal(pLeft), sz); @@ -1451,14 +1448,13 @@ int32_t comparewcsRegexMatch(const void *pString, const void *pPattern) { size_t len = varDataLen(pPattern); char *pattern = taosMemoryMalloc(len + 1); if (NULL == pattern) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return 1; + return 1; // terrno has been set } int convertLen = taosUcs4ToMbs((TdUcs4 *)varDataVal(pPattern), len, pattern); if (convertLen < 0) { taosMemoryFree(pattern); - return (terrno = TSDB_CODE_APP_ERROR); + return 1; // terrno has been set } pattern[convertLen] = 0; @@ -1467,15 +1463,14 @@ int32_t comparewcsRegexMatch(const void *pString, const void *pPattern) { char *str = taosMemoryMalloc(len + 1); if (NULL == str) { taosMemoryFree(pattern); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return 1; + return 1; // terrno has been set } convertLen = taosUcs4ToMbs((TdUcs4 *)varDataVal(pString), len, str); if (convertLen < 0) { taosMemoryFree(str); taosMemoryFree(pattern); - return (terrno = TSDB_CODE_APP_ERROR); + return 1; // terrno has been set } str[convertLen] = 0; From 5dbf50c555b2785b7dee4a3ef6fbab575b68c8c3 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Thu, 8 Aug 2024 09:49:22 +0800 Subject: [PATCH 12/13] fix: return code --- source/libs/command/src/explain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 2bfed58dc8..6a3df7be72 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -123,6 +123,7 @@ int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, d SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx)); if (NULL == ctx) { qError("calloc SExplainCtx failed"); + QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); goto _return; } @@ -1971,6 +1972,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); if (NULL == rsp) { qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); + QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); goto _return; } From 6e820d28ab66507e4b0a01b03346714698438f90 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Thu, 8 Aug 2024 13:41:47 +0800 Subject: [PATCH 13/13] unused code --- source/libs/command/src/explain.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 6a3df7be72..3a73c05de2 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -124,7 +124,6 @@ int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, d if (NULL == ctx) { qError("calloc SExplainCtx failed"); QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - goto _return; } SArray *rows = taosArrayInit(10, sizeof(SQueryExplainRowInfo)); @@ -1973,7 +1972,6 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { if (NULL == rsp) { qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); QRY_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - goto _return; } rsp->completed = 1;