fix: match cache release
This commit is contained in:
parent
c135ee8358
commit
cfda747613
|
@ -3217,6 +3217,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes
|
|||
continue;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||
// match/nmatch for nchar type need convert from ucs4 to mbs
|
||||
if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR &&
|
||||
|
@ -3235,6 +3236,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes
|
|||
p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData,
|
||||
info->cunits[uidx].valData);
|
||||
}
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
|
@ -3358,8 +3360,9 @@ int32_t filterSetExecFunc(SFilterInfo *info) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
info->func = filterExecuteImplMisc;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t filterPreprocess(SFilterInfo *info) {
|
||||
|
@ -4744,6 +4747,7 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p,
|
|||
}
|
||||
|
||||
bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
|
||||
if (terrno != TSDB_CODE_SUCCESS) return terrno;
|
||||
|
||||
// todo this should be return during filter procedure
|
||||
if (keepAll) {
|
||||
|
|
|
@ -1650,8 +1650,8 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
||||
if (ldt.type == TSDB_DATA_TYPE_VARBINARY || !IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) ||
|
||||
(!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
|
||||
if (QUERY_NODE_VALUE != nodeType(pOp->pRight) ||
|
||||
(rdt.type != TSDB_DATA_TYPE_NCHAR && rdt.type != TSDB_DATA_TYPE_VARCHAR && rdt.type != TSDB_DATA_TYPE_NULL)) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1667,7 +1667,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
}
|
||||
} else {
|
||||
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
|
||||
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
|
||||
|
||||
|
@ -1676,11 +1675,11 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
pRes[i] = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
|
||||
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
if (pRes[i]) {
|
||||
++num;
|
||||
}
|
||||
|
@ -1689,7 +1688,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
} else {
|
||||
// if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) {
|
||||
for (int32_t i = startIndex; i < numOfRows && i >= startIndex; i += step) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
|
||||
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
|
||||
|
||||
|
@ -1716,7 +1714,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
if (!pLeftData || !pRightData) {
|
||||
result = false;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
if (!result) {
|
||||
colDataSetInt8(pOut->columnData, i, (int8_t *)&result);
|
||||
} else {
|
||||
|
@ -1726,6 +1724,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
++num;
|
||||
}
|
||||
}
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
|
||||
if (freeLeft) {
|
||||
taosMemoryFreeClear(pLeftData);
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "tutil.h"
|
||||
#include "types.h"
|
||||
#include "osString.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
int32_t setChkInBytes1(const void *pLeft, const void *pRight) {
|
||||
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
|
||||
|
@ -1205,77 +1206,87 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) {
|
|||
|
||||
typedef struct UsingRegex {
|
||||
regex_t pRegex;
|
||||
int32_t usingCount;
|
||||
int32_t lastUsedTime;
|
||||
} UsingRegex;
|
||||
|
||||
typedef struct RegexCache {
|
||||
SHashObj *regexHash;
|
||||
int32_t regexCaheSize;
|
||||
TdThreadRwlock regexLock;
|
||||
void *regexCacheTimer;
|
||||
void *timer;
|
||||
int32_t lastClearTime;
|
||||
} RegexCache;
|
||||
static RegexCache sRegexCache;
|
||||
#define MAX_REGEX_CACHE_SIZE 20
|
||||
#define REGEX_CACHE_CLEAR_TIME 30
|
||||
|
||||
int32_t InitRegexCache() {
|
||||
if (taosThreadRwlockInit(&sRegexCache.regexLock, NULL) != 0) {
|
||||
uError("failed to create RegexCache lock");
|
||||
return -1;
|
||||
}
|
||||
sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (sRegexCache.regexHash == NULL) {
|
||||
uError("failed to create RegexCache");
|
||||
return -1;
|
||||
}
|
||||
sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE;
|
||||
sRegexCache.lastClearTime = taosGetTimestampSec();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void DestroyRegexCache(){
|
||||
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
|
||||
while ((ppUsingRegex != NULL)) {
|
||||
regfree(&(*ppUsingRegex)->pRegex);
|
||||
taosMemoryFree(*ppUsingRegex);
|
||||
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
|
||||
}
|
||||
taosHashCleanup(sRegexCache.regexHash);
|
||||
taosThreadRwlockDestroy(&sRegexCache.regexLock);
|
||||
}
|
||||
|
||||
static void clearOlderRegex() {
|
||||
static void checkRegexCache(void* param, void* tmrId) {
|
||||
if (taosGetTimestampSec() - sRegexCache.lastClearTime < REGEX_CACHE_CLEAR_TIME ||
|
||||
taosHashGetSize(sRegexCache.regexHash) < sRegexCache.regexCaheSize) {
|
||||
return;
|
||||
}
|
||||
taosThreadRwlockWrlock(&sRegexCache.regexLock);
|
||||
|
||||
if (taosHashGetSize(sRegexCache.regexHash) >= sRegexCache.regexCaheSize) {
|
||||
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
|
||||
while ((ppUsingRegex != NULL)) {
|
||||
if ((*ppUsingRegex)->usingCount == 0 &&
|
||||
taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) {
|
||||
regfree(&(*ppUsingRegex)->pRegex);
|
||||
taosMemoryFree(*ppUsingRegex);
|
||||
if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) {
|
||||
taosHashRelease(sRegexCache.regexHash, ppUsingRegex);
|
||||
sRegexCache.lastClearTime = taosGetTimestampSec();
|
||||
}
|
||||
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
|
||||
}
|
||||
}
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
|
||||
taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, NULL, &tmrId);
|
||||
}
|
||||
|
||||
static UsingRegex *getRegComp(const char *pPattern) {
|
||||
taosThreadRwlockRdlock(&sRegexCache.regexLock);
|
||||
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern));
|
||||
if (ppUsingRegex != NULL) {
|
||||
(*ppUsingRegex)->usingCount++;
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
return *ppUsingRegex;
|
||||
void regexCacheFree(void *ppUsingRegex) {
|
||||
regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex);
|
||||
taosMemoryFree(*(UsingRegex **)ppUsingRegex);
|
||||
}
|
||||
|
||||
int32_t InitRegexCache() {
|
||||
sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (sRegexCache.regexHash == NULL) {
|
||||
uError("failed to create RegexCache");
|
||||
return -1;
|
||||
}
|
||||
taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree);
|
||||
sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE;
|
||||
sRegexCache.lastClearTime = taosGetTimestampSec();
|
||||
|
||||
sRegexCache.regexCacheTimer = taosTmrInit(0, 0, 0, "REGEXCACHE");
|
||||
if (sRegexCache.regexCacheTimer == NULL) {
|
||||
uError("failed to create regex cache check timer");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTimer);
|
||||
if (sRegexCache.timer == NULL) {
|
||||
uError("failed to start regex cache timer");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void DestroyRegexCache(){
|
||||
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
|
||||
taosTmrStopA(&sRegexCache.timer);
|
||||
while ((ppUsingRegex != NULL)) {
|
||||
regexCacheFree(ppUsingRegex);
|
||||
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
|
||||
}
|
||||
taosHashCleanup(sRegexCache.regexHash);
|
||||
}
|
||||
|
||||
static UsingRegex **getRegComp(const char *pPattern) {
|
||||
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
|
||||
if (ppUsingRegex != NULL) {
|
||||
(*ppUsingRegex)->lastUsedTime = taosGetTimestampSec();
|
||||
return ppUsingRegex;
|
||||
}
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
|
||||
UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex));
|
||||
if (pUsingRegex == NULL) {
|
||||
|
@ -1294,54 +1305,49 @@ static UsingRegex *getRegComp(const char *pPattern) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadRwlockWrlock(&sRegexCache.regexLock);
|
||||
int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *));
|
||||
if (code != 0) {
|
||||
if( terrno == TSDB_CODE_DUP_KEY) {
|
||||
regfree(&pUsingRegex->pRegex);
|
||||
taosMemoryFree(pUsingRegex);
|
||||
|
||||
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern));
|
||||
if(ppUsingRegex) {
|
||||
pUsingRegex = (*ppUsingRegex);
|
||||
while (true) {
|
||||
int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *));
|
||||
if (code != 0) {
|
||||
if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
|
||||
if (ppUsingRegex) {
|
||||
if (*ppUsingRegex != pUsingRegex) {
|
||||
regexCacheFree(&pUsingRegex);
|
||||
}
|
||||
pUsingRegex = (*ppUsingRegex);
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
uError("Failed to get regex pattern %s from cache, exception internal error.", pPattern);
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
regexCacheFree(&pUsingRegex);
|
||||
uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern);
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
atomic_add_fetch_32(&pUsingRegex->usingCount, 1);
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
|
||||
clearOlderRegex();
|
||||
return pUsingRegex;
|
||||
pUsingRegex->lastUsedTime = taosGetTimestampSec();
|
||||
return ppUsingRegex;
|
||||
}
|
||||
|
||||
void recycleRegex(UsingRegex *regex){
|
||||
atomic_add_fetch_32(&regex->usingCount, -1);
|
||||
regex->lastUsedTime = taosGetTimestampSec();
|
||||
void releaseRegComp(UsingRegex **regex){
|
||||
taosHashRelease(sRegexCache.regexHash, regex);
|
||||
}
|
||||
|
||||
static int32_t doExecRegexMatch(const char *pString, const char *pPattern) {
|
||||
int32_t ret = 0;
|
||||
char msgbuf[256] = {0};
|
||||
UsingRegex *pUsingRegex = getRegComp(pPattern);
|
||||
UsingRegex **pUsingRegex = getRegComp(pPattern);
|
||||
if (pUsingRegex == NULL) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
regmatch_t pmatch[1];
|
||||
ret = regexec(&pUsingRegex->pRegex, pString, 1, pmatch, 0);
|
||||
recycleRegex(pUsingRegex);
|
||||
ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0);
|
||||
releaseRegComp(pUsingRegex);
|
||||
if (ret != 0 && ret != REG_NOMATCH) {
|
||||
regerror(ret, &pUsingRegex->pRegex, msgbuf, sizeof(msgbuf));
|
||||
regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf));
|
||||
uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf)
|
||||
}
|
||||
|
||||
|
|
|
@ -38,18 +38,6 @@ class TDTestCase:
|
|||
def stopTest(self):
|
||||
tdSql.execute("drop database if exists db")
|
||||
|
||||
def insertData(self, threadID):
|
||||
cursor = self.conn.cursor()
|
||||
print("Thread %d: starting" % threadID)
|
||||
base = 200000 * threadID
|
||||
for i in range(200):
|
||||
query = "insert into tb values"
|
||||
for j in range(1000):
|
||||
query += "(%d, %d, 'test')" % (self.ts + base + i * 1000 + j, base + i * 1000 + j)
|
||||
cursor.execute(query)
|
||||
cursor.close()
|
||||
print("Thread %d: finishing" % threadID)
|
||||
|
||||
def threadTest(self, threadID):
|
||||
print(f"Thread {threadID} starting...")
|
||||
tdsqln = tdCom.newTdSql()
|
||||
|
@ -68,6 +56,8 @@ class TDTestCase:
|
|||
|
||||
tdsqln.query("select * from db.t1x where c1 match '%__c'")
|
||||
tdsqln.checkRows(0)
|
||||
|
||||
tdsqln.error("select * from db.t1x where c1 match '*d'")
|
||||
|
||||
print(f"Thread {threadID} finished.")
|
||||
|
||||
|
@ -91,7 +81,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("select * from db.t1x where c1 match '%__c'")
|
||||
tdSql.checkRows(0)
|
||||
|
||||
tdSql.error("select * from db.t1x where c1 match '*d'")
|
||||
threads = []
|
||||
for i in range(10):
|
||||
t = threading.Thread(target=self.threadTest, args=(i,))
|
||||
|
@ -122,6 +112,7 @@ class TDTestCase:
|
|||
tdSql.checkRows(2)
|
||||
tdSql.query("select * from db.t3x where c1 match '中文'")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.error("select * from db.t1x where c1 match '*d'")
|
||||
|
||||
for thread in threads:
|
||||
print(f"Thread waitting for finish...")
|
||||
|
|
Loading…
Reference in New Issue