Merge pull request #26972 from taosdata/fix/TD-29679/l2cache

Fix/td 29679/l2cache
This commit is contained in:
dapan1121 2024-08-08 16:51:44 +08:00 committed by GitHub
commit 905d1eb762
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 539 additions and 73 deletions

View File

@ -49,6 +49,7 @@ int32_t InitRegexCache();
void DestroyRegexCache();
int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo);
int32_t checkRegexPattern(const char *pPattern);
void DestoryThreadLocalRegComp();
int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo);

View File

@ -38,7 +38,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
(*pRsp)->useconds = 0;
@ -289,7 +289,7 @@ static int32_t buildRetension(SArray* pRetension, char **ppRetentions ) {
char* p1 = taosMemoryCalloc(1, 100);
if(NULL == p1) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int32_t len = 0;
@ -849,7 +849,7 @@ _return:
static int32_t buildLocalVariablesResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pBlock->info.hasVarCol = true;

View File

@ -227,7 +227,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai
SExplainResNode *resNode = taosMemoryCalloc(1, sizeof(SExplainResNode));
if (NULL == resNode) {
qError("calloc SPhysiNodeExplainRes failed");
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int32_t code = 0;

View File

@ -162,7 +162,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
} else {
fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
}
@ -837,10 +837,13 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfBlock->numOfRows = block->info.rows;
udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
if((udfBlock->udfCols) == NULL) {
return terrno;
}
for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
if(udfBlock->udfCols[i] == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
SUdfColumn *udfCol = udfBlock->udfCols[i];
@ -854,13 +857,13 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
if(udfCol->colData.varLenCol.varOffsets == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
if(udfCol->colData.varLenCol.payload == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
if (col->reassigned) {
for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
@ -882,7 +885,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
if(udfCol->colData.fixLenCol.nullBitmap == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
memcpy(bitmap, col->nullbitmap, bitmapLen);
@ -985,7 +988,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
if(output->columnData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
output->colAlloced = true;
@ -1724,7 +1727,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
if(conn == NULL) {
fnError("udfc event loop start connect task malloc conn failed.");
taosMemoryFree(pipe);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
conn->pipe = pipe;
conn->readBuf.len = 0;
@ -1954,7 +1957,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
if(uvTask == NULL) {
fnError("udfc client task: %p failed to allocate memory for uvTask", task);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
@ -1986,13 +1989,13 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
if(task == NULL) {
fnError("doSetupUdf, failed to allocate memory for task");
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
if(task->session == NULL) {
fnError("doSetupUdf, failed to allocate memory for session");
taosMemoryFree(task);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
task->session->udfc = &gUdfcProxy;
task->type = UDF_TASK_SETUP;
@ -2037,7 +2040,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
if(task == NULL) {
fnError("udfc call udf. failed to allocate memory for task");
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
task->session = (SUdfcUvSession *)handle;
task->type = UDF_TASK_CALL;
@ -2169,7 +2172,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
if(task == NULL) {
fnError("doTeardownUdf, failed to allocate memory for task");
taosMemoryFree(session);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
task->session = session;
task->type = UDF_TASK_TEARDOWN;

View File

@ -409,6 +409,10 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
int16_t lenPythonPath =
strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath
char *pythonPath = taosMemoryMalloc(lenPythonPath);
if(pythonPath == NULL) {
uv_dlclose(&plugin->lib);
return terrno;
}
#ifdef WINDOWS
snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else
@ -705,6 +709,10 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_unlock(&udf->lock);
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
if(handle == NULL) {
fnError("udfdProcessSetupRequest: malloc failed.");
code = terrno;
}
handle->udf = udf;
_send:
@ -775,7 +783,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
}
subRsp->resultBuf = outBuf;
break;
@ -784,9 +792,13 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfDataBlock input = {0};
if (convertDataBlockToUdfDataBlock(&call->block, &input) == TSDB_CODE_SUCCESS) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
} else {
code = terrno;
}
}
freeUdfDataDataBlock(&input);
@ -794,18 +806,27 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
} else {
code = terrno;
}
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
} else {
code = terrno;
}
break;
}
default:
@ -820,19 +841,24 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
int32_t len = encodeUdfResponse(NULL, rsp);
if(len < 0) {
fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
return;
goto _exit;
}
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
if (bufBegin == NULL) {
fnError("udfdProcessCallRequest: malloc failed. len %d", len);
goto _exit;
}
void *buf = bufBegin;
if(encodeUdfResponse(&buf, rsp) < 0) {
fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
taosMemoryFree(bufBegin);
return;
goto _exit;
}
uvUdf->output = uv_buf_init(bufBegin, len);
_exit:
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
blockDataFreeRes(&call->block);
@ -906,6 +932,10 @@ _send:
}
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
if(bufBegin == NULL) {
fnError("udfdProcessTeardownRequest: malloc failed. len %d", len);
return;
}
void *buf = bufBegin;
if (encodeUdfResponse(&buf, rsp) < 0) {
fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
@ -1210,6 +1240,11 @@ void udfdSendResponse(uv_work_t *work, int status) {
if (udfWork->conn != NULL) {
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
if(write_req == NULL) {
fnError("udfd send response error, malloc failed");
taosMemoryFree(work);
return;
}
write_req->data = udfWork;
int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
if (code != 0) {
@ -1269,7 +1304,16 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
int32_t inputLen = conn->inputLen;
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
if(work == NULL) {
fnError("udfd malloc work failed");
return;
}
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
if(udfWork == NULL) {
fnError("udfd malloc udf work failed");
taosMemoryFree(work);
return;
}
udfWork->conn = conn;
udfWork->pWorkNext = conn->pWorkList;
conn->pWorkList = udfWork;
@ -1334,6 +1378,10 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
int32_t code = 0;
uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
if(client == NULL) {
fnError("udfd pipe malloc failed");
return;
}
code = uv_pipe_init(global.loop, client, 0);
if (code) {
fnError("udfd pipe init error %s", uv_strerror(code));
@ -1342,6 +1390,10 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
}
if (uv_accept(server, (uv_stream_t *)client) == 0) {
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
if(ctx == NULL) {
fnError("udfd conn malloc failed");
goto _exit;
}
ctx->pWorkList = NULL;
ctx->client = (uv_stream_t *)client;
ctx->inputBuf = 0;
@ -1356,9 +1408,11 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
taosMemoryFree(ctx);
taosMemoryFree(client);
}
} else {
uv_close((uv_handle_t *)client, NULL);
return;
}
_exit:
uv_close((uv_handle_t *)client, NULL);
taosMemoryFree(client);
}
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
@ -1411,6 +1465,10 @@ static int32_t udfdInitLog() {
/*
 * libuv allocation callback for the udfd control pipe.
 *
 * Allocates suggested_size bytes for the pending read. On success both
 * buf->base and buf->len are filled in; on allocation failure buf->base is
 * left NULL, buf->len is not written, and the error is logged.
 */
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  buf->base = taosMemoryMalloc(suggested_size);
  if (buf->base != NULL) {
    buf->len = suggested_size;
    return;
  }
  fnError("udfd ctrl pipe alloc buffer failed");
}
@ -1477,13 +1535,13 @@ static int32_t udfdGlobalDataInit() {
uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop == NULL) {
fnError("udfd init uv loop failed, mem overflow");
return -1;
return terrno;
}
global.loop = loop;
if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
fnError("udfd init script plugins mutex failed");
return -1;
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@ -1494,7 +1552,7 @@ static int32_t udfdGlobalDataInit() {
if (uv_mutex_init(&global.udfsMutex) != 0) {
fnError("udfd init udfs mutex failed");
return -2;
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
return 0;

View File

@ -34,7 +34,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
pOut->dbVgroup = taosMemoryCalloc(1, sizeof(SDBVgInfo));
if (NULL == pOut->dbVgroup) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
@ -509,7 +509,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize);
if (NULL == pTableMeta) {
qError("calloc size[%d] failed", metaSize);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize);
@ -764,7 +764,7 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
if(out == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
@ -785,7 +785,7 @@ int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) {
SViewMetaRsp *out = taosMemoryCalloc(1, sizeof(SViewMetaRsp));
if (out == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
if (tDeserializeSViewMetaRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSViewMetaRsp failed, msgSize:%d", msgSize);

View File

@ -904,9 +904,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
terrno = TSDB_CODE_SUCCESS;
SCL_ERR_JRET(OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC));
SCL_ERR_JRET(terrno);
_return:
_return:
sclFreeParamList(params, paramNum);
SCL_RET(code);
}

View File

@ -1208,20 +1208,28 @@ typedef struct UsingRegex {
regex_t pRegex;
int32_t lastUsedTime;
} UsingRegex;
// Pointer to one cached compiled-regex entry; lookups in the cache hash yield HashRegexPtr* handles.
typedef UsingRegex* HashRegexPtr;
// Process-wide state of the compiled-regex cache.
typedef struct RegexCache {
// Compiled patterns, looked up by the pattern string.
SHashObj *regexHash;
// Timer controller obtained from taosTmrInit().
void *regexCacheTmr;
// Handle of the periodic checkRegexCache() timer obtained from taosTmrStart().
void *timer;
// Read-locked by checkRegexCache(), write-locked by DestroyRegexCache().
SRWLatch mutex;
// Set by DestroyRegexCache(); checkRegexCache() then returns without re-arming the timer.
bool exit;
} RegexCache;
// The single, file-local regex cache instance.
static RegexCache sRegexCache;
// checkRegexCache() skips its cleanup pass while the hash holds fewer entries than this.
#define MAX_REGEX_CACHE_SIZE 20
// Period of the checkRegexCache() timer; multiplied by 1000 where it is passed to the timer API
// (presumably seconds converted to milliseconds -- confirm against taosTmrStart).
#define REGEX_CACHE_CLEAR_TIME 30
static void checkRegexCache(void* param, void* tmrId) {
int32_t code = 0;
taosRLockLatch(&sRegexCache.mutex);
if(sRegexCache.exit) {
goto _exit;
}
(void)taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId);
if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) {
return;
goto _exit;
}
if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) {
@ -1235,6 +1243,8 @@ static void checkRegexCache(void* param, void* tmrId) {
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
}
}
_exit:
taosRUnLockLatch(&sRegexCache.mutex);
}
void regexCacheFree(void *ppUsingRegex) {
@ -1246,30 +1256,35 @@ int32_t InitRegexCache() {
sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (sRegexCache.regexHash == NULL) {
uError("failed to create RegexCache");
return -1;
return terrno;
}
taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree);
sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE");
if (sRegexCache.regexCacheTmr == NULL) {
uError("failed to create regex cache check timer");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
return terrno;
}
sRegexCache.exit = false;
taosInitRWLatch(&sRegexCache.mutex);
sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr);
if (sRegexCache.timer == NULL) {
uError("failed to start regex cache timer");
return -1;
return terrno;
}
return 0;
return TSDB_CODE_SUCCESS;
}
/*
 * Shut down the process-wide regex cache.
 *
 * Stops the periodic cleanup timer, then, while holding the write latch (so it
 * cannot overlap a checkRegexCache() pass, which holds the read latch), flags
 * the cache as exiting and cleans up the hash table and the timer controller.
 */
void DestroyRegexCache() {
  uInfo("[regex cache] destory regex cache");
  (void)taosTmrStopA(&sRegexCache.timer);

  taosWLockLatch(&sRegexCache.mutex);
  sRegexCache.exit = true;  // checkRegexCache() tests this flag before touching the cache
  taosHashCleanup(sRegexCache.regexHash);
  taosTmrCleanUp(sRegexCache.regexCacheTmr);
  taosWUnLockLatch(&sRegexCache.mutex);
}
int32_t checkRegexPattern(const char *pPattern) {
@ -1290,18 +1305,17 @@ int32_t checkRegexPattern(const char *pPattern) {
return TSDB_CODE_SUCCESS;
}
static UsingRegex **getRegComp(const char *pPattern) {
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
int32_t getRegComp(const char *pPattern, HashRegexPtr **regexRet) {
HashRegexPtr* ppUsingRegex = (HashRegexPtr*)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
if (ppUsingRegex != NULL) {
(*ppUsingRegex)->lastUsedTime = taosGetTimestampSec();
return ppUsingRegex;
*regexRet = ppUsingRegex;
return TSDB_CODE_SUCCESS;
}
UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex));
if (pUsingRegex == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("Failed to Malloc when compile regex pattern %s.", pPattern);
return NULL;
return terrno;
}
int32_t cflags = REG_EXTENDED;
int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags);
@ -1310,8 +1324,7 @@ static UsingRegex **getRegComp(const char *pPattern) {
(void)regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf));
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
taosMemoryFree(pUsingRegex);
terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
return NULL;
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
while (true) {
@ -1319,8 +1332,9 @@ static UsingRegex **getRegComp(const char *pPattern) {
if (code != 0 && code != TSDB_CODE_DUP_KEY) {
regexCacheFree(&pUsingRegex);
uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern);
terrno = code;
return NULL;
return code;
} else if (code == TSDB_CODE_DUP_KEY) {
terrno = 0;
}
ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
if (ppUsingRegex) {
@ -1334,27 +1348,68 @@ static UsingRegex **getRegComp(const char *pPattern) {
}
}
pUsingRegex->lastUsedTime = taosGetTimestampSec();
return ppUsingRegex;
*regexRet = ppUsingRegex;
return TSDB_CODE_SUCCESS;
}
/*
 * Give back a cache handle previously obtained from getRegComp().
 *
 * regex: the handle returned through getRegComp()'s output parameter.
 */
void releaseRegComp(UsingRegex **regex) {
  SHashObj *pCacheHash = sRegexCache.regexHash;
  taosHashRelease(pCacheHash, regex);
}
// Per-thread one-entry cache of the most recently used pattern.
// Cache handle obtained from getRegComp(); released in DestoryThreadLocalRegComp().
static threadlocal UsingRegex ** ppUsingRegex;
// Compiled regex stored inside *ppUsingRegex.
static threadlocal regex_t * pRegex;
// Heap copy of the cached pattern string; NULL means nothing is cached for this thread.
static threadlocal char *pOldPattern = NULL;
/*
 * Drop the calling thread's cached regex, if any.
 *
 * Releases the cache handle, frees the saved pattern string and resets all
 * three thread-local variables. Does nothing when no pattern is cached.
 */
void DestoryThreadLocalRegComp() {
  if (NULL == pOldPattern) {
    return;
  }

  releaseRegComp(ppUsingRegex);
  taosMemoryFree(pOldPattern);

  ppUsingRegex = NULL;
  pRegex = NULL;
  pOldPattern = NULL;
}
/*
 * Return the compiled regex for pPattern through a one-entry per-thread cache.
 *
 * If the calling thread's cached pattern equals pPattern, the cached regex is
 * returned directly. Otherwise the previous entry is dropped and a handle for
 * pPattern is obtained from getRegComp(); the handle stays held by this thread
 * until the pattern changes or DestoryThreadLocalRegComp() is called.
 *
 * regex:    output, set to the compiled regex on success.
 * pPattern: NUL-terminated pattern string.
 * Returns 0 on success, otherwise the error code from getRegComp() or terrno
 * when the pattern copy cannot be allocated.
 */
int32_t threadGetRegComp(regex_t **regex, const char *pPattern) {
  if (NULL != pOldPattern) {
    if (strcmp(pOldPattern, pPattern) == 0) {
      *regex = pRegex;
      return 0;
    }
    DestoryThreadLocalRegComp();
  }

  HashRegexPtr *ppRegex = NULL;
  int32_t       code = getRegComp(pPattern, &ppRegex);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pOldPattern = taosStrdup(pPattern);
  if (NULL == pOldPattern) {
    uError("Failed to Malloc when compile regex pattern %s.", pPattern);
    // Capture the allocation error before any further call can overwrite terrno.
    code = terrno;
    // With pOldPattern == NULL, DestoryThreadLocalRegComp() will never release
    // this handle, so it has to be given back here.
    releaseRegComp(ppRegex);
    return code;
  }

  ppUsingRegex = ppRegex;
  pRegex = &((*ppUsingRegex)->pRegex);
  *regex = pRegex;
  return 0;
}
static int32_t doExecRegexMatch(const char *pString, const char *pPattern) {
int32_t ret = 0;
char msgbuf[256] = {0};
UsingRegex **pUsingRegex = getRegComp(pPattern);
if (pUsingRegex == NULL) {
return 1;
regex_t *regex = NULL;
ret = threadGetRegComp(&regex, pPattern);
if (ret != 0) {
return ret;
}
regmatch_t pmatch[1];
ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0);
releaseRegComp(pUsingRegex);
ret = regexec(regex, pString, 1, pmatch, 0);
if (ret != 0 && ret != REG_NOMATCH) {
terrno = TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
(void)regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf));
(void)regerror(ret, regex, msgbuf, sizeof(msgbuf));
uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf)
}
@ -1365,8 +1420,7 @@ int32_t comparestrRegexMatch(const void *pLeft, const void *pRight) {
size_t sz = varDataLen(pRight);
char *pattern = taosMemoryMalloc(sz + 1);
if (NULL == pattern) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return 1;
return 1; // terrno has been set
}
(void)memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
@ -1376,8 +1430,7 @@ int32_t comparestrRegexMatch(const void *pLeft, const void *pRight) {
char *str = taosMemoryMalloc(sz + 1);
if (NULL == str) {
taosMemoryFree(pattern);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return 1;
return 1; // terrno has been set
}
(void)memcpy(str, varDataVal(pLeft), sz);
@ -1395,14 +1448,13 @@ int32_t comparewcsRegexMatch(const void *pString, const void *pPattern) {
size_t len = varDataLen(pPattern);
char *pattern = taosMemoryMalloc(len + 1);
if (NULL == pattern) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return 1;
return 1; // terrno has been set
}
int convertLen = taosUcs4ToMbs((TdUcs4 *)varDataVal(pPattern), len, pattern);
if (convertLen < 0) {
taosMemoryFree(pattern);
return (terrno = TSDB_CODE_APP_ERROR);
return 1; // terrno has been set
}
pattern[convertLen] = 0;
@ -1411,15 +1463,14 @@ int32_t comparewcsRegexMatch(const void *pString, const void *pPattern) {
char *str = taosMemoryMalloc(len + 1);
if (NULL == str) {
taosMemoryFree(pattern);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return 1;
return 1; // terrno has been set
}
convertLen = taosUcs4ToMbs((TdUcs4 *)varDataVal(pString), len, str);
if (convertLen < 0) {
taosMemoryFree(str);
taosMemoryFree(pattern);
return (terrno = TSDB_CODE_APP_ERROR);
return 1; // terrno has been set
}
str[convertLen] = 0;

View File

@ -106,6 +106,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
}
destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp();
return NULL;
}
@ -237,6 +238,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1);
}
DestoryThreadLocalRegComp();
return NULL;
}
@ -664,6 +666,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
}
destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp();
return NULL;
}

View File

@ -119,6 +119,13 @@ add_test(
COMMAND bufferTest
)
add_executable(regexTest "regexTest.cpp")
target_link_libraries(regexTest os util gtest_main )
add_test(
NAME regexTest
COMMAND regexTest
)
#add_executable(decompressTest "decompressTest.cpp")
#target_link_libraries(decompressTest os util common gtest_main)
#add_test(

View File

@ -0,0 +1,344 @@
#include <chrono>
#include <cstdio>
#include <gtest/gtest.h>
#include <limits.h>
#include <taosdef.h>
#include "os.h"
#include "tutil.h"
#include "regex.h"
#include "osDef.h"
#include "tcompare.h"
// Prototypes of non-static helpers defined in the regex-compare implementation that are not
// exported through tcompare.h. The types must mirror the definitions there: HashRegexPtr is a
// pointer to UsingRegex, not a struct of its own.
extern "C" {
typedef struct UsingRegex UsingRegex;
typedef UsingRegex       *HashRegexPtr;
int32_t getRegComp(const char *pPattern, HashRegexPtr **regexRet);
int32_t threadGetRegComp(regex_t **regex, const char *pPattern);
}
// Fixture object with static storage duration: its constructor initialises the regex cache
// and its destructor tears it down, so every test in this file runs with a live cache.
class regexTest {
 public:
  regexTest() {
    (void)InitRegexCache();
  }

  ~regexTest() {
    (void)DestroyRegexCache();
  }
};

static regexTest test;
// Thread-local state of the baseline implementation (threadGetRegComp1) used for comparison:
// the compiled regex itself ...
static threadlocal regex_t pRegex;
// ... and a heap copy of the pattern it was compiled from; NULL means pRegex holds nothing.
static threadlocal char *pOldPattern = NULL;
// Release the baseline thread-local regex: frees the compiled regex and the saved pattern.
// No-op when nothing is cached.
void DestoryThreadLocalRegComp1() {
  if (NULL == pOldPattern) {
    return;
  }

  regfree(&pRegex);
  taosMemoryFree(pOldPattern);
  pOldPattern = NULL;
}
static regex_t *threadGetRegComp1(const char *pPattern) {
if (NULL != pOldPattern) {
if( strcmp(pOldPattern, pPattern) == 0) {
return &pRegex;
} else {
DestoryThreadLocalRegComp1();
}
}
pOldPattern = (char*)taosMemoryMalloc(strlen(pPattern) + 1);
if (NULL == pOldPattern) {
uError("Failed to Malloc when compile regex pattern %s.", pPattern);
return NULL;
}
strcpy(pOldPattern, pPattern);
int32_t cflags = REG_EXTENDED;
int32_t ret = regcomp(&pRegex, pPattern, cflags);
if (ret != 0) {
char msgbuf[256] = {0};
regerror(ret, &pRegex, msgbuf, tListLen(msgbuf));
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
DestoryThreadLocalRegComp1();
return NULL;
}
return &pRegex;
}
// Timing comparison for one plain pattern resolved `times` times through three paths:
//   current - getRegComp(), the shared hash cache;
//   before  - threadGetRegComp1(), thread-local cache calling regcomp() directly;
//   new     - threadGetRegComp(), thread-local cache in front of the hash cache.
// Only the getRegComp() return code is asserted; the timings are printed for inspection.
TEST(testCase, regexCacheTest1) {
  int  times = 100000;
  char s1[] = "abc";

  uint64_t t0 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    HashRegexPtr *ret = NULL;
    int32_t       code = getRegComp(s1, &ret);
    if (code != 0) {
      FAIL() << "Failed to compile regex pattern " << s1;
    }
  }
  uint64_t t1 = taosGetTimestampUs();
  printf("%s regex(current) %d times:%" PRIu64 " us.\n", s1, times, t1 - t0);

  uint64_t t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    (void)threadGetRegComp1(s1);
  }
  uint64_t t3 = taosGetTimestampUs();
  printf("%s regex(before) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2);

  t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    regex_t *rex = NULL;
    (void)threadGetRegComp(&rex, s1);
  }
  t3 = taosGetTimestampUs();
  printf("%s regex(new) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2);
}
// Same three-way timing comparison as regexCacheTest1 (current = getRegComp, before =
// threadGetRegComp1, new = threadGetRegComp), using a pattern that contains a repetition
// operator. Only the getRegComp() return code is asserted.
TEST(testCase, regexCacheTest2) {
  int  times = 100000;
  char s1[] = "abc%*";

  uint64_t t0 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    HashRegexPtr *ret = NULL;
    int32_t       code = getRegComp(s1, &ret);
    if (code != 0) {
      FAIL() << "Failed to compile regex pattern " << s1;
    }
  }
  uint64_t t1 = taosGetTimestampUs();
  printf("%s regex(current) %d times:%" PRIu64 " us.\n", s1, times, t1 - t0);

  uint64_t t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    (void)threadGetRegComp1(s1);
  }
  uint64_t t3 = taosGetTimestampUs();
  printf("%s regex(before) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2);

  t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    regex_t *rex = NULL;
    (void)threadGetRegComp(&rex, s1);
  }
  t3 = taosGetTimestampUs();
  printf("%s regex(new) %d times:%" PRIu64 " us.\n", s1, times, t3 - t2);
}
// Timing comparison when two patterns alternate on every iteration, which is the worst case
// for a one-entry thread-local cache. All three measured paths (current = getRegComp,
// before = threadGetRegComp1, new = threadGetRegComp) resolve s1 and then s2 per iteration.
TEST(testCase, regexCacheTest3) {
  int  times = 100000;
  char s1[] = "abc%*";
  char s2[] = "abc";

  uint64_t t0 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    HashRegexPtr *ret = NULL;
    int32_t       code = getRegComp(s1, &ret);
    if (code != 0) {
      FAIL() << "Failed to compile regex pattern " << s1;
    }
    // Alternate with s2 as the label says and as the other two loops do.
    ret = NULL;
    code = getRegComp(s2, &ret);
    if (code != 0) {
      FAIL() << "Failed to compile regex pattern " << s2;
    }
  }
  uint64_t t1 = taosGetTimestampUs();
  printf("'%s' and '%s' take place by turn regex(current) %d times:%" PRIu64 " us.\n", s1, s2, times, t1 - t0);

  uint64_t t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    (void)threadGetRegComp1(s1);
    (void)threadGetRegComp1(s2);
  }
  uint64_t t3 = taosGetTimestampUs();
  printf("'%s' and '%s' take place by turn regex(before) %d times:%" PRIu64 " us.\n", s1, s2, times, t3 - t2);

  t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    regex_t *rex = NULL;
    (void)threadGetRegComp(&rex, s1);
    (void)threadGetRegComp(&rex, s2);
  }
  t3 = taosGetTimestampUs();
  printf("'%s' and '%s' take place by turn regex(new) %d times:%" PRIu64 " us.\n", s1, s2, times, t3 - t2);
}
// Timing comparison when two patterns alternate in runs: each of the `times` rounds resolves
// s1 `count` times and then s2 `count` times, through each of the three paths
// (current = getRegComp, before = threadGetRegComp1, new = threadGetRegComp).
TEST(testCase, regexCacheTest4) {
  int  times = 100;
  int  count = 1000;
  char s1[] = "abc%*";
  char s2[] = "abc";

  uint64_t t0 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    for (int j = 0; j < count; ++j) {
      HashRegexPtr *ret = NULL;
      int32_t       code = getRegComp(s1, &ret);
      if (code != 0) {
        FAIL() << "Failed to compile regex pattern " << s1;
      }
    }
    for (int j = 0; j < count; ++j) {
      HashRegexPtr *ret = NULL;
      int32_t       code = getRegComp(s2, &ret);
      if (code != 0) {
        FAIL() << "Failed to compile regex pattern " << s2;
      }
    }
  }
  uint64_t t1 = taosGetTimestampUs();
  printf("'%s' and '%s' take place by turn(per %d count) regex(current) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t1 - t0);

  uint64_t t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    for (int j = 0; j < count; ++j) {
      (void)threadGetRegComp1(s1);
    }
    for (int j = 0; j < count; ++j) {
      (void)threadGetRegComp1(s2);
    }
  }
  uint64_t t3 = taosGetTimestampUs();
  printf("'%s' and '%s' take place by turn(per %d count) regex(before) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2);

  t2 = taosGetTimestampUs();
  for (int i = 0; i < times; i++) {
    for (int j = 0; j < count; ++j) {
      regex_t *rex = NULL;
      (void)threadGetRegComp(&rex, s1);
    }
    for (int j = 0; j < count; ++j) {
      regex_t *rex = NULL;
      (void)threadGetRegComp(&rex, s2);
    }
  }
  t3 = taosGetTimestampUs();
  printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2);
}
// The following test cases are disabled because they take a long time to run.
/*
TEST(testCase, regexCacheTest5) {
int times = 10000;
int count = 10000;
char s1[] = "abc%*";
char s2[] = "abc";
auto start = std::chrono::high_resolution_clock::now();
uint64_t t0 = taosGetTimestampUs();
for (int i = 0; i < times; i++) {
for (int j = 0; j < count; ++j) {
HashRegexPtr* ret = NULL;
int32_t code = getRegComp(s1, &ret);
if (code != 0) {
FAIL() << "Failed to compile regex pattern " << s1;
}
}
for (int j = 0; j < count; ++j) {
HashRegexPtr* ret = NULL;
int32_t code = getRegComp(s2, &ret);
if (code != 0) {
FAIL() << "Failed to compile regex pattern " << s2;
}
}
}
uint64_t t1 = taosGetTimestampUs();
printf("'%s' and '%s' take place by turn(per %d count) regex(current) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t1 - t0);
uint64_t t2 = taosGetTimestampUs();
for (int i = 0; i < times; i++) {
for (int j = 0; j < count; ++j) {
regex_t* rex = threadGetRegComp1(s1);
}
for (int j = 0; j < count; ++j) {
regex_t* rex = threadGetRegComp1(s2);
}
}
uint64_t t3 = taosGetTimestampUs();
printf("'%s' and '%s' take place by turn(per %d count) regex(before) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2);
t2 = taosGetTimestampUs();
for (int i = 0; i < times; i++) {
for (int j = 0; j < count; ++j) {
regex_t* rex = NULL;
(void)threadGetRegComp(&rex, s1);
}
for (int j = 0; j < count; ++j) {
regex_t* rex = NULL;
(void)threadGetRegComp(&rex, s2);
}
}
t3 = taosGetTimestampUs();
printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2);
}
TEST(testCase, regexCacheTest6) {
int times = 10000;
int count = 1000;
char s1[] = "abc%*";
char s2[] = "abc";
auto start = std::chrono::high_resolution_clock::now();
uint64_t t0 = taosGetTimestampUs();
for (int i = 0; i < times; i++) {
for (int j = 0; j < count; ++j) {
HashRegexPtr* ret = NULL;
int32_t code = getRegComp(s1, &ret);
if (code != 0) {
FAIL() << "Failed to compile regex pattern " << s1;
}
}
for (int j = 0; j < count; ++j) {
HashRegexPtr* ret = NULL;
int32_t code = getRegComp(s2, &ret);
if (code != 0) {
FAIL() << "Failed to compile regex pattern " << s2;
}
}
}
uint64_t t1 = taosGetTimestampUs();
printf("'%s' and '%s' take place by turn(per %d count) regex(current) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t1 - t0);
uint64_t t2 = taosGetTimestampUs();
for (int i = 0; i < times; i++) {
for (int j = 0; j < count; ++j) {
regex_t* rex = threadGetRegComp1(s1);
}
for (int j = 0; j < count; ++j) {
regex_t* rex = threadGetRegComp1(s2);
}
}
uint64_t t3 = taosGetTimestampUs();
printf("'%s' and '%s' take place by turn(per %d count) regex(before) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2);
t2 = taosGetTimestampUs();
for (int i = 0; i < times; i++) {
for (int j = 0; j < count; ++j) {
regex_t* rex = NULL;
(void)threadGetRegComp(&rex, s1);
}
for (int j = 0; j < count; ++j) {
regex_t* rex = NULL;
(void)threadGetRegComp(&rex, s2);
}
}
t3 = taosGetTimestampUs();
printf("'%s' and '%s' take place by turn(per %d count) regex(new) %d times:%" PRIu64 " us.\n", s1, s2, count, times, t3 - t2);
}
*/