From cfda747613ec9f50e18bf29c96a32d81da893242 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Jul 2024 21:00:08 +0800 Subject: [PATCH] fix: match cache release --- source/libs/scalar/src/filter.c | 6 +- source/libs/scalar/src/scalar.c | 4 +- source/libs/scalar/src/sclvector.c | 9 +- source/util/src/tcompare.c | 150 +++++++++++++++-------------- tests/system-test/2-query/match.py | 17 +--- 5 files changed, 93 insertions(+), 93 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 72e38c7a0d..0a59cd2219 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3217,6 +3217,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes continue; } + terrno = TSDB_CODE_SUCCESS; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); // match/nmatch for nchar type need convert from ucs4 to mbs if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && @@ -3235,6 +3236,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData); } + if (terrno != TSDB_CODE_SUCCESS) break; if (p[i] == 0) { all = false; @@ -3358,8 +3360,9 @@ int32_t filterSetExecFunc(SFilterInfo *info) { return TSDB_CODE_SUCCESS; } + terrno = TSDB_CODE_SUCCESS; info->func = filterExecuteImplMisc; - return TSDB_CODE_SUCCESS; + return terrno; } int32_t filterPreprocess(SFilterInfo *info) { @@ -4744,6 +4747,7 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, } bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); + if (terrno != TSDB_CODE_SUCCESS) return terrno; // todo this should be return during filter procedure if (keepAll) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 50de5e760d..f9b9f3bcdb 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1650,8 +1650,8 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { return TSDB_CODE_TSC_INVALID_OPERATION; } SDataType rdt = ((SExprNode *)(pOp->pRight))->resType; - if (ldt.type == TSDB_DATA_TYPE_VARBINARY || !IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || - (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { + if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || + (rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR && rdt.type != TSDB_DATA_TYPE_NULL)) { return TSDB_CODE_TSC_INVALID_OPERATION; } } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 673919b2f5..39c6a0cd67 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1667,7 +1667,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } } else { for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) { - if (terrno != TSDB_CODE_SUCCESS) break; int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i; int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i; @@ -1676,11 +1675,11 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa pRes[i] = false; continue; } - char *pLeftData = colDataGetData(pLeft->columnData, leftIndex); char *pRightData = colDataGetData(pRight->columnData, rightIndex); - + terrno = TSDB_CODE_SUCCESS; pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData); + if (terrno != TSDB_CODE_SUCCESS) break; if (pRes[i]) { ++num; } @@ -1689,7 +1688,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa } else { // if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) { for (int32_t i = startIndex; i < numOfRows && i >= startIndex; i += step) { - if (terrno != TSDB_CODE_SUCCESS) break; int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i; int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i; @@ -1716,7 +1714,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa if (!pLeftData || !pRightData) { result = false; } - + terrno = TSDB_CODE_SUCCESS; if (!result) { colDataSetInt8(pOut->columnData, i, (int8_t *)&result); } else { @@ -1726,6 +1724,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa ++num; } } + if (terrno != TSDB_CODE_SUCCESS) break; if (freeLeft) { taosMemoryFreeClear(pLeftData); diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index fde07b4ba8..ebc6f860a4 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -24,6 +24,7 @@ #include "tutil.h" #include "types.h" #include "osString.h" +#include "ttimer.h" int32_t setChkInBytes1(const void *pLeft, const void *pRight) { return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0; @@ -1205,77 +1206,87 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) { typedef struct UsingRegex { regex_t pRegex; - int32_t usingCount; int32_t lastUsedTime; } UsingRegex; typedef struct RegexCache { SHashObj *regexHash; int32_t regexCaheSize; - TdThreadRwlock regexLock; + void *regexCacheTimer; + void *timer; int32_t lastClearTime; } RegexCache; static RegexCache sRegexCache; #define MAX_REGEX_CACHE_SIZE 20 #define REGEX_CACHE_CLEAR_TIME 30 -int32_t InitRegexCache() { - if (taosThreadRwlockInit(&sRegexCache.regexLock, NULL) != 0) { - uError("failed to create RegexCache lock"); - return -1; - } - sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (sRegexCache.regexHash == NULL) { - uError("failed to create RegexCache"); - return -1; - } - sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE; - sRegexCache.lastClearTime = taosGetTimestampSec(); - return 0; -} - -void DestroyRegexCache(){ - UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); - while ((ppUsingRegex != NULL)) { - regfree(&(*ppUsingRegex)->pRegex); - taosMemoryFree(*ppUsingRegex); - ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); - } - taosHashCleanup(sRegexCache.regexHash); - taosThreadRwlockDestroy(&sRegexCache.regexLock); -} - -static void clearOlderRegex() { +static void checkRegexCache(void* param, void* tmrId) { if (taosGetTimestampSec() - sRegexCache.lastClearTime < REGEX_CACHE_CLEAR_TIME || taosHashGetSize(sRegexCache.regexHash) < sRegexCache.regexCaheSize) { return; } - taosThreadRwlockWrlock(&sRegexCache.regexLock); + if (taosHashGetSize(sRegexCache.regexHash) >= sRegexCache.regexCaheSize) { UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); while ((ppUsingRegex != NULL)) { - if ((*ppUsingRegex)->usingCount == 0 && - taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { - regfree(&(*ppUsingRegex)->pRegex); - taosMemoryFree(*ppUsingRegex); + if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { taosHashRelease(sRegexCache.regexHash, ppUsingRegex); sRegexCache.lastClearTime = taosGetTimestampSec(); } ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } - taosThreadRwlockUnlock(&sRegexCache.regexLock); + + taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, NULL, &tmrId); } -static UsingRegex *getRegComp(const char *pPattern) { - taosThreadRwlockRdlock(&sRegexCache.regexLock); - UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern)); - if (ppUsingRegex != NULL) { - (*ppUsingRegex)->usingCount++; - taosThreadRwlockUnlock(&sRegexCache.regexLock); - return *ppUsingRegex; +void regexCacheFree(void *ppUsingRegex) { + regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex); + taosMemoryFree(*(UsingRegex **)ppUsingRegex); +} + +int32_t InitRegexCache() { + sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (sRegexCache.regexHash == NULL) { + uError("failed to create RegexCache"); + return -1; + } + taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); + sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE; + sRegexCache.lastClearTime = taosGetTimestampSec(); + + sRegexCache.regexCacheTimer = taosTmrInit(0, 0, 0, "REGEXCACHE"); + if (sRegexCache.regexCacheTimer == NULL) { + uError("failed to create regex cache check timer"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTimer); + if (sRegexCache.timer == NULL) { + uError("failed to start regex cache timer"); + return -1; + } + + return 0; +} + +void DestroyRegexCache(){ + UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); + taosTmrStopA(&sRegexCache.timer); + while ((ppUsingRegex != NULL)) { + regexCacheFree(ppUsingRegex); + ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); + } + taosHashCleanup(sRegexCache.regexHash); +} + +static UsingRegex **getRegComp(const char *pPattern) { + UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex != NULL) { + (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; } - taosThreadRwlockUnlock(&sRegexCache.regexLock); UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); if (pUsingRegex == NULL) { @@ -1294,54 +1305,49 @@ static UsingRegex *getRegComp(const char *pPattern) { return NULL; } - taosThreadRwlockWrlock(&sRegexCache.regexLock); - int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); - if (code != 0) { - if( terrno == TSDB_CODE_DUP_KEY) { - regfree(&pUsingRegex->pRegex); - taosMemoryFree(pUsingRegex); - - UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern)); - if(ppUsingRegex) { - pUsingRegex = (*ppUsingRegex); + while (true) { + int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); + if (code != 0) { + if (terrno == TSDB_CODE_DUP_KEY) { + ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex) { + if (*ppUsingRegex != pUsingRegex) { + regexCacheFree(&pUsingRegex); + } + pUsingRegex = (*ppUsingRegex); + break; + } else { + continue; + } } else { - uError("Failed to get regex pattern %s from cache, exception internal error.", pPattern); - taosThreadRwlockUnlock(&sRegexCache.regexLock); - terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - return NULL; - } - } else { + regexCacheFree(&pUsingRegex); uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); - taosThreadRwlockUnlock(&sRegexCache.regexLock); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return NULL; + } } } - atomic_add_fetch_32(&pUsingRegex->usingCount, 1); - taosThreadRwlockUnlock(&sRegexCache.regexLock); - - clearOlderRegex(); - return pUsingRegex; + pUsingRegex->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; } -void recycleRegex(UsingRegex *regex){ - atomic_add_fetch_32(®ex->usingCount, -1); - regex->lastUsedTime = taosGetTimestampSec(); +void releaseRegComp(UsingRegex **regex){ + taosHashRelease(sRegexCache.regexHash, regex); } static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { int32_t ret = 0; char msgbuf[256] = {0}; - UsingRegex *pUsingRegex = getRegComp(pPattern); + UsingRegex **pUsingRegex = getRegComp(pPattern); if (pUsingRegex == NULL) { return 1; } regmatch_t pmatch[1]; - ret = regexec(&pUsingRegex->pRegex, pString, 1, pmatch, 0); - recycleRegex(pUsingRegex); + ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0); + releaseRegComp(pUsingRegex); if (ret != 0 && ret != REG_NOMATCH) { - regerror(ret, &pUsingRegex->pRegex, msgbuf, sizeof(msgbuf)); + regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf)); uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) } diff --git a/tests/system-test/2-query/match.py b/tests/system-test/2-query/match.py index 2e47f079e2..62de846cb7 100644 --- a/tests/system-test/2-query/match.py +++ b/tests/system-test/2-query/match.py @@ -38,18 +38,6 @@ class TDTestCase: def stopTest(self): tdSql.execute("drop database if exists db") - def insertData(self, threadID): - cursor = self.conn.cursor() - print("Thread %d: starting" % threadID) - base = 200000 * threadID - for i in range(200): - query = "insert into tb values" - for j in range(1000): - query += "(%d, %d, 'test')" % (self.ts + base + i * 1000 + j, base + i * 1000 + j) - cursor.execute(query) - cursor.close() - print("Thread %d: finishing" % threadID) - def threadTest(self, threadID): print(f"Thread {threadID} starting...") tdsqln = tdCom.newTdSql() @@ -68,6 +56,8 @@ class TDTestCase: tdsqln.query("select * from db.t1x where c1 match '%__c'") tdsqln.checkRows(0) + + tdsqln.error("select * from db.t1x where c1 match '*d'") print(f"Thread {threadID} finished.") @@ -91,7 +81,7 @@ class TDTestCase: tdSql.query("select * from db.t1x where c1 match '%__c'") tdSql.checkRows(0) - + tdSql.error("select * from db.t1x where c1 match '*d'") threads = [] for i in range(10): t = threading.Thread(target=self.threadTest, args=(i,)) @@ -122,6 +112,7 @@ class TDTestCase: tdSql.checkRows(2) tdSql.query("select * from db.t3x where c1 match '中文'") tdSql.checkRows(5) + tdSql.error("select * from db.t1x where c1 match '*d'") for thread in threads: print(f"Thread waitting for finish...")