diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 8fd5ba13fc..08b4c8fccb 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -626,6 +626,7 @@ bool nodesIsArithmeticOp(const SOperatorNode* pOp); bool nodesIsComparisonOp(const SOperatorNode* pOp); bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsRegularOp(const SOperatorNode* pOp); +bool nodesIsMatchRegularOp(const SOperatorNode* pOp); bool nodesIsBitwiseOp(const SOperatorNode* pOp); bool nodesExprHasColumn(SNode* pNode); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 64b9ec7aeb..900d3eedad 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -837,6 +837,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E) #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) #define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) +#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tcompare.h b/include/util/tcompare.h index 9694bee92d..80f992f646 100644 --- a/include/util/tcompare.h +++ b/include/util/tcompare.h @@ -45,7 +45,10 @@ typedef struct SPatternCompareInfo { TdUcs4 umatchOne; // unicode version matchOne } SPatternCompareInfo; +int32_t InitRegexCache(); +void DestroyRegexCache(); int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo); +int32_t checkRegexPattern(const char *pPattern); int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo); @@ -83,7 +86,6 @@ int32_t compareLenBinaryVal(const void *pLeft, const void *pRight); int32_t comparestrRegexMatch(const void *pLeft, const void *pRight); int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight); -void DestoryThreadLocalRegComp(); int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight); int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight); diff --git a/include/util/tutil.h b/include/util/tutil.h index d3003c27ba..d1fc11d0fe 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -160,14 +160,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) -#define TAOS_CHECK_GOTO(CODE, LINO, LABEL) \ - do { \ - if ((CODE) != TSDB_CODE_SUCCESS) { \ - if (LINO) { \ - *((int32_t *)(LINO)) = __LINE__; \ - } \ - goto LABEL; \ - } \ +#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ + do { \ + code = (CMD); \ + if (code != TSDB_CODE_SUCCESS) { \ + if (LINO) { \ + *((int32_t *)(LINO)) = __LINE__; \ + } \ + goto LABEL; \ + } \ } while (0) #ifdef __cplusplus diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 987d592158..15feaf1761 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -18,6 +18,7 @@ #include "audit.h" #include "libs/function/tudf.h" #include "tgrant.h" +#include "tcompare.h" #define DM_INIT_AUDIT() \ do { \ @@ -213,6 +214,7 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); + DestroyRegexCache(); #if defined(USE_S3) s3End(); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ce410f76be..c306cf2c7a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2203,6 +2203,17 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) { return false; } +bool nodesIsMatchRegularOp(const SOperatorNode* pOp) { + switch (pOp->opType) { + case OP_TYPE_MATCH: + case OP_TYPE_NMATCH: + return true; + default: + break; + } + return false; +} + bool nodesIsBitwiseOp(const SOperatorNode* pOp) { switch (pOp->opType) { case OP_TYPE_BIT_AND: diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index d67c7d306f..e6b6bcc903 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -223,7 +223,9 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Tag name:%s duplicated"; case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC: return "Some functions cannot appear in the select list at the same time"; - default: + case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR: + return "Syntax error in regular expression"; + default: return "Unknown error"; } } diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 17b2849684..b7a4b718e2 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int if (pEpSet) { contLen = tSerializeSEpSet(NULL, 0, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet failed, code:%x", terrno); + return terrno; + } rsp = rpcMallocCont(contLen); - tSerializeSEpSet(rsp, contLen, pEpSet); + if (NULL == rsp) { + qError("rpcMallocCont %d failed, code:%x", contLen, terrno); + return terrno; + } + + contLen = tSerializeSEpSet(rsp, contLen, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet second failed, code:%x", terrno); + return terrno; + } } SRpcMsg rpcRsp = { @@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { epSet.eps[2].port = 7300; ctx->phase = QW_PHASE_POST_QUERY; - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error *rsped = true; return; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 473dd41228..9f4a540f3c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -166,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } memset(pRsp, 0, sizeof(SRetrieveTableRsp)); dataLength = 0; } @@ -187,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve #if 0 int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -203,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -428,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SSubQueryMsg msg = {0}; if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { @@ -442,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t eId = msg.execId; QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); - qwAbortPrerocessQuery(QW_FPARAMS()); - QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + code = qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code); tFreeSSubQueryMsg(&msg); @@ -458,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); SSubQueryMsg msg = {0}; @@ -500,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in SQWTaskCtx *handles = NULL; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -533,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SResFetchReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) { @@ -551,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); + int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processFetch end, node:%p", node); + QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -561,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (mgmt) { - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); } @@ -580,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -621,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); STaskDropReq msg = {0}; @@ -644,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); + code = qwProcessDrop(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processDrop end, node:%p", node); + QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -659,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1); STaskNotifyReq msg = {0}; @@ -678,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg)); + code = qwProcessNotify(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processNotify end, node:%p", node); + QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -695,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ SSchedulerHbReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); if (NULL == pMsg->pCont) { @@ -717,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); + code = qwProcessHb(mgmt, &qwMsg, &req); - QW_SCH_DLOG("processHb end, node:%p", node); + QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -735,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); - tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); + QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req)); uint64_t sId = req.sId; uint64_t qId = req.queryId; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 06559093a8..0451532cbe 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) { int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + int32_t code = TSDB_CODE_SUCCESS; qTaskInfo_t taskHandle = ctx->taskHandle; ctx->explainRsped = true; SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); - QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); + if (NULL == execInfoList) { + QW_ERR_JRET(terrno); + } + + QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList)); if (ctx->localExec) { SExplainLocalRsp localRsp = {0}; localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + if (NULL == pExec) { + QW_ERR_JRET(terrno); + } memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); localRsp.rsp.subplanInfo = pExec; localRsp.qId = qId; localRsp.tId = tId; localRsp.rId = rId; localRsp.eId = eId; - taosArrayPush(ctx->explainRes, &localRsp); + if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) { + QW_ERR_JRET(terrno); + } + taosArrayDestroy(execInfoList); + execInfoList = NULL; } else { SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); taosArrayDestroyEx(execInfoList, freeExplainExecItem); + execInfoList = NULL; + QW_ERR_RET(code); } - return TSDB_CODE_SUCCESS; +_return: + + taosArrayDestroyEx(execInfoList, freeExplainExecItem); + + return code; } @@ -544,7 +562,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { - taosCloseRef(gQwMgmt.qwRef); + (void)taosCloseRef(gQwMgmt.qwRef); // ignore error gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); @@ -561,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) { int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); - taosTmrStop(mgmt->hbTimer); + (void)taosTmrStop(mgmt->hbTimer); //ignore error mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); @@ -652,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return pStat->num ? (pStat->total / pStat->num) : 0; default: qError("unsupported queue type %d", type); + break; } return -1; } void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { + int32_t code = TSDB_CODE_SUCCESS; int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { uint64_t *sId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; - if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { + if (NULL == sId) { + qError("get the %dth sch failed, code:%x", i, terrno); + break; + } + + code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch); + if (TSDB_CODE_SUCCESS != code) { + qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code); continue; } if (taosHashGetSize(pSch->tasksHash) <= 0) { qwDestroySchStatus(pSch); - taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); - qDebug("sch %" PRIx64 " destroyed", *sId); + code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); + qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code); } qwReleaseScheduler(QW_WRITE, mgmt); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 084ee7efe3..5840cc0245 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1372,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S _return: if (mgmt->refId >= 0) { - qwRelease(mgmt->refId); // ignore error + (void)qwRelease(mgmt->refId); // ignore error } else { taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->ctxHash); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 4a0d74a6e3..d292b271c5 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -130,30 +130,32 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { fetchRpc->contLen = sizeof(SResFetchReq); } -void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { +int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropMsg->sId = 1; dropMsg->queryId = atomic_load_64(&qwtTestQueryId); dropMsg->taskId = 1; int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg); if (msgSize < 0) { - return; + return terrno; } char *msg = (char*)taosMemoryCalloc(1, msgSize); if (NULL == msg) { - return; + return terrno; } if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) { taosMemoryFree(msg); - return; + return terrno; } dropRpc->msgType = TDMT_SCH_DROP_TASK; dropRpc->pCont = msg; dropRpc->contLen = msgSize; + + return TSDB_CODE_SUCCESS; } int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { @@ -164,6 +166,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestFetchQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { @@ -178,7 +184,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestFetchQueueLock); - tsem_post(&qwtTestFetchSem); + if (tsem_post(&qwtTestFetchSem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } @@ -186,6 +195,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestQueryQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { @@ -200,22 +213,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestQueryQueueLock); - tsem_post(&qwtTestQuerySem); + if (tsem_post(&qwtTestQuerySem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {} -void qwtRpcSendResponse(const SRpcMsg *pRsp) { +int qwtRpcSendResponse(const SRpcMsg *pRsp) { + int32_t code = 0; switch (pRsp->msgType) { case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; if (pRsp->code) { - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } } rpcFreeCont(rsp); @@ -227,13 +252,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); - return; + return code; } - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); break; @@ -245,9 +282,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtTestCaseFinished = true; break; } + default: + break; } - return; + return code; } int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo, @@ -292,6 +331,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { if (endExec) { *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (NULL == *pRes) { + return terrno; + } (*pRes)->info.rows = taosRand() % 1000 + 1; } else { *pRes = NULL; @@ -631,7 +673,7 @@ void *queryThread(void *param) { while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -653,7 +695,7 @@ void *fetchThread(void *param) { while (!qwtTestStop) { qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -674,8 +716,11 @@ void *dropThread(void *param) { STaskDropReq dropMsg = {0}; while (!qwtTestStop) { - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) { + break; + } + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error + if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -700,7 +745,7 @@ void *qwtclientThread(void *param) { qwtTestCaseFinished = false; qwtBuildQueryReqMsg(&queryRpc); - qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); + (void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error while (!qwtTestCaseFinished) { taosUsleep(1); @@ -752,9 +797,9 @@ void *queryQueueThread(void *param) { } if (TDMT_SCH_QUERY == queryRpc->msgType) { - qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) { - qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); @@ -810,16 +855,16 @@ void *fetchQueueThread(void *param) { switch (fetchRpc->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: - qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_CANCEL_TASK: //qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_SCH_DROP_TASK: - qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_TASK_NOTIFY: - qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); @@ -853,7 +898,7 @@ TEST(seqTest, normalCase) { qwtBuildQueryReqMsg(&queryRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -898,7 +943,7 @@ TEST(seqTest, cancelFirst) { qwtInitLogFile(); qwtBuildQueryReqMsg(&queryRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -954,7 +999,7 @@ TEST(seqTest, randCase) { if (r >= 0 && r < maxr / 5) { printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error } else if (r >= maxr / 5 && r < maxr * 2 / 5) { // printf("Ready,%d\n", t++); // qwtBuildReadyReqMsg(&readyMsg, &readyRpc); @@ -965,14 +1010,14 @@ TEST(seqTest, randCase) { } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) { printf("Drop,%d\n", t++); - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + (void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } @@ -1018,14 +1063,14 @@ TEST(seqTest, multithreadRand) { ASSERT_EQ(code, 0); TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5, t6; - taosThreadCreate(&(t1), &thattr, queryThread, mgmt); - // taosThreadCreate(&(t2), &thattr, readyThread, NULL); - taosThreadCreate(&(t3), &thattr, fetchThread, NULL); - taosThreadCreate(&(t4), &thattr, dropThread, NULL); - taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error + // (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error + (void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error + (void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1083,16 +1128,16 @@ TEST(rcTest, shortExecshortDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1114,8 +1159,8 @@ TEST(rcTest, shortExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1166,16 +1211,16 @@ TEST(rcTest, longExecshortDelay) { qwtTestMaxExecTaskUsec = 1000000; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1197,8 +1242,8 @@ TEST(rcTest, longExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1249,16 +1294,16 @@ TEST(rcTest, shortExeclongDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 1000000; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1280,8 +1325,8 @@ TEST(rcTest, shortExeclongDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1327,16 +1372,16 @@ TEST(rcTest, dropTest) { code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 50de5e760d..8f9ea4d36c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1654,6 +1654,12 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { return TSDB_CODE_TSC_INVALID_OPERATION; } + if (nodesIsMatchRegularOp(pOp)) { + SValueNode* node = (SValueNode*)(pOp->pRight); + if(checkRegexPattern(node->literal) != TSDB_CODE_SUCCESS){ + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + } } pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index c5789a65ca..16c06c2452 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1675,10 +1675,8 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa pRes[i] = false; continue; } - char *pLeftData = colDataGetData(pLeft->columnData, leftIndex); char *pRightData = colDataGetData(pRight->columnData, rightIndex); - pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData); if (pRes[i]) { ++num; @@ -1714,7 +1712,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa if (!pLeftData || !pRightData) { result = false; } - if (!result) { colDataSetInt8(pOut->columnData, i, (int8_t *)&result); } else { diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index fe86e18ce3..dd88344962 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -43,6 +43,7 @@ #include "tglobal.h" #include "tlog.h" #include "tvariant.h" +#include "tcompare.h" #define _DEBUG_PRINT_ 0 @@ -52,6 +53,12 @@ #define PRINTF(...) #endif +class constantTest { + public: + constantTest() { InitRegexCache(); } + ~constantTest() { DestroyRegexCache(); } +}; +static constantTest test; namespace { SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) { diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 26122a4a29..09599cead4 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -24,6 +24,7 @@ #include "tutil.h" #include "types.h" #include "osString.h" +#include "ttimer.h" int32_t setChkInBytes1(const void *pLeft, const void *pRight) { return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0; @@ -1203,54 +1204,153 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) { return comparestrRegexMatch(pLeft, pRight) ? 0 : 1; } -static threadlocal regex_t pRegex; -static threadlocal char *pOldPattern = NULL; -static regex_t *threadGetRegComp(const char *pPattern) { - if (NULL != pOldPattern) { - if( strcmp(pOldPattern, pPattern) == 0) { - return &pRegex; - } else { - DestoryThreadLocalRegComp(); +typedef struct UsingRegex { + regex_t pRegex; + int32_t lastUsedTime; +} UsingRegex; + +typedef struct RegexCache { + SHashObj *regexHash; + void *regexCacheTmr; + void *timer; +} RegexCache; +static RegexCache sRegexCache; +#define MAX_REGEX_CACHE_SIZE 20 +#define REGEX_CACHE_CLEAR_TIME 30 + +static void checkRegexCache(void* param, void* tmrId) { + taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId); + if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { + return; + } + + if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) { + UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); + while ((ppUsingRegex != NULL)) { + if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { + size_t len = 0; + char* key = (char*)taosHashGetKey(ppUsingRegex, &len); + taosHashRemove(sRegexCache.regexHash, key, len); + } + ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } - pOldPattern = taosMemoryMalloc(strlen(pPattern) + 1); - if (NULL == pOldPattern) { +} + +void regexCacheFree(void *ppUsingRegex) { + regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex); + taosMemoryFree(*(UsingRegex **)ppUsingRegex); +} + +int32_t InitRegexCache() { + sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (sRegexCache.regexHash == NULL) { + uError("failed to create RegexCache"); + return -1; + } + taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); + sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE"); + if (sRegexCache.regexCacheTmr == NULL) { + uError("failed to create regex cache check timer"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr); + if (sRegexCache.timer == NULL) { + uError("failed to start regex cache timer"); + return -1; + } + + return 0; +} + +void DestroyRegexCache(){ + uInfo("[regex cache] destory regex cache"); + taosTmrStopA(&sRegexCache.timer); + taosHashCleanup(sRegexCache.regexHash); + taosTmrCleanUp(sRegexCache.regexCacheTmr); +} + +int32_t checkRegexPattern(const char *pPattern) { + if (pPattern == NULL) { + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + + regex_t regex; + int32_t cflags = REG_EXTENDED; + int32_t ret = regcomp(®ex, pPattern, cflags); + if (ret != 0) { + char msgbuf[256] = {0}; + regerror(ret, ®ex, msgbuf, tListLen(msgbuf)); + uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + regfree(®ex); + return TSDB_CODE_SUCCESS; +} + +static UsingRegex **getRegComp(const char *pPattern) { + UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex != NULL) { + (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; + } + + UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); + if (pUsingRegex == NULL) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); return NULL; } - strcpy(pOldPattern, pPattern); int32_t cflags = REG_EXTENDED; - int32_t ret = regcomp(&pRegex, pPattern, cflags); + int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags); if (ret != 0) { char msgbuf[256] = {0}; - regerror(ret, &pRegex, msgbuf, tListLen(msgbuf)); + regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf)); uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); - DestoryThreadLocalRegComp(); + taosMemoryFree(pUsingRegex); return NULL; } - return &pRegex; + + while (true) { + int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); + if (code != 0 && code != TSDB_CODE_DUP_KEY) { + regexCacheFree(&pUsingRegex); + uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); + return NULL; + } + ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex) { + if (*ppUsingRegex != pUsingRegex) { + regexCacheFree(&pUsingRegex); + } + pUsingRegex = (*ppUsingRegex); + break; + } else { + continue; + } + } + pUsingRegex->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; } -void DestoryThreadLocalRegComp() { - if (NULL != pOldPattern) { - regfree(&pRegex); - taosMemoryFree(pOldPattern); - pOldPattern = NULL; - } +void releaseRegComp(UsingRegex **regex){ + taosHashRelease(sRegexCache.regexHash, regex); } static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { int32_t ret = 0; char msgbuf[256] = {0}; - regex_t *regex = threadGetRegComp(pPattern); - if (regex == NULL) { + UsingRegex **pUsingRegex = getRegComp(pPattern); + if (pUsingRegex == NULL) { return 1; } regmatch_t pmatch[1]; - ret = regexec(regex, pString, 1, pmatch, 0); + ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0); + releaseRegComp(pUsingRegex); if (ret != 0 && ret != REG_NOMATCH) { - regerror(ret, regex, msgbuf, sizeof(msgbuf)); + regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf)); uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0f5ad73f61..347f0be4ff 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -684,6 +684,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 4a8a0823b7..7a97dc3527 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -104,7 +104,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { } destroyThreadLocalGeosCtx(); - DestoryThreadLocalRegComp(); return NULL; } @@ -224,7 +223,6 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } - DestoryThreadLocalRegComp(); return NULL; } @@ -636,7 +634,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } destroyThreadLocalGeosCtx(); - DestoryThreadLocalRegComp(); return NULL; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 69709a201a..848917ceb5 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -193,6 +193,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 3 diff --git a/tests/system-test/2-query/match.py b/tests/system-test/2-query/match.py new file mode 100644 index 0000000000..cd2ed5d96b --- /dev/null +++ b/tests/system-test/2-query/match.py @@ -0,0 +1,141 @@ +import taos +import sys +import datetime +import inspect +import threading +import time + +from util.log import * +from util.sql import * +from util.cases import * +from util.common import tdCom + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def initConnection(self): + self.records = 10000000 + self.numOfTherads = 50 + self.ts = 1537146000000 + self.host = "127.0.0.1" + self.user = "root" + self.password = "taosdata" + self.config = "/home/xp/git/TDengine/sim/dnode1/cfg" + self.conn = taos.connect( + self.host, + self.user, + self.password, + self.config) + + def initDB(self): + tdSql.execute("drop database if exists db") + tdSql.execute("create database if not exists db") + + def stopTest(self): + tdSql.execute("drop database if exists db") + + def threadTest(self, threadID): + print(f"Thread {threadID} starting...") + tdsqln = tdCom.newTdSql() + for i in range(2, 50): + tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}dx'") + tdsqln.checkRows(0) + for i in range(100): + tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdsqln.checkRows(2) + + tdsqln.query("select * from db.t1x") + tdsqln.checkRows(5) + + tdsqln.query("select * from db.t1x where c1 match '_c'") + tdsqln.checkRows(2) + + tdsqln.query("select * from db.t1x where c1 match '%__c'") + tdsqln.checkRows(0) + + tdsqln.error("select * from db.t1x where c1 match '*d'") + + print(f"Thread {threadID} finished.") + + def match_test(self): + tdSql.execute("create table db.t1x (ts timestamp, c1 varchar(100))") + tdSql.execute("create table db.t_1x (ts timestamp, c1 varchar(100))") + + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdSql.checkRows(2) + for i in range(2, 50): + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") + tdSql.checkRows(0) + + tdSql.error("select * from db.t1x where c1 match '*d'") + tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')") + + tdSql.query("select * from db.t1x") + tdSql.checkRows(5) + + tdSql.query("select * from db.t1x where c1 match '_c'") + tdSql.checkRows(2) + + tdSql.query("select * from db.t1x where c1 match '%__c'") + tdSql.checkRows(0) + tdSql.error("select * from db.t1x where c1 match '*d'") + threads = [] + for i in range(10): + t = threading.Thread(target=self.threadTest, args=(i,)) + threads.append(t) + t.start() + + time.sleep(31) + + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdSql.checkRows(2) + for i in range(2, 50): + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") + tdSql.checkRows(0) + + tdSql.query("select * from db.t1x") + tdSql.checkRows(5) + + tdSql.query("select * from db.t1x where c1 match '_c'") + tdSql.checkRows(2) + + tdSql.query("select * from db.t1x where c1 match '%__c'") + tdSql.checkRows(0) + + tdSql.execute("create table db.t3x (ts timestamp, c1 varchar(100))") + + tdSql.execute("insert into db.t3x values(now, '我是中文'), (now+1s, '我是_中文'), (now+2s, '我是%中文'), (now+3s, '%中文'),(now+4s, '_中文')") + tdSql.query("select * from db.t3x where c1 match '%中文'") + tdSql.checkRows(2) + tdSql.query("select * from db.t3x where c1 match '中文'") + tdSql.checkRows(5) + tdSql.error("select * from db.t1x where c1 match '*d'") + + for thread in threads: + print(f"Thread waitting for finish...") + thread.join() + + print(f"Mutithread test finished.") + + def run(self): + tdLog.printNoPrefix("==========start match_test run ...............") + tdSql.prepare(replica = self.replicaVar) + + self.initConnection() + + self.initDB() + self.match_test() + + self.stopTest() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 96f9452827..cdc4e27f20 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -104,6 +104,10 @@ python3 ./test.py -f 2-query/like.py python3 ./test.py -f 2-query/like.py -Q 2 python3 ./test.py -f 2-query/like.py -Q 3 python3 ./test.py -f 2-query/like.py -Q 4 +python3 ./test.py -f 2-query/match.py +python3 ./test.py -f 2-query/match.py -Q 2 +python3 ./test.py -f 2-query/match.py -Q 3 +python3 ./test.py -f 2-query/match.py -Q 4 python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False