Merge remote-tracking branch 'origin/3.0' into fix/addErrorToMgmt

This commit is contained in:
Yihao Deng 2024-07-16 01:07:13 +00:00
commit 87e114a2d0
21 changed files with 511 additions and 139 deletions

View File

@ -626,6 +626,7 @@ bool nodesIsArithmeticOp(const SOperatorNode* pOp);
bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp);
bool nodesIsMatchRegularOp(const SOperatorNode* pOp);
bool nodesIsBitwiseOp(const SOperatorNode* pOp);
bool nodesExprHasColumn(SNode* pNode);

View File

@ -837,6 +837,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E)
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -45,7 +45,10 @@ typedef struct SPatternCompareInfo {
TdUcs4 umatchOne; // unicode version matchOne
} SPatternCompareInfo;
int32_t InitRegexCache();
void DestroyRegexCache();
int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo);
int32_t checkRegexPattern(const char *pPattern);
int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo);
@ -83,7 +86,6 @@ int32_t compareLenBinaryVal(const void *pLeft, const void *pRight);
int32_t comparestrRegexMatch(const void *pLeft, const void *pRight);
int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight);
void DestoryThreadLocalRegComp();
int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight);
int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight);

View File

@ -160,14 +160,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define TAOS_CHECK_GOTO(CODE, LINO, LABEL) \
do { \
if ((CODE) != TSDB_CODE_SUCCESS) { \
if (LINO) { \
*((int32_t *)(LINO)) = __LINE__; \
} \
goto LABEL; \
} \
#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \
do { \
code = (CMD); \
if (code != TSDB_CODE_SUCCESS) { \
if (LINO) { \
*((int32_t *)(LINO)) = __LINE__; \
} \
goto LABEL; \
} \
} while (0)
#ifdef __cplusplus

View File

@ -18,6 +18,7 @@
#include "audit.h"
#include "libs/function/tudf.h"
#include "tgrant.h"
#include "tcompare.h"
#define DM_INIT_AUDIT() \
do { \
@ -213,6 +214,7 @@ void dmCleanup() {
udfStopUdfd();
taosStopCacheRefreshWorker();
dmDiskClose();
DestroyRegexCache();
#if defined(USE_S3)
s3End();

View File

@ -2203,6 +2203,17 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) {
return false;
}
bool nodesIsMatchRegularOp(const SOperatorNode* pOp) {
switch (pOp->opType) {
case OP_TYPE_MATCH:
case OP_TYPE_NMATCH:
return true;
default:
break;
}
return false;
}
bool nodesIsBitwiseOp(const SOperatorNode* pOp) {
switch (pOp->opType) {
case OP_TYPE_BIT_AND:

View File

@ -223,7 +223,9 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Tag name:%s duplicated";
case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC:
return "Some functions cannot appear in the select list at the same time";
default:
case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR:
return "Syntax error in regular expression";
default:
return "Unknown error";
}
}

View File

@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
if (pEpSet) {
contLen = tSerializeSEpSet(NULL, 0, pEpSet);
if (contLen < 0) {
qError("tSerializeSEpSet failed, code:%x", terrno);
return terrno;
}
rsp = rpcMallocCont(contLen);
tSerializeSEpSet(rsp, contLen, pEpSet);
if (NULL == rsp) {
qError("rpcMallocCont %d failed, code:%x", contLen, terrno);
return terrno;
}
contLen = tSerializeSEpSet(rsp, contLen, pEpSet);
if (contLen < 0) {
qError("tSerializeSEpSet second failed, code:%x", terrno);
return terrno;
}
}
SRpcMsg rpcRsp = {
@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
epSet.eps[2].port = 7300;
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error
*rsped = true;
return;
}
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
*rsped = true;
return;
}
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
*rsped = true;
return;
}

View File

@ -166,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
if (NULL == pRsp) {
QW_RET(terrno);
}
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
dataLength = 0;
}
@ -187,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
#if 0
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
if (NULL == pRsp) {
QW_RET(terrno);
}
pRsp->code = code;
SRpcMsg rpcRsp = {
@ -203,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
if (NULL == pRsp) {
QW_RET(terrno);
}
pRsp->code = code;
SRpcMsg rpcRsp = {
@ -428,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
SSubQueryMsg msg = {0};
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
@ -442,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t eId = msg.execId;
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
qwAbortPrerocessQuery(QW_FPARAMS());
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);
code = qwAbortPrerocessQuery(QW_FPARAMS());
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);
tFreeSSubQueryMsg(&msg);
@ -458,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
SSubQueryMsg msg = {0};
@ -500,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
SQWTaskCtx *handles = NULL;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
@ -533,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SResFetchReq req = {0};
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
@ -551,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS;
}
@ -561,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
if (mgmt) {
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
}
@ -580,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
@ -621,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
STaskDropReq msg = {0};
@ -644,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
code = qwProcessDrop(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS;
}
@ -659,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1);
STaskNotifyReq msg = {0};
@ -678,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg));
code = qwProcessNotify(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processNotify end, node:%p", node);
QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS;
}
@ -695,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
SSchedulerHbReq req = {0};
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
if (NULL == pMsg->pCont) {
@ -717,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
code = qwProcessHb(mgmt, &qwMsg, &req);
QW_SCH_DLOG("processHb end, node:%p", node);
QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS;
}
@ -735,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req));
uint64_t sId = req.sId;
uint64_t qId = req.queryId;

View File

@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) {
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = TSDB_CODE_SUCCESS;
qTaskInfo_t taskHandle = ctx->taskHandle;
ctx->explainRsped = true;
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
if (NULL == execInfoList) {
QW_ERR_JRET(terrno);
}
QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList));
if (ctx->localExec) {
SExplainLocalRsp localRsp = {0};
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
if (NULL == pExec) {
QW_ERR_JRET(terrno);
}
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
localRsp.rsp.subplanInfo = pExec;
localRsp.qId = qId;
localRsp.tId = tId;
localRsp.rId = rId;
localRsp.eId = eId;
taosArrayPush(ctx->explainRes, &localRsp);
if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) {
QW_ERR_JRET(terrno);
}
taosArrayDestroy(execInfoList);
execInfoList = NULL;
} else {
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
execInfoList = NULL;
QW_ERR_RET(code);
}
return TSDB_CODE_SUCCESS;
_return:
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
return code;
}
@ -544,7 +562,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
void qwCloseRef(void) {
taosWLockLatch(&gQwMgmt.lock);
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
taosCloseRef(gQwMgmt.qwRef);
(void)taosCloseRef(gQwMgmt.qwRef); // ignore error
gQwMgmt.qwRef = -1;
}
taosWUnLockLatch(&gQwMgmt.lock);
@ -561,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) {
int32_t schStatusCount = 0;
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
taosTmrStop(mgmt->hbTimer);
(void)taosTmrStop(mgmt->hbTimer); //ignore error
mgmt->hbTimer = NULL;
taosTmrCleanUp(mgmt->timer);
@ -652,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
return pStat->num ? (pStat->total / pStat->num) : 0;
default:
qError("unsupported queue type %d", type);
break;
}
return -1;
}
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL;
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
if (NULL == sId) {
qError("get the %dth sch failed, code:%x", i, terrno);
break;
}
code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch);
if (TSDB_CODE_SUCCESS != code) {
qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code);
continue;
}
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qDebug("sch %" PRIx64 " destroyed", *sId);
code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code);
}
qwReleaseScheduler(QW_WRITE, mgmt);

View File

@ -1372,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
_return:
if (mgmt->refId >= 0) {
qwRelease(mgmt->refId); // ignore error
(void)qwRelease(mgmt->refId); // ignore error
} else {
taosHashCleanup(mgmt->schHash);
taosHashCleanup(mgmt->ctxHash);

View File

@ -130,30 +130,32 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchRpc->contLen = sizeof(SResFetchReq);
}
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = 1;
dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
dropMsg->taskId = 1;
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
if (msgSize < 0) {
return;
return terrno;
}
char *msg = (char*)taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return;
return terrno;
}
if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
taosMemoryFree(msg);
return;
return terrno;
}
dropRpc->msgType = TDMT_SCH_DROP_TASK;
dropRpc->pCont = msg;
dropRpc->contLen = msgSize;
return TSDB_CODE_SUCCESS;
}
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
@ -164,6 +166,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestFetchQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
if (NULL == newMsg) {
printf("malloc failed");
assert(0);
}
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
@ -178,7 +184,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
}
taosWUnLockLatch(&qwtTestFetchQueueLock);
tsem_post(&qwtTestFetchSem);
if (tsem_post(&qwtTestFetchSem) < 0) {
printf("tsem_post failed, errno:%d", errno);
assert(0);
}
return 0;
}
@ -186,6 +195,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestQueryQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
if (NULL == newMsg) {
printf("malloc failed");
assert(0);
}
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
@ -200,22 +213,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
}
taosWUnLockLatch(&qwtTestQueryQueueLock);
tsem_post(&qwtTestQuerySem);
if (tsem_post(&qwtTestQuerySem) < 0) {
printf("tsem_post failed, errno:%d", errno);
assert(0);
}
return 0;
}
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
int qwtRpcSendResponse(const SRpcMsg *pRsp) {
int32_t code = 0;
switch (pRsp->msgType) {
case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
if (pRsp->code) {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
}
rpcFreeCont(rsp);
@ -227,13 +252,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (0 == pRsp->code && 0 == rsp->completed) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
if (code) {
assert(0);
return code;
}
rpcFreeCont(rsp);
return;
return code;
}
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
rpcFreeCont(rsp);
break;
@ -245,9 +282,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
qwtTestCaseFinished = true;
break;
}
default:
break;
}
return;
return code;
}
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
@ -292,6 +331,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
if (endExec) {
*pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
if (NULL == *pRes) {
return terrno;
}
(*pRes)->info.rows = taosRand() % 1000 + 1;
} else {
*pRes = NULL;
@ -631,7 +673,7 @@ void *queryThread(void *param) {
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error
if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5);
}
@ -653,7 +695,7 @@ void *fetchThread(void *param) {
while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error
if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5);
}
@ -674,8 +716,11 @@ void *dropThread(void *param) {
STaskDropReq dropMsg = {0};
while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) {
break;
}
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error
if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5);
}
@ -700,7 +745,7 @@ void *qwtclientThread(void *param) {
qwtTestCaseFinished = false;
qwtBuildQueryReqMsg(&queryRpc);
qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc);
(void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error
while (!qwtTestCaseFinished) {
taosUsleep(1);
@ -752,9 +797,9 @@ void *queryQueueThread(void *param) {
}
if (TDMT_SCH_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
(void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
} else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0);
@ -810,16 +855,16 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) {
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break;
case TDMT_SCH_CANCEL_TASK:
//qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_SCH_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
(void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break;
case TDMT_SCH_TASK_NOTIFY:
qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0);
(void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break;
default:
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
@ -853,7 +898,7 @@ TEST(seqTest, normalCase) {
qwtBuildQueryReqMsg(&queryRpc);
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
(void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
stubSetStringToPlan();
stubSetRpcSendResponse();
@ -898,7 +943,7 @@ TEST(seqTest, cancelFirst) {
qwtInitLogFile();
qwtBuildQueryReqMsg(&queryRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
(void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
stubSetStringToPlan();
stubSetRpcSendResponse();
@ -954,7 +999,7 @@ TEST(seqTest, randCase) {
if (r >= 0 && r < maxr / 5) {
printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error
} else if (r >= maxr / 5 && r < maxr * 2 / 5) {
// printf("Ready,%d\n", t++);
// qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
@ -965,14 +1010,14 @@ TEST(seqTest, randCase) {
} else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error
if (qwtTestEnableSleep) {
taosUsleep(1);
}
} else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
(void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error
if (qwtTestEnableSleep) {
taosUsleep(1);
}
@ -1018,14 +1063,14 @@ TEST(seqTest, multithreadRand) {
ASSERT_EQ(code, 0);
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5, t6;
taosThreadCreate(&(t1), &thattr, queryThread, mgmt);
// taosThreadCreate(&(t2), &thattr, readyThread, NULL);
taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
taosThreadCreate(&(t4), &thattr, dropThread, NULL);
taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error
// (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error
(void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error
(void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1083,16 +1128,16 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1114,8 +1159,8 @@ TEST(rcTest, shortExecshortDelay) {
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
(void)tsem_post(&qwtTestQuerySem); //ignore error
(void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10);
}
@ -1166,16 +1211,16 @@ TEST(rcTest, longExecshortDelay) {
qwtTestMaxExecTaskUsec = 1000000;
qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1197,8 +1242,8 @@ TEST(rcTest, longExecshortDelay) {
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
(void)tsem_post(&qwtTestQuerySem); //ignore error
(void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10);
}
@ -1249,16 +1294,16 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 1000000;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1280,8 +1325,8 @@ TEST(rcTest, shortExeclongDelay) {
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
(void)tsem_post(&qwtTestQuerySem); //ignore error
(void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10);
}
@ -1327,16 +1372,16 @@ TEST(rcTest, dropTest) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {

View File

@ -1654,6 +1654,12 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
(!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (nodesIsMatchRegularOp(pOp)) {
SValueNode* node = (SValueNode*)(pOp->pRight);
if(checkRegexPattern(node->literal) != TSDB_CODE_SUCCESS){
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
}
}
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;

View File

@ -1675,10 +1675,8 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
pRes[i] = false;
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
if (pRes[i]) {
++num;
@ -1714,7 +1712,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
if (!pLeftData || !pRightData) {
result = false;
}
if (!result) {
colDataSetInt8(pOut->columnData, i, (int8_t *)&result);
} else {

View File

@ -43,6 +43,7 @@
#include "tglobal.h"
#include "tlog.h"
#include "tvariant.h"
#include "tcompare.h"
#define _DEBUG_PRINT_ 0
@ -52,6 +53,12 @@
#define PRINTF(...)
#endif
class constantTest {
public:
constantTest() { InitRegexCache(); }
~constantTest() { DestroyRegexCache(); }
};
static constantTest test;
namespace {
SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) {

View File

@ -24,6 +24,7 @@
#include "tutil.h"
#include "types.h"
#include "osString.h"
#include "ttimer.h"
int32_t setChkInBytes1(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
@ -1203,54 +1204,153 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) {
return comparestrRegexMatch(pLeft, pRight) ? 0 : 1;
}
static threadlocal regex_t pRegex;
static threadlocal char *pOldPattern = NULL;
static regex_t *threadGetRegComp(const char *pPattern) {
if (NULL != pOldPattern) {
if( strcmp(pOldPattern, pPattern) == 0) {
return &pRegex;
} else {
DestoryThreadLocalRegComp();
typedef struct UsingRegex {
regex_t pRegex;
int32_t lastUsedTime;
} UsingRegex;
typedef struct RegexCache {
SHashObj *regexHash;
void *regexCacheTmr;
void *timer;
} RegexCache;
static RegexCache sRegexCache;
#define MAX_REGEX_CACHE_SIZE 20
#define REGEX_CACHE_CLEAR_TIME 30
static void checkRegexCache(void* param, void* tmrId) {
taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId);
if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) {
return;
}
if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) {
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
while ((ppUsingRegex != NULL)) {
if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) {
size_t len = 0;
char* key = (char*)taosHashGetKey(ppUsingRegex, &len);
taosHashRemove(sRegexCache.regexHash, key, len);
}
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
}
}
pOldPattern = taosMemoryMalloc(strlen(pPattern) + 1);
if (NULL == pOldPattern) {
}
void regexCacheFree(void *ppUsingRegex) {
regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex);
taosMemoryFree(*(UsingRegex **)ppUsingRegex);
}
int32_t InitRegexCache() {
sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (sRegexCache.regexHash == NULL) {
uError("failed to create RegexCache");
return -1;
}
taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree);
sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE");
if (sRegexCache.regexCacheTmr == NULL) {
uError("failed to create regex cache check timer");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr);
if (sRegexCache.timer == NULL) {
uError("failed to start regex cache timer");
return -1;
}
return 0;
}
void DestroyRegexCache(){
uInfo("[regex cache] destory regex cache");
taosTmrStopA(&sRegexCache.timer);
taosHashCleanup(sRegexCache.regexHash);
taosTmrCleanUp(sRegexCache.regexCacheTmr);
}
int32_t checkRegexPattern(const char *pPattern) {
if (pPattern == NULL) {
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
regex_t regex;
int32_t cflags = REG_EXTENDED;
int32_t ret = regcomp(&regex, pPattern, cflags);
if (ret != 0) {
char msgbuf[256] = {0};
regerror(ret, &regex, msgbuf, tListLen(msgbuf));
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
regfree(&regex);
return TSDB_CODE_SUCCESS;
}
static UsingRegex **getRegComp(const char *pPattern) {
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
if (ppUsingRegex != NULL) {
(*ppUsingRegex)->lastUsedTime = taosGetTimestampSec();
return ppUsingRegex;
}
UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex));
if (pUsingRegex == NULL) {
uError("Failed to Malloc when compile regex pattern %s.", pPattern);
return NULL;
}
strcpy(pOldPattern, pPattern);
int32_t cflags = REG_EXTENDED;
int32_t ret = regcomp(&pRegex, pPattern, cflags);
int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags);
if (ret != 0) {
char msgbuf[256] = {0};
regerror(ret, &pRegex, msgbuf, tListLen(msgbuf));
regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf));
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
DestoryThreadLocalRegComp();
taosMemoryFree(pUsingRegex);
return NULL;
}
return &pRegex;
while (true) {
int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *));
if (code != 0 && code != TSDB_CODE_DUP_KEY) {
regexCacheFree(&pUsingRegex);
uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern);
return NULL;
}
ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
if (ppUsingRegex) {
if (*ppUsingRegex != pUsingRegex) {
regexCacheFree(&pUsingRegex);
}
pUsingRegex = (*ppUsingRegex);
break;
} else {
continue;
}
}
pUsingRegex->lastUsedTime = taosGetTimestampSec();
return ppUsingRegex;
}
void DestoryThreadLocalRegComp() {
if (NULL != pOldPattern) {
regfree(&pRegex);
taosMemoryFree(pOldPattern);
pOldPattern = NULL;
}
void releaseRegComp(UsingRegex **regex){
taosHashRelease(sRegexCache.regexHash, regex);
}
static int32_t doExecRegexMatch(const char *pString, const char *pPattern) {
int32_t ret = 0;
char msgbuf[256] = {0};
regex_t *regex = threadGetRegComp(pPattern);
if (regex == NULL) {
UsingRegex **pUsingRegex = getRegComp(pPattern);
if (pUsingRegex == NULL) {
return 1;
}
regmatch_t pmatch[1];
ret = regexec(regex, pString, 1, pmatch, 0);
ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0);
releaseRegComp(pUsingRegex);
if (ret != 0 && ret != REG_NOMATCH) {
regerror(ret, regex, msgbuf, sizeof(msgbuf));
regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf));
uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf)
}

View File

@ -684,6 +684,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner

View File

@ -104,7 +104,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
}
destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp();
return NULL;
}
@ -224,7 +223,6 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1);
}
DestoryThreadLocalRegComp();
return NULL;
}
@ -636,7 +634,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
}
destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp();
return NULL;
}

View File

@ -193,6 +193,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 3

View File

@ -0,0 +1,141 @@
import taos
import sys
import datetime
import inspect
import threading
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def initConnection(self):
self.records = 10000000
self.numOfTherads = 50
self.ts = 1537146000000
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/home/xp/git/TDengine/sim/dnode1/cfg"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def initDB(self):
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db")
def stopTest(self):
tdSql.execute("drop database if exists db")
def threadTest(self, threadID):
print(f"Thread {threadID} starting...")
tdsqln = tdCom.newTdSql()
for i in range(2, 50):
tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}dx'")
tdsqln.checkRows(0)
for i in range(100):
tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'")
tdsqln.checkRows(2)
tdsqln.query("select * from db.t1x")
tdsqln.checkRows(5)
tdsqln.query("select * from db.t1x where c1 match '_c'")
tdsqln.checkRows(2)
tdsqln.query("select * from db.t1x where c1 match '%__c'")
tdsqln.checkRows(0)
tdsqln.error("select * from db.t1x where c1 match '*d'")
print(f"Thread {threadID} finished.")
def match_test(self):
tdSql.execute("create table db.t1x (ts timestamp, c1 varchar(100))")
tdSql.execute("create table db.t_1x (ts timestamp, c1 varchar(100))")
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'")
tdSql.checkRows(2)
for i in range(2, 50):
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'")
tdSql.checkRows(0)
tdSql.error("select * from db.t1x where c1 match '*d'")
tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')")
tdSql.query("select * from db.t1x")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 match '_c'")
tdSql.checkRows(2)
tdSql.query("select * from db.t1x where c1 match '%__c'")
tdSql.checkRows(0)
tdSql.error("select * from db.t1x where c1 match '*d'")
threads = []
for i in range(10):
t = threading.Thread(target=self.threadTest, args=(i,))
threads.append(t)
t.start()
time.sleep(31)
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'")
tdSql.checkRows(2)
for i in range(2, 50):
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'")
tdSql.checkRows(0)
tdSql.query("select * from db.t1x")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 match '_c'")
tdSql.checkRows(2)
tdSql.query("select * from db.t1x where c1 match '%__c'")
tdSql.checkRows(0)
tdSql.execute("create table db.t3x (ts timestamp, c1 varchar(100))")
tdSql.execute("insert into db.t3x values(now, '我是中文'), (now+1s, '我是_中文'), (now+2s, '我是%中文'), (now+3s, '%中文'),(now+4s, '_中文')")
tdSql.query("select * from db.t3x where c1 match '%中文'")
tdSql.checkRows(2)
tdSql.query("select * from db.t3x where c1 match '中文'")
tdSql.checkRows(5)
tdSql.error("select * from db.t1x where c1 match '*d'")
for thread in threads:
print(f"Thread waitting for finish...")
thread.join()
print(f"Mutithread test finished.")
def run(self):
tdLog.printNoPrefix("==========start match_test run ...............")
tdSql.prepare(replica = self.replicaVar)
self.initConnection()
self.initDB()
self.match_test()
self.stopTest()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -104,6 +104,10 @@ python3 ./test.py -f 2-query/like.py
python3 ./test.py -f 2-query/like.py -Q 2
python3 ./test.py -f 2-query/like.py -Q 3
python3 ./test.py -f 2-query/like.py -Q 4
python3 ./test.py -f 2-query/match.py
python3 ./test.py -f 2-query/match.py -Q 2
python3 ./test.py -f 2-query/match.py -Q 3
python3 ./test.py -f 2-query/match.py -Q 4
python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False