use regex cache
This commit is contained in:
parent
076fd07a26
commit
41b8693430
|
@ -836,6 +836,7 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E)
|
||||
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
|
||||
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
|
||||
#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681)
|
||||
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
||||
|
||||
//planner
|
||||
|
|
|
@ -45,6 +45,8 @@ typedef struct SPatternCompareInfo {
|
|||
TdUcs4 umatchOne; // unicode version matchOne
|
||||
} SPatternCompareInfo;
|
||||
|
||||
int32_t InitRegexCache();
|
||||
void DestroyRegexCache();
|
||||
int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo);
|
||||
|
||||
int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo);
|
||||
|
@ -83,7 +85,6 @@ int32_t compareLenBinaryVal(const void *pLeft, const void *pRight);
|
|||
|
||||
int32_t comparestrRegexMatch(const void *pLeft, const void *pRight);
|
||||
int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight);
|
||||
void DestoryThreadLocalRegComp();
|
||||
|
||||
int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight);
|
||||
int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "audit.h"
|
||||
#include "libs/function/tudf.h"
|
||||
#include "tgrant.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
#define DM_INIT_AUDIT() \
|
||||
do { \
|
||||
|
@ -163,6 +164,7 @@ int32_t dmInit() {
|
|||
if (dmInitMonitor() != 0) return -1;
|
||||
if (dmInitAudit() != 0) return -1;
|
||||
if (dmInitDnode(dmInstance()) != 0) return -1;
|
||||
if (InitRegexCache() != 0) return -1;
|
||||
#if defined(USE_S3)
|
||||
if (s3Begin() != 0) return -1;
|
||||
#endif
|
||||
|
@ -192,6 +194,7 @@ void dmCleanup() {
|
|||
udfStopUdfd();
|
||||
taosStopCacheRefreshWorker();
|
||||
dmDiskClose();
|
||||
DestroyRegexCache();
|
||||
|
||||
#if defined(USE_S3)
|
||||
s3End();
|
||||
|
|
|
@ -223,7 +223,9 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Tag name:%s duplicated";
|
||||
case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC:
|
||||
return "Some functions cannot appear in the select list at the same time";
|
||||
default:
|
||||
case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR:
|
||||
return "Syntax error in regular expression";
|
||||
default:
|
||||
return "Unknown error";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1667,6 +1667,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
}
|
||||
} else {
|
||||
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
|
||||
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
|
||||
|
||||
|
@ -1688,6 +1689,7 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
|
|||
} else {
|
||||
// if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) {
|
||||
for (int32_t i = startIndex; i < numOfRows && i >= startIndex; i += step) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) break;
|
||||
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
|
||||
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
|
||||
|
||||
|
|
|
@ -1203,54 +1203,142 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) {
|
|||
return comparestrRegexMatch(pLeft, pRight) ? 0 : 1;
|
||||
}
|
||||
|
||||
static threadlocal regex_t pRegex;
|
||||
static threadlocal char *pOldPattern = NULL;
|
||||
static regex_t *threadGetRegComp(const char *pPattern) {
|
||||
if (NULL != pOldPattern) {
|
||||
if( strcmp(pOldPattern, pPattern) == 0) {
|
||||
return &pRegex;
|
||||
} else {
|
||||
DestoryThreadLocalRegComp();
|
||||
typedef struct UsingRegex {
|
||||
regex_t pRegex;
|
||||
int32_t usingCount;
|
||||
int32_t lastUsedTime;
|
||||
} UsingRegex;
|
||||
|
||||
typedef struct RegexCache {
|
||||
SHashObj *regexHash;
|
||||
int32_t regexCaheSize;
|
||||
TdThreadRwlock regexLock;
|
||||
int32_t lastClearTime;
|
||||
} RegexCache;
|
||||
static RegexCache sRegexCache;
|
||||
#define MAX_REGEX_CACHE_SIZE 20
|
||||
#define REGEX_CACHE_CLEAR_TIME 30
|
||||
|
||||
int32_t InitRegexCache() {
|
||||
if (taosThreadRwlockInit(&sRegexCache.regexLock, NULL) != 0) {
|
||||
uError("failed to create RegexCache lock");
|
||||
return -1;
|
||||
}
|
||||
sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (sRegexCache.regexHash == NULL) {
|
||||
uError("failed to create RegexCache");
|
||||
return -1;
|
||||
}
|
||||
sRegexCache.regexCaheSize = MAX_REGEX_CACHE_SIZE;
|
||||
sRegexCache.lastClearTime = taosGetTimestampSec();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void DestroyRegexCache(){
|
||||
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
|
||||
while ((ppUsingRegex != NULL)) {
|
||||
regfree(&(*ppUsingRegex)->pRegex);
|
||||
taosMemoryFree(*ppUsingRegex);
|
||||
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
|
||||
}
|
||||
taosHashCleanup(sRegexCache.regexHash);
|
||||
taosThreadRwlockDestroy(&sRegexCache.regexLock);
|
||||
}
|
||||
|
||||
static void clearOlderRegex() {
|
||||
if (taosGetTimestampSec() - sRegexCache.lastClearTime < REGEX_CACHE_CLEAR_TIME ||
|
||||
taosHashGetSize(sRegexCache.regexHash) < sRegexCache.regexCaheSize) {
|
||||
return;
|
||||
}
|
||||
taosThreadRwlockWrlock(&sRegexCache.regexLock);
|
||||
if (taosHashGetSize(sRegexCache.regexHash) >= sRegexCache.regexCaheSize) {
|
||||
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
|
||||
while ((ppUsingRegex != NULL)) {
|
||||
if ((*ppUsingRegex)->usingCount == 0 &&
|
||||
taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) {
|
||||
regfree(&(*ppUsingRegex)->pRegex);
|
||||
taosMemoryFree(*ppUsingRegex);
|
||||
taosHashRelease(sRegexCache.regexHash, ppUsingRegex);
|
||||
sRegexCache.lastClearTime = taosGetTimestampSec();
|
||||
}
|
||||
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
|
||||
}
|
||||
}
|
||||
pOldPattern = taosMemoryMalloc(strlen(pPattern) + 1);
|
||||
if (NULL == pOldPattern) {
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
}
|
||||
|
||||
static UsingRegex *getRegComp(const char *pPattern) {
|
||||
taosThreadRwlockRdlock(&sRegexCache.regexLock);
|
||||
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern));
|
||||
if (ppUsingRegex != NULL) {
|
||||
(*ppUsingRegex)->usingCount++;
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
return *ppUsingRegex;
|
||||
}
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
|
||||
UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex));
|
||||
if (pUsingRegex == NULL) {
|
||||
uError("Failed to Malloc when compile regex pattern %s.", pPattern);
|
||||
return NULL;
|
||||
}
|
||||
strcpy(pOldPattern, pPattern);
|
||||
int32_t cflags = REG_EXTENDED;
|
||||
int32_t ret = regcomp(&pRegex, pPattern, cflags);
|
||||
int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags);
|
||||
if (ret != 0) {
|
||||
char msgbuf[256] = {0};
|
||||
regerror(ret, &pRegex, msgbuf, tListLen(msgbuf));
|
||||
regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf));
|
||||
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
|
||||
DestoryThreadLocalRegComp();
|
||||
taosMemoryFree(pUsingRegex);
|
||||
terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
return &pRegex;
|
||||
|
||||
taosThreadRwlockWrlock(&sRegexCache.regexLock);
|
||||
int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *));
|
||||
if (code != 0) {
|
||||
if( terrno == TSDB_CODE_DUP_KEY) {
|
||||
regfree(&pUsingRegex->pRegex);
|
||||
taosMemoryFree(pUsingRegex);
|
||||
|
||||
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashGet(sRegexCache.regexHash, pPattern, strlen(pPattern));
|
||||
if(ppUsingRegex) {
|
||||
pUsingRegex = (*ppUsingRegex);
|
||||
} else {
|
||||
uError("Failed to get regex pattern %s from cache, exception internal error.", pPattern);
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern);
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
atomic_add_fetch_32(&pUsingRegex->usingCount, 1);
|
||||
taosThreadRwlockUnlock(&sRegexCache.regexLock);
|
||||
|
||||
clearOlderRegex();
|
||||
return pUsingRegex;
|
||||
}
|
||||
|
||||
void DestoryThreadLocalRegComp() {
|
||||
if (NULL != pOldPattern) {
|
||||
regfree(&pRegex);
|
||||
taosMemoryFree(pOldPattern);
|
||||
pOldPattern = NULL;
|
||||
}
|
||||
void recycleRegex(UsingRegex *regex){
|
||||
atomic_add_fetch_32(®ex->usingCount, -1);
|
||||
regex->lastUsedTime = taosGetTimestampSec();
|
||||
}
|
||||
|
||||
static int32_t doExecRegexMatch(const char *pString, const char *pPattern) {
|
||||
int32_t ret = 0;
|
||||
char msgbuf[256] = {0};
|
||||
regex_t *regex = threadGetRegComp(pPattern);
|
||||
if (regex == NULL) {
|
||||
UsingRegex *pUsingRegex = getRegComp(pPattern);
|
||||
if (pUsingRegex == NULL) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
regmatch_t pmatch[1];
|
||||
ret = regexec(regex, pString, 1, pmatch, 0);
|
||||
ret = regexec(&pUsingRegex->pRegex, pString, 1, pmatch, 0);
|
||||
recycleRegex(pUsingRegex);
|
||||
if (ret != 0 && ret != REG_NOMATCH) {
|
||||
regerror(ret, regex, msgbuf, sizeof(msgbuf));
|
||||
regerror(ret, &pUsingRegex->pRegex, msgbuf, sizeof(msgbuf));
|
||||
uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf)
|
||||
}
|
||||
|
||||
|
|
|
@ -683,6 +683,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
||||
|
||||
//planner
|
||||
|
|
|
@ -104,7 +104,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
|
|||
}
|
||||
|
||||
destroyThreadLocalGeosCtx();
|
||||
DestoryThreadLocalRegComp();
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
@ -224,7 +223,6 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
|
|||
|
||||
taosUpdateItemSize(qinfo.queue, 1);
|
||||
}
|
||||
DestoryThreadLocalRegComp();
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
@ -636,7 +634,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
|
|||
}
|
||||
|
||||
destroyThreadLocalGeosCtx();
|
||||
DestoryThreadLocalRegComp();
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue