diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a461ceef2a..082e824e5a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -152,7 +152,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, * @return */ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, - int32_t* tversion, int32_t idx); + int32_t* tversion, int32_t idx, bool* tbGet); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 8fd5ba13fc..08b4c8fccb 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -626,6 +626,7 @@ bool nodesIsArithmeticOp(const SOperatorNode* pOp); bool nodesIsComparisonOp(const SOperatorNode* pOp); bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsRegularOp(const SOperatorNode* pOp); +bool nodesIsMatchRegularOp(const SOperatorNode* pOp); bool nodesIsBitwiseOp(const SOperatorNode* pOp); bool nodesExprHasColumn(SNode* pNode); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 64b9ec7aeb..900d3eedad 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -837,6 +837,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E) #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) #define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) +#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tcompare.h b/include/util/tcompare.h index 9694bee92d..80f992f646 100644 --- a/include/util/tcompare.h +++ b/include/util/tcompare.h @@ -45,7 +45,10 @@ typedef struct SPatternCompareInfo { TdUcs4 umatchOne; // unicode version matchOne } SPatternCompareInfo; +int32_t InitRegexCache(); +void DestroyRegexCache(); int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo); +int32_t checkRegexPattern(const char *pPattern); int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo); @@ -83,7 +86,6 @@ int32_t compareLenBinaryVal(const void *pLeft, const void *pRight); int32_t comparestrRegexMatch(const void *pLeft, const void *pRight); int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight); -void DestoryThreadLocalRegComp(); int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight); int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight); diff --git a/include/util/tutil.h b/include/util/tutil.h index d1a18dc3e8..d1fc11d0fe 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -56,7 +56,7 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); -int32_t parseCfgReal(const char* str, double* out); +int32_t parseCfgReal(const char *str, double *out); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { T_MD5_CTX context; @@ -84,9 +84,9 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar static FORCE_INLINE int32_t taosCreateMD5Hash(char *pBuf, int32_t len) { T_MD5_CTX ctx; tMD5Init(&ctx); - tMD5Update(&ctx, (uint8_t*)pBuf, len); + tMD5Update(&ctx, (uint8_t *)pBuf, len); tMD5Final(&ctx); - char* p = pBuf; + char *p = pBuf; int32_t resLen = 0; for (uint8_t i = 0; i < tListLen(ctx.digest); ++i) { resLen += snprintf(p, 3, "%02x", ctx.digest[i]); @@ -147,6 +147,30 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) +#define TAOS_RETURN(code) \ + do { \ + return (terrno = (code)); \ + } while (0) + +#define TAOS_CHECK_RETURN(CMD) \ + do { \ + int32_t code = (CMD); \ + if (code != TSDB_CODE_SUCCESS) { \ + TAOS_RETURN(code); \ + } \ + } while (0) + +#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ + do { \ + code = (CMD); \ + if (code != TSDB_CODE_SUCCESS) { \ + if (LINO) { \ + *((int32_t *)(LINO)) = __LINE__; \ + } \ + goto LABEL; \ + } \ + } while (0) + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d0ba4c6773..a7d459ff31 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -1398,10 +1398,8 @@ void hbMgrCleanUp() { taosThreadMutexLock(&clientHbMgr.lock); appHbMgrCleanup(); - taosArrayDestroy(clientHbMgr.appHbMgrs); clientHbMgr.appHbMgrs = NULL; - taosThreadMutexUnlock(&clientHbMgr.lock); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cb097a8fe7..fa79d16970 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9507,7 +9507,6 @@ static void tDeleteMqDataRspCommon(void *rsp) { SMqDataRspCommon *pRsp = rsp; taosArrayDestroy(pRsp->blockDataLen); pRsp->blockDataLen = NULL; - taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); @@ -9562,7 +9561,6 @@ void tDeleteSTaosxRsp(void *rsp) { STaosxRsp *pRsp = (STaosxRsp *)rsp; taosArrayDestroy(pRsp->createTableLen); pRsp->createTableLen = NULL; - taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); pRsp->createTableReq = NULL; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 4739409d2c..f636629b3a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -18,6 +18,7 @@ #include "audit.h" #include "libs/function/tudf.h" #include "tgrant.h" +#include "tcompare.h" #define DM_INIT_AUDIT() \ do { \ @@ -163,6 +164,7 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; + if (InitRegexCache() != 0) return -1; #if defined(USE_S3) if (s3Begin() != 0) return -1; #endif @@ -192,6 +194,7 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); + DestroyRegexCache(); #if defined(USE_S3) s3End(); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5164557184..9dc5f920ad 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -195,7 +195,9 @@ void *freeStreamTasks(SArray *pTaskLevel) { taosArrayDestroy(pLevel); } - return taosArrayDestroy(pTaskLevel); + taosArrayDestroy(pTaskLevel); + + return NULL; } void tFreeStreamObj(SStreamObj *pStream) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index d3327b5d6b..1c8f102725 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -251,7 +251,6 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag); taosArrayDestroy(tagArray); tagArray = NULL; - if (pCreateTbReq->ctb.pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq); code = TSDB_CODE_OUT_OF_MEMORY; @@ -509,7 +508,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat if (pTableData->aRowP == NULL || pVals == NULL) { taosArrayDestroy(pTableData->aRowP); pTableData->aRowP = NULL; - taosArrayDestroy(pVals); code = TSDB_CODE_OUT_OF_MEMORY; tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code)); @@ -536,7 +534,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat ts, earlyTs); taosArrayDestroy(pTableData->aRowP); pTableData->aRowP = NULL; - taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 332ed4d05c..e84800691f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -310,7 +310,6 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { taosArrayDestroy(pInfo->delSkyline); pInfo->delSkyline = NULL; - pInfo->lastProcKey.ts = ts; // todo check the nextProcKey info pInfo->sttKeyInfo.nextProcKey.ts = ts + step; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c0d089e7c2..4aff0e69a3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -479,12 +479,14 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI } int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, - int32_t* tversion, int32_t idx) { + int32_t* tversion, int32_t idx, bool* tbGet) { + *tbGet = false; + ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) { - return -1; + return TSDB_CODE_SUCCESS; } SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx); @@ -502,6 +504,8 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table tableName[0] = 0; } + *tbGet = true; + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 0002c94fa7..5f188c2c8a 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -129,7 +129,6 @@ static void destroyStreamFillOperatorInfo(void* param) { pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); - taosArrayDestroy(pInfo->matchInfo.pList); pInfo->matchInfo.pList = NULL; taosMemoryFree(pInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7455296f0f..41d398888e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1138,7 +1138,6 @@ void destroyIntervalOperatorInfo(void* param) { taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = NULL; - taosArrayDestroyEx(pInfo->pPrevValues, freeItem); pInfo->pPrevValues = NULL; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d5117e0eec..aab5a52776 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1441,6 +1441,9 @@ _stddev_over: } static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) { + if (IS_NULL_TYPE(pInput->type)) { + return; + } pOutput->type = pInput->type; if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) { pOutput->quadraticISum += pInput->quadraticISum; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ce410f76be..c306cf2c7a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2203,6 +2203,17 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) { return false; } +bool nodesIsMatchRegularOp(const SOperatorNode* pOp) { + switch (pOp->opType) { + case OP_TYPE_MATCH: + case OP_TYPE_NMATCH: + return true; + default: + break; + } + return false; +} + bool nodesIsBitwiseOp(const SOperatorNode* pOp) { switch (pOp->opType) { case OP_TYPE_BIT_AND: diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index d67c7d306f..e6b6bcc903 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -223,7 +223,9 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Tag name:%s duplicated"; case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC: return "Some functions cannot appear in the select list at the same time"; - default: + case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR: + return "Syntax error in regular expression"; + default: return "Unknown error"; } } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 2222f9cb31..dfd00dc5a7 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -391,7 +391,7 @@ void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask); int32_t qwDropTask(QW_FPARAMS_DEF); -void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); +int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index d28667d247..b7a4b718e2 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int if (pEpSet) { contLen = tSerializeSEpSet(NULL, 0, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet failed, code:%x", terrno); + return terrno; + } rsp = rpcMallocCont(contLen); - tSerializeSEpSet(rsp, contLen, pEpSet); + if (NULL == rsp) { + qError("rpcMallocCont %d failed, code:%x", contLen, terrno); + return terrno; + } + + contLen = tSerializeSEpSet(rsp, contLen, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet second failed, code:%x", terrno); + return terrno; + } } SRpcMsg rpcRsp = { @@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { epSet.eps[2].port = 7300; ctx->phase = QW_PHASE_POST_QUERY; - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error *rsped = true; return; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } @@ -260,8 +273,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { if (++ignoreTime > 10 && 0 == taosRand() % 9) { if (ctx->fetchMsgType == TDMT_SCH_FETCH) { - qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); - qwBuildAndSendErrorRsp(ctx->fetchMsgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + (void)qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); // ignore error + (void)qwBuildAndSendErrorRsp(ctx->fetchMsgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); // ignore error *rsped = true; taosSsleep(3); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index d30b100147..9f4a540f3c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -105,8 +105,19 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) { SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); + if (contLen < 0) { + qError("tSerializeSExplainRsp failed, error: %x", terrno); + QW_RET(terrno); + } void *pRsp = rpcMallocCont(contLen); - tSerializeSExplainRsp(pRsp, contLen, &rsp); + if (NULL == pRsp) { + QW_RET(terrno); + } + contLen = tSerializeSExplainRsp(pRsp, contLen, &rsp); + if (contLen < 0) { + qError("tSerializeSExplainRsp second failed, error: %x", terrno); + QW_RET(terrno); + } SRpcMsg rpcRsp = { .msgType = TDMT_SCH_EXPLAIN_RSP, @@ -123,8 +134,20 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) { int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); + if (contLen < 0) { + qError("tSerializeSSchedulerHbRsp failed, error: %x", terrno); + QW_RET(terrno); + } + void *pRsp = rpcMallocCont(contLen); - tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); + if (NULL == pRsp) { + QW_RET(terrno); + } + contLen = tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); + if (contLen < 0) { + qError("tSerializeSSchedulerHbRsp second failed, error: %x", terrno); + QW_RET(terrno); + } SRpcMsg rpcRsp = { .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP, @@ -143,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } memset(pRsp, 0, sizeof(SRetrieveTableRsp)); dataLength = 0; } @@ -164,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve #if 0 int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -180,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -405,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SSubQueryMsg msg = {0}; if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { @@ -419,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t eId = msg.execId; QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); - qwAbortPrerocessQuery(QW_FPARAMS()); - QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + code = qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code); tFreeSSubQueryMsg(&msg); @@ -435,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); SSubQueryMsg msg = {0}; @@ -477,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in SQWTaskCtx *handles = NULL; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -495,9 +528,9 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); + code = qwProcessCQuery(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processCQuery end, node:%p", node); + QW_SCH_TASK_DLOG("processCQuery end, node:%p, code:0x%x", node, code); return TSDB_CODE_SUCCESS; } @@ -510,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SResFetchReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) { @@ -528,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); + int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processFetch end, node:%p", node); + QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -538,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (mgmt) { - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); } @@ -557,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -598,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); STaskDropReq msg = {0}; @@ -621,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); + code = qwProcessDrop(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processDrop end, node:%p", node); + QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -636,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1); STaskNotifyReq msg = {0}; @@ -655,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg)); + code = qwProcessNotify(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processNotify end, node:%p", node); + QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -672,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ SSchedulerHbReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); if (NULL == pMsg->pCont) { @@ -694,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); + code = qwProcessHb(mgmt, &qwMsg, &req); - QW_SCH_DLOG("processHb end, node:%p", node); + QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -712,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); - tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); + QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req)); uint64_t sId = req.sId; uint64_t qId = req.queryId; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index f00c4aef30..0451532cbe 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) { int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + int32_t code = TSDB_CODE_SUCCESS; qTaskInfo_t taskHandle = ctx->taskHandle; ctx->explainRsped = true; SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); - QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); + if (NULL == execInfoList) { + QW_ERR_JRET(terrno); + } + + QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList)); if (ctx->localExec) { SExplainLocalRsp localRsp = {0}; localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + if (NULL == pExec) { + QW_ERR_JRET(terrno); + } memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); localRsp.rsp.subplanInfo = pExec; localRsp.qId = qId; localRsp.tId = tId; localRsp.rId = rId; localRsp.eId = eId; - taosArrayPush(ctx->explainRes, &localRsp); + if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) { + QW_ERR_JRET(terrno); + } + taosArrayDestroy(execInfoList); + execInfoList = NULL; } else { SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); taosArrayDestroyEx(execInfoList, freeExplainExecItem); + execInfoList = NULL; + QW_ERR_RET(code); } - return TSDB_CODE_SUCCESS; +_return: + + taosArrayDestroyEx(execInfoList, freeExplainExecItem); + + return code; } @@ -503,14 +521,18 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { *pParam = &gQwMgmt.param[paramIdx]; } -void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { +int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; STbVerInfo tbInfo; int32_t i = 0; + int32_t code = TSDB_CODE_SUCCESS; + bool tbGet = false; while (true) { - if (qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i) < 0) { + tbGet = false; + code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i, &tbGet); + if (TSDB_CODE_SUCCESS != code || !tbGet) { break; } @@ -522,18 +544,25 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { if (NULL == ctx->tbInfo) { ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo)); + if (NULL == ctx->tbInfo) { + QW_ERR_RET(terrno); + } } - taosArrayPush(ctx->tbInfo, &tbInfo); + if (NULL == taosArrayPush(ctx->tbInfo, &tbInfo)) { + QW_ERR_RET(terrno); + } i++; } + + QW_RET(code); } void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { - taosCloseRef(gQwMgmt.qwRef); + (void)taosCloseRef(gQwMgmt.qwRef); // ignore error gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); @@ -550,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) { int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); - taosTmrStop(mgmt->hbTimer); + (void)taosTmrStop(mgmt->hbTimer); //ignore error mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); @@ -641,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return pStat->num ? (pStat->total / pStat->num) : 0; default: qError("unsupported queue type %d", type); + break; } return -1; } void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { + int32_t code = TSDB_CODE_SUCCESS; int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { uint64_t *sId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; - if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { + if (NULL == sId) { + qError("get the %dth sch failed, code:%x", i, terrno); + break; + } + + code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch); + if (TSDB_CODE_SUCCESS != code) { + qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code); continue; } if (taosHashGetSize(pSch->tasksHash) <= 0) { qwDestroySchStatus(pSch); - taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); - qDebug("sch %" PRIx64 " destroyed", *sId); + code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); + qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code); } qwReleaseScheduler(QW_WRITE, mgmt); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c2cd07e9ed..5840cc0245 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,10 +18,11 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; -int32_t qwStopAllTasks(SQWorker *mgmt) { +void qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, tId, sId; int32_t eId; int64_t rId = 0; + int32_t code = TSDB_CODE_SUCCESS; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { @@ -44,22 +45,29 @@ int32_t qwStopAllTasks(SQWorker *mgmt) { } if (QW_QUERY_RUNNING(ctx)) { - qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); - QW_TASK_DLOG_E("task running, async killed"); + code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("task running, async kill failed, error: %x", code); + } else { + QW_TASK_DLOG_E("task running, async killed"); + } } else if (QW_FETCH_RUNNING(ctx)) { QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_TASK_DLOG_E("task fetching, update drop received"); } else { - qwDropTask(QW_FPARAMS()); + code = qwDropTask(QW_FPARAMS()); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("task drop failed, error: %x", code); + } else { + QW_TASK_DLOG_E("task dropped"); + } } QW_UNLOCK(QW_WRITE, &ctx->lock); pIter = taosHashIterate(mgmt->ctxHash, pIter); } - - return TSDB_CODE_SUCCESS; } int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { @@ -111,7 +119,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) { if (!ctx->localExec) { - qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx); + QW_ERR_RET(qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode)); } @@ -140,6 +148,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } SArray *pResList = taosArrayInit(4, POINTER_BYTES); + if (NULL == pResList) { + QW_ERR_RET(terrno); + } + while (true) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); @@ -165,6 +177,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { size_t numOfResBlock = taosArrayGetSize(pResList); for (int32_t j = 0; j < numOfResBlock; ++j) { SSDataBlock *pRes = taosArrayGetP(pResList, j); + if (NULL == pRes) { + QW_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SInputData inputData = {.pData = pRes}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); @@ -226,7 +241,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } _return: + taosArrayDestroy(pResList); + QW_RET(code); } @@ -241,7 +258,8 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) { int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; - + int32_t code = TSDB_CODE_SUCCESS; + hbInfo->connInfo = sch->hbConnInfo; hbInfo->rsp.epId = sch->hbEpId; @@ -272,7 +290,11 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) status.status = taskStatus->status; status.refId = taskStatus->refId; - taosArrayPush(hbInfo->rsp.taskStatus, &status); + if (NULL == taosArrayPush(hbInfo->rsp.taskStatus, &status)) { + taosHashCancelIterate(sch->tasksHash, pIter); + code = terrno; + break; + } ++i; pIter = taosHashIterate(sch->tasksHash, pIter); @@ -280,7 +302,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) QW_UNLOCK(QW_READ, &sch->tasksLock); - return TSDB_CODE_SUCCESS; + return code; } int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg, @@ -320,7 +342,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, pOutput->numOfRows); if (!ctx->dynamicTask) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); } if (NULL == pRsp) { @@ -375,7 +397,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); break; } @@ -464,7 +486,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32 qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + QW_ERR_RET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), @@ -650,7 +672,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp _return: if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); + code = qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); ctx->queryGotData = true; } @@ -660,7 +682,10 @@ _return: qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); if (!rsped) { - qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); + int32_t newCode = qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); + if (TSDB_CODE_SUCCESS != newCode && TSDB_CODE_SUCCESS == code) { + code = newCode; + } } } @@ -672,7 +697,7 @@ _return: } if (code) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // already in error, ignore new error } QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -687,11 +712,11 @@ _return: int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { QW_ERR_RET(qwDropTask(QW_FPARAMS())); - QW_RET(TSDB_CODE_SUCCESS); + return TSDB_CODE_SUCCESS; } int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -706,7 +731,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); - qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); + QW_ERR_JRET(qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true)); _return: @@ -715,7 +740,7 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - QW_RET(TSDB_CODE_SUCCESS); + return TSDB_CODE_SUCCESS; } int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { @@ -761,7 +786,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); - qwSaveTbVersionInfo(pTaskInfo, ctx); + QW_ERR_JRET(qwSaveTbVersionInfo(pTaskInfo, ctx)); if (!ctx->dynamicTask) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); @@ -778,7 +803,7 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code); + QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code)); QW_RET(TSDB_CODE_SUCCESS); } @@ -829,7 +854,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, @@ -851,9 +876,14 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { rsp = NULL; qwMsg->connInfo = ctx->dataConnInfo; - qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), - 0); + code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + 0); + } else { + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + 0); + } } QW_LOCK(QW_WRITE, &ctx->lock); @@ -869,7 +899,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } while (true); input.code = code; - qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL); + QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); QW_RET(TSDB_CODE_SUCCESS); } @@ -922,7 +952,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } else if (QW_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask)); atomic_store_8((int8_t *)&ctx->queryInQueue, 1); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -952,9 +982,14 @@ _return: } if (!rsped) { - qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); - QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), - qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), + qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + } else { + QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), + qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + } } else { qwFreeFetchRsp(rsp); rsp = NULL; @@ -985,7 +1020,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask)); } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropped = true; @@ -1001,7 +1036,7 @@ _return: if (code) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling } else { tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); } @@ -1035,7 +1070,7 @@ int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); } switch (qwMsg->msgType) { @@ -1055,7 +1090,7 @@ _return: if (code) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling } } @@ -1104,7 +1139,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); - qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); if (code) { tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); @@ -1125,7 +1160,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { int64_t refId = hbParam->refId; SQWorker *mgmt = qwAcquire(refId); if (NULL == mgmt) { - QW_DLOG("qwAcquire %" PRIx64 "failed", refId); + QW_DLOG("qwAcquire %" PRIx64 "failed, code:0x%x", refId, terrno); return; } @@ -1137,7 +1172,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { qwDbgDumpMgmtInfo(mgmt); if (gQWDebug.forceStop) { - (void)qwStopAllTasks(mgmt); + qwStopAllTasks(mgmt); } QW_LOCK(QW_READ, &mgmt->schLock); @@ -1145,8 +1180,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { int32_t schNum = taosHashGetSize(mgmt->schHash); if (schNum <= 0) { QW_UNLOCK(QW_READ, &mgmt->schLock); - taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + (void)qwRelease(refId); // ignore error return; } @@ -1156,9 +1191,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { QW_UNLOCK(QW_READ, &mgmt->schLock); taosMemoryFree(rspList); taosArrayDestroy(pExpiredSch); - QW_ELOG("calloc %d SQWHbInfo failed", schNum); - taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno); + (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + (void)qwRelease(refId); // ignore error return; } @@ -1174,7 +1209,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch1->tasksHash) <= 0) { - taosArrayPush(pExpiredSch, sId); + if (NULL == taosArrayPush(pExpiredSch, sId)) { + QW_ELOG("add sId 0x%" PRIx64 " to expiredSch failed, code:%x", *sId, terrno); + taosHashCancelIterate(mgmt->schHash, pIter); + break; + } } pIter = taosHashIterate(mgmt->schHash, pIter); @@ -1196,7 +1235,7 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); + (void)qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); // ignore error /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ tFreeSSchedulerHbRsp(&rspList[j].rsp); @@ -1209,8 +1248,8 @@ _return: taosMemoryFreeClear(rspList); taosArrayDestroy(pExpiredSch); - taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + (void)qwRelease(refId); // ignore error } int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { @@ -1333,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S _return: if (mgmt->refId >= 0) { - qwRelease(mgmt->refId); + (void)qwRelease(mgmt->refId); // ignore error } else { taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->ctxHash); @@ -1353,7 +1392,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { atomic_store_8(&mgmt->nodeStopped, 1); - (void)qwStopAllTasks(mgmt); + qwStopAllTasks(mgmt); } void qWorkerDestroy(void **qWorkerMgmt) { @@ -1383,7 +1422,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SDataSinkStat sinkStat = {0}; - dsDataSinkGetCacheSize(&sinkStat); + QW_ERR_RET(dsDataSinkGetCacheSize(&sinkStat)); pStat->cacheDataSize = sinkStat.cachedSize; pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed); @@ -1427,6 +1466,10 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ctx->explainRes = explainRes; rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); + if (NULL == rHandle.pMsgCb) { + QW_ERR_JRET(terrno); + } + rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 4a0d74a6e3..d292b271c5 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -130,30 +130,32 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { fetchRpc->contLen = sizeof(SResFetchReq); } -void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { +int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropMsg->sId = 1; dropMsg->queryId = atomic_load_64(&qwtTestQueryId); dropMsg->taskId = 1; int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg); if (msgSize < 0) { - return; + return terrno; } char *msg = (char*)taosMemoryCalloc(1, msgSize); if (NULL == msg) { - return; + return terrno; } if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) { taosMemoryFree(msg); - return; + return terrno; } dropRpc->msgType = TDMT_SCH_DROP_TASK; dropRpc->pCont = msg; dropRpc->contLen = msgSize; + + return TSDB_CODE_SUCCESS; } int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { @@ -164,6 +166,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestFetchQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { @@ -178,7 +184,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestFetchQueueLock); - tsem_post(&qwtTestFetchSem); + if (tsem_post(&qwtTestFetchSem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } @@ -186,6 +195,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestQueryQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { @@ -200,22 +213,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestQueryQueueLock); - tsem_post(&qwtTestQuerySem); + if (tsem_post(&qwtTestQuerySem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {} -void qwtRpcSendResponse(const SRpcMsg *pRsp) { +int qwtRpcSendResponse(const SRpcMsg *pRsp) { + int32_t code = 0; switch (pRsp->msgType) { case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; if (pRsp->code) { - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } } rpcFreeCont(rsp); @@ -227,13 +252,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); - return; + return code; } - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); break; @@ -245,9 +282,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtTestCaseFinished = true; break; } + default: + break; } - return; + return code; } int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo, @@ -292,6 +331,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { if (endExec) { *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (NULL == *pRes) { + return terrno; + } (*pRes)->info.rows = taosRand() % 1000 + 1; } else { *pRes = NULL; @@ -631,7 +673,7 @@ void *queryThread(void *param) { while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -653,7 +695,7 @@ void *fetchThread(void *param) { while (!qwtTestStop) { qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -674,8 +716,11 @@ void *dropThread(void *param) { STaskDropReq dropMsg = {0}; while (!qwtTestStop) { - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) { + break; + } + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error + if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -700,7 +745,7 @@ void *qwtclientThread(void *param) { qwtTestCaseFinished = false; qwtBuildQueryReqMsg(&queryRpc); - qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); + (void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error while (!qwtTestCaseFinished) { taosUsleep(1); @@ -752,9 +797,9 @@ void *queryQueueThread(void *param) { } if (TDMT_SCH_QUERY == queryRpc->msgType) { - qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) { - qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); @@ -810,16 +855,16 @@ void *fetchQueueThread(void *param) { switch (fetchRpc->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: - qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_CANCEL_TASK: //qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_SCH_DROP_TASK: - qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_TASK_NOTIFY: - qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); @@ -853,7 +898,7 @@ TEST(seqTest, normalCase) { qwtBuildQueryReqMsg(&queryRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -898,7 +943,7 @@ TEST(seqTest, cancelFirst) { qwtInitLogFile(); qwtBuildQueryReqMsg(&queryRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -954,7 +999,7 @@ TEST(seqTest, randCase) { if (r >= 0 && r < maxr / 5) { printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error } else if (r >= maxr / 5 && r < maxr * 2 / 5) { // printf("Ready,%d\n", t++); // qwtBuildReadyReqMsg(&readyMsg, &readyRpc); @@ -965,14 +1010,14 @@ TEST(seqTest, randCase) { } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) { printf("Drop,%d\n", t++); - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + (void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } @@ -1018,14 +1063,14 @@ TEST(seqTest, multithreadRand) { ASSERT_EQ(code, 0); TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5, t6; - taosThreadCreate(&(t1), &thattr, queryThread, mgmt); - // taosThreadCreate(&(t2), &thattr, readyThread, NULL); - taosThreadCreate(&(t3), &thattr, fetchThread, NULL); - taosThreadCreate(&(t4), &thattr, dropThread, NULL); - taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error + // (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error + (void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error + (void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1083,16 +1128,16 @@ TEST(rcTest, shortExecshortDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1114,8 +1159,8 @@ TEST(rcTest, shortExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1166,16 +1211,16 @@ TEST(rcTest, longExecshortDelay) { qwtTestMaxExecTaskUsec = 1000000; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1197,8 +1242,8 @@ TEST(rcTest, longExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1249,16 +1294,16 @@ TEST(rcTest, shortExeclongDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 1000000; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1280,8 +1325,8 @@ TEST(rcTest, shortExeclongDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1327,16 +1372,16 @@ TEST(rcTest, dropTest) { code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 50de5e760d..8f9ea4d36c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1654,6 +1654,12 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) { (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { return TSDB_CODE_TSC_INVALID_OPERATION; } + if (nodesIsMatchRegularOp(pOp)) { + SValueNode* node = (SValueNode*)(pOp->pRight); + if(checkRegexPattern(node->literal) != TSDB_CODE_SUCCESS){ + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + } } pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index c5789a65ca..16c06c2452 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1675,10 +1675,8 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa pRes[i] = false; continue; } - char *pLeftData = colDataGetData(pLeft->columnData, leftIndex); char *pRightData = colDataGetData(pRight->columnData, rightIndex); - pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData); if (pRes[i]) { ++num; @@ -1714,7 +1712,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa if (!pLeftData || !pRightData) { result = false; } - if (!result) { colDataSetInt8(pOut->columnData, i, (int8_t *)&result); } else { diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index fe86e18ce3..dd88344962 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -43,6 +43,7 @@ #include "tglobal.h" #include "tlog.h" #include "tvariant.h" +#include "tcompare.h" #define _DEBUG_PRINT_ 0 @@ -52,6 +53,12 @@ #define PRINTF(...) #endif +class constantTest { + public: + constantTest() { InitRegexCache(); } + ~constantTest() { DestroyRegexCache(); } +}; +static constantTest test; namespace { SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2fb0adbdf4..4cbe0cb136 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1059,10 +1059,8 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosThreadMutexDestroy(&pInfo->lock); taosArrayDestroy(pInfo->pDispatchTriggerList); pInfo->pDispatchTriggerList = NULL; - taosArrayDestroy(pInfo->pReadyMsgList); pInfo->pReadyMsgList = NULL; - taosArrayDestroy(pInfo->pCheckpointReadyRecvList); pInfo->pCheckpointReadyRecvList = NULL; diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index e5f9fe43e7..4c056178c2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) { SArray* pArray = taosMemoryMalloc(sizeof(SArray)); if (pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pArray->size = 0; pArray->pData = taosMemoryCalloc(size, elemSize); if (pArray->pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pArray); return NULL; } diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 26122a4a29..09599cead4 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -24,6 +24,7 @@ #include "tutil.h" #include "types.h" #include "osString.h" +#include "ttimer.h" int32_t setChkInBytes1(const void *pLeft, const void *pRight) { return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0; @@ -1203,54 +1204,153 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) { return comparestrRegexMatch(pLeft, pRight) ? 0 : 1; } -static threadlocal regex_t pRegex; -static threadlocal char *pOldPattern = NULL; -static regex_t *threadGetRegComp(const char *pPattern) { - if (NULL != pOldPattern) { - if( strcmp(pOldPattern, pPattern) == 0) { - return &pRegex; - } else { - DestoryThreadLocalRegComp(); +typedef struct UsingRegex { + regex_t pRegex; + int32_t lastUsedTime; +} UsingRegex; + +typedef struct RegexCache { + SHashObj *regexHash; + void *regexCacheTmr; + void *timer; +} RegexCache; +static RegexCache sRegexCache; +#define MAX_REGEX_CACHE_SIZE 20 +#define REGEX_CACHE_CLEAR_TIME 30 + +static void checkRegexCache(void* param, void* tmrId) { + taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId); + if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) { + return; + } + + if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) { + UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL); + while ((ppUsingRegex != NULL)) { + if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) { + size_t len = 0; + char* key = (char*)taosHashGetKey(ppUsingRegex, &len); + taosHashRemove(sRegexCache.regexHash, key, len); + } + ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex); } } - pOldPattern = taosMemoryMalloc(strlen(pPattern) + 1); - if (NULL == pOldPattern) { +} + +void regexCacheFree(void *ppUsingRegex) { + regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex); + taosMemoryFree(*(UsingRegex **)ppUsingRegex); +} + +int32_t InitRegexCache() { + sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (sRegexCache.regexHash == NULL) { + uError("failed to create RegexCache"); + return -1; + } + taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree); + sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE"); + if (sRegexCache.regexCacheTmr == NULL) { + uError("failed to create regex cache check timer"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr); + if (sRegexCache.timer == NULL) { + uError("failed to start regex cache timer"); + return -1; + } + + return 0; +} + +void DestroyRegexCache(){ + uInfo("[regex cache] destory regex cache"); + taosTmrStopA(&sRegexCache.timer); + taosHashCleanup(sRegexCache.regexHash); + taosTmrCleanUp(sRegexCache.regexCacheTmr); +} + +int32_t checkRegexPattern(const char *pPattern) { + if (pPattern == NULL) { + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + + regex_t regex; + int32_t cflags = REG_EXTENDED; + int32_t ret = regcomp(®ex, pPattern, cflags); + if (ret != 0) { + char msgbuf[256] = {0}; + regerror(ret, ®ex, msgbuf, tListLen(msgbuf)); + uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); + return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR; + } + regfree(®ex); + return TSDB_CODE_SUCCESS; +} + +static UsingRegex **getRegComp(const char *pPattern) { + UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex != NULL) { + (*ppUsingRegex)->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; + } + + UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex)); + if (pUsingRegex == NULL) { uError("Failed to Malloc when compile regex pattern %s.", pPattern); return NULL; } - strcpy(pOldPattern, pPattern); int32_t cflags = REG_EXTENDED; - int32_t ret = regcomp(&pRegex, pPattern, cflags); + int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags); if (ret != 0) { char msgbuf[256] = {0}; - regerror(ret, &pRegex, msgbuf, tListLen(msgbuf)); + regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf)); uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); - DestoryThreadLocalRegComp(); + taosMemoryFree(pUsingRegex); return NULL; } - return &pRegex; + + while (true) { + int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *)); + if (code != 0 && code != TSDB_CODE_DUP_KEY) { + regexCacheFree(&pUsingRegex); + uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern); + return NULL; + } + ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern)); + if (ppUsingRegex) { + if (*ppUsingRegex != pUsingRegex) { + regexCacheFree(&pUsingRegex); + } + pUsingRegex = (*ppUsingRegex); + break; + } else { + continue; + } + } + pUsingRegex->lastUsedTime = taosGetTimestampSec(); + return ppUsingRegex; } -void DestoryThreadLocalRegComp() { - if (NULL != pOldPattern) { - regfree(&pRegex); - taosMemoryFree(pOldPattern); - pOldPattern = NULL; - } +void releaseRegComp(UsingRegex **regex){ + taosHashRelease(sRegexCache.regexHash, regex); } static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { int32_t ret = 0; char msgbuf[256] = {0}; - regex_t *regex = threadGetRegComp(pPattern); - if (regex == NULL) { + UsingRegex **pUsingRegex = getRegComp(pPattern); + if (pUsingRegex == NULL) { return 1; } regmatch_t pmatch[1]; - ret = regexec(regex, pString, 1, pmatch, 0); + ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0); + releaseRegComp(pUsingRegex); if (ret != 0 && ret != REG_NOMATCH) { - regerror(ret, regex, msgbuf, sizeof(msgbuf)); + regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf)); uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0f5ad73f61..347f0be4ff 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -684,6 +684,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 4a8a0823b7..7a97dc3527 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -104,7 +104,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { } destroyThreadLocalGeosCtx(); - DestoryThreadLocalRegComp(); return NULL; } @@ -224,7 +223,6 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } - DestoryThreadLocalRegComp(); return NULL; } @@ -636,7 +634,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } destroyThreadLocalGeosCtx(); - DestoryThreadLocalRegComp(); return NULL; } diff --git a/tests/army/query/function/cast.py b/tests/army/query/function/cast.py new file mode 100644 index 0000000000..66718ce715 --- /dev/null +++ b/tests/army/query/function/cast.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * + + +class TDTestCase(TBase): + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + self.dbname = "cast_db" + self._datetime_epoch = datetime.datetime.fromtimestamp(0) + + def cast_without_from(self): + # int + int_num = 2147483648 + tdSql.query(f"select cast({int_num} as int) re;") + tdSql.checkData(0, 0, -int_num) + + tdSql.query(f"select cast(2147483647 as int) re;") + tdSql.checkData(0, 0, 2147483647) + + tdSql.query(f"select cast({int_num} as int unsigned) re;") + tdSql.checkData(0, 0, int_num) + + tdSql.query(f"select cast({int_num} as bigint) re;") + tdSql.checkData(0, 0, int_num) + + tdSql.query(f"select cast({int_num} as bigint unsigned) re;") + tdSql.checkData(0, 0, int_num) + + tdSql.query(f"select cast({int_num} as smallint) re;") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast({int_num} as smallint unsigned) re;") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast({int_num} as tinyint) re;") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast({int_num} as tinyint unsigned) re;") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast({int_num} as float) re;") + tdSql.checkData(0, 0, '2147483648.0') + + tdSql.query(f"select cast({int_num} as double) re;") + tdSql.checkData(0, 0, '2147483648.0') + + tdSql.query(f"select cast({int_num} as bool) as re;") + tdSql.checkData(0, 0, True) + + tdSql.query(f"select cast({int_num} as timestamp) as re;") + tdSql.checkData(0, 0, self._datetime_epoch + datetime.timedelta(seconds=int(int_num) / 1000)) + + tdSql.query(f"select cast({int_num} as varchar(10)) as re;") + tdSql.checkData(0, 0, int_num) + + tdSql.query(f"select cast({int_num} as binary(10)) as re;") + tdSql.checkData(0, 0, int_num) + + sql = f"select cast({int_num} as nchar(10));" + tdSql.query(sql) + tdSql.checkData(0, 0, int_num) + + # float + float_1001 = 3.14159265358979323846264338327950288419716939937510582097494459230781640628620899862803482534211706798214808651328230664709384460955058223172535940812848111745028410270193852110555964462294895493038196442881097566593344612847564823378678316527120190914564856692346034861045432664821339360726024914127372458700660631558817488152092096282925409171536436789259036001133053054882046652138414695194151160943305727036575959195309218611738193261179310511854807446237996274956735188575272489122793818301194912983367336244065664308602139494639522473719070217986094370277053921717629317675238467481846766940513200056812714526356082778577134275778960917363717872146844090122495343014654958537105079227968925892354201995611212902196086403441815981362977477130996051870721134999999837297804995105973173281609631859502445945534690830264252230825334468503526193118817101000313783875288658753320838142061717766914730359825349042875546873115956286388235378759375195778185778053217122680661300192787661119590921642019 + + tdSql.query(f"select cast({float_1001} as int) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as int unsigned) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as tinyint) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as tinyint unsigned) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as smallint) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as smallint unsigned) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as bigint) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as bigint unsigned) as re;") + tdSql.checkData(0, 0, 3) + + tdSql.query(f"select cast({float_1001} as double) as re;") + tdSql.checkData(0, 0, 3.141592653589793) + + tdSql.query(f"select cast({float_1001} as float) as re;") + tdSql.checkData(0, 0, 3.1415927) + + tdSql.query(f"select cast({float_1001} as bool) as re;") + tdSql.checkData(0, 0, True) + + tdSql.query(f"select cast({float_1001} as timestamp) as re;") + tdSql.checkData(0, 0, self._datetime_epoch + datetime.timedelta(seconds=int(float_1001) / 1000)) + + sql = f"select cast({float_1001} as varchar(5)) as re;" + tdSql.query(sql) + tdSql.checkData(0, 0, 3.141) + + sql = f"select cast({float_1001} as binary(10)) as re;" + tdSql.query(sql) + tdSql.checkData(0, 0, 3.141593) + + tdSql.query(f"select cast({float_1001} as nchar(5));") + tdSql.checkData(0, 0, 3.141) + + # str + str_410 = "bcdefghigk" * 41 + big_str = "bcdefghigk" * 6552 + + tdSql.query(f"select cast('{str_410}' as binary(3)) as re;") + tdSql.checkData(0, 0, "bcd") + + tdSql.query(f"select cast('{str_410}' as varchar(2)) as re;") + tdSql.checkData(0, 0, "bc") + + tdSql.query(f"select cast('{str_410}' as nchar(10));") + tdSql.checkData(0, 0, "bcdefghigk") + + tdSql.query(f"select cast('北京' as nchar(10));") + tdSql.checkData(0, 0, "北京") + + tdSql.query(f"select cast('{str_410}' as int) as re;") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as int unsigned) as re;") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as tinyint) as re") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as tinyint unsigned) as re") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as smallint) as re") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as smallint unsigned) as re") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as bigint) as re") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as bigint unsigned) as re") + tdSql.checkData(0, 0, 0) + + tdSql.query(f"select cast('{str_410}' as float) as re;") + tdSql.checkData(0, 0, 0.0000000) + + tdSql.query(f"select cast('{str_410}' as double) as re;") + tdSql.checkData(0, 0, 0.000000000000000) + + tdSql.query(f"select cast('{str_410}' as bool) as re") + tdSql.checkData(0, 0, False) + + tdSql.query( f"select cast('{str_410}' as timestamp) as re") + tdSql.checkData(0, 0, "1970-01-01 08:00:00.000") + + + def run(self): + # self.prepare_data() + # self.all_test() + # tdSql.execute(f"flush database {self.dbname}") + # self.all_test() + self.cast_without_from() + + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/query/queryBugs.py b/tests/army/query/queryBugs.py index f6075f6063..20ecb23881 100644 --- a/tests/army/query/queryBugs.py +++ b/tests/army/query/queryBugs.py @@ -36,7 +36,7 @@ class TDTestCase(TBase): "create table db.st(ts timestamp, age int) tags(area tinyint);", "insert into db.t1 using db.st tags(100) values('2024-01-01 10:00:01', 1);", "insert into db.t2 using db.st tags(110) values('2024-01-01 10:00:02', 2);", - "insert into db.t3 using db.st tags(3) values('2024-01-01 10:00:03', 3);" + "insert into db.t3 using db.st tags(3) values('2024-01-01 10:00:03', 3);", ] tdSql.executes(sqls) @@ -44,7 +44,7 @@ class TDTestCase(TBase): results = [ ["2024-01-01 10:00:01", 1, 100], ["2024-01-01 10:00:02", 2, 110], - ["2024-01-01 10:00:03", 3, 3] + ["2024-01-01 10:00:03", 3, 3], ] tdSql.checkDataMem(sql, results) @@ -99,11 +99,12 @@ class TDTestCase(TBase): for i in range(1, 10): new_ts = base_ts + i * 1000 num = i * 100 + v1, v2 = i * 10, i * 11 sqls = [ - f"insert into ntb1 values({new_ts}, 'nihao{num}', {10*i}, {10*i}, {10*i});", + f"insert into ntb1 values({new_ts}, 'nihao{num}', {v1}, {v1}, {v1});", f"insert into ntb1 values({new_ts + 1}, 'nihao{num + 1}', NULL, NULL, NULL);", f"delete from ntb1 where ts = {new_ts};", - f"insert into ntb1 values({new_ts + 2}, 'nihao{num + 2}', {11*i}, {11*i}, {11*i});", + f"insert into ntb1 values({new_ts + 2}, 'nihao{num + 2}', {v2}, {v2}, {v2});", ] tdSql.executes(sqls) diff --git a/tests/army/s3/s3Basic.json b/tests/army/s3/s3Basic.json index ef1585d2ba..c3fcdb567c 100644 --- a/tests/army/s3/s3Basic.json +++ b/tests/army/s3/s3Basic.json @@ -36,7 +36,7 @@ "insert_rows": 2000000, "childtable_prefix": "d", "insert_mode": "taosc", - "timestamp_step": 1000, + "timestamp_step": 100, "start_timestamp": 1600000000000, "columns": [ { "type": "bool", "name": "bc"}, diff --git a/tests/army/s3/s3Basic.py b/tests/army/s3/s3Basic.py index f94fe611a6..466cc5ab19 100644 --- a/tests/army/s3/s3Basic.py +++ b/tests/army/s3/s3Basic.py @@ -73,7 +73,7 @@ class TDTestCase(TBase): # come from s3_basic.json self.childtable_count = 6 self.insert_rows = 2000000 - self.timestamp_step = 1000 + self.timestamp_step = 100 def createStream(self, sname): sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" @@ -262,7 +262,7 @@ class TDTestCase(TBase): # come from s3_basic.json self.insert_rows += self.insert_rows/4 - self.timestamp_step = 500 + self.timestamp_step = 50 # delete def checkDelete(self): diff --git a/tests/army/s3/s3Basic1.json b/tests/army/s3/s3Basic1.json index fb95c14e98..0618c341c9 100644 --- a/tests/army/s3/s3Basic1.json +++ b/tests/army/s3/s3Basic1.json @@ -36,7 +36,7 @@ "insert_rows": 1000000, "childtable_prefix": "d", "insert_mode": "taosc", - "timestamp_step": 500, + "timestamp_step": 50, "start_timestamp": 1600000000000, "columns": [ { "type": "bool", "name": "bc"}, diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4338187791..848917ceb5 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -15,6 +15,7 @@ ,,n,army,python3 ./test.py -f s3/s3Basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py +,,y,army,./pytest.sh python3 ./test.py -f query/function/cast.py ,,y,army,./pytest.sh python3 ./test.py -f query/test_join.py ,,y,army,./pytest.sh python3 ./test.py -f query/test_compare.py ,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py @@ -192,6 +193,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 3 diff --git a/tests/system-test/2-query/match.py b/tests/system-test/2-query/match.py new file mode 100644 index 0000000000..cd2ed5d96b --- /dev/null +++ b/tests/system-test/2-query/match.py @@ -0,0 +1,141 @@ +import taos +import sys +import datetime +import inspect +import threading +import time + +from util.log import * +from util.sql import * +from util.cases import * +from util.common import tdCom + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def initConnection(self): + self.records = 10000000 + self.numOfTherads = 50 + self.ts = 1537146000000 + self.host = "127.0.0.1" + self.user = "root" + self.password = "taosdata" + self.config = "/home/xp/git/TDengine/sim/dnode1/cfg" + self.conn = taos.connect( + self.host, + self.user, + self.password, + self.config) + + def initDB(self): + tdSql.execute("drop database if exists db") + tdSql.execute("create database if not exists db") + + def stopTest(self): + tdSql.execute("drop database if exists db") + + def threadTest(self, threadID): + print(f"Thread {threadID} starting...") + tdsqln = tdCom.newTdSql() + for i in range(2, 50): + tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}dx'") + tdsqln.checkRows(0) + for i in range(100): + tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdsqln.checkRows(2) + + tdsqln.query("select * from db.t1x") + tdsqln.checkRows(5) + + tdsqln.query("select * from db.t1x where c1 match '_c'") + tdsqln.checkRows(2) + + tdsqln.query("select * from db.t1x where c1 match '%__c'") + tdsqln.checkRows(0) + + tdsqln.error("select * from db.t1x where c1 match '*d'") + + print(f"Thread {threadID} finished.") + + def match_test(self): + tdSql.execute("create table db.t1x (ts timestamp, c1 varchar(100))") + tdSql.execute("create table db.t_1x (ts timestamp, c1 varchar(100))") + + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdSql.checkRows(2) + for i in range(2, 50): + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") + tdSql.checkRows(0) + + tdSql.error("select * from db.t1x where c1 match '*d'") + tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')") + + tdSql.query("select * from db.t1x") + tdSql.checkRows(5) + + tdSql.query("select * from db.t1x where c1 match '_c'") + tdSql.checkRows(2) + + tdSql.query("select * from db.t1x where c1 match '%__c'") + tdSql.checkRows(0) + tdSql.error("select * from db.t1x where c1 match '*d'") + threads = [] + for i in range(10): + t = threading.Thread(target=self.threadTest, args=(i,)) + threads.append(t) + t.start() + + time.sleep(31) + + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'") + tdSql.checkRows(2) + for i in range(2, 50): + tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'") + tdSql.checkRows(0) + + tdSql.query("select * from db.t1x") + tdSql.checkRows(5) + + tdSql.query("select * from db.t1x where c1 match '_c'") + tdSql.checkRows(2) + + tdSql.query("select * from db.t1x where c1 match '%__c'") + tdSql.checkRows(0) + + tdSql.execute("create table db.t3x (ts timestamp, c1 varchar(100))") + + tdSql.execute("insert into db.t3x values(now, '我是中文'), (now+1s, '我是_中文'), (now+2s, '我是%中文'), (now+3s, '%中文'),(now+4s, '_中文')") + tdSql.query("select * from db.t3x where c1 match '%中文'") + tdSql.checkRows(2) + tdSql.query("select * from db.t3x where c1 match '中文'") + tdSql.checkRows(5) + tdSql.error("select * from db.t1x where c1 match '*d'") + + for thread in threads: + print(f"Thread waitting for finish...") + thread.join() + + print(f"Mutithread test finished.") + + def run(self): + tdLog.printNoPrefix("==========start match_test run ...............") + tdSql.prepare(replica = self.replicaVar) + + self.initConnection() + + self.initDB() + self.match_test() + + self.stopTest() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 96f9452827..cdc4e27f20 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -104,6 +104,10 @@ python3 ./test.py -f 2-query/like.py python3 ./test.py -f 2-query/like.py -Q 2 python3 ./test.py -f 2-query/like.py -Q 3 python3 ./test.py -f 2-query/like.py -Q 4 +python3 ./test.py -f 2-query/match.py +python3 ./test.py -f 2-query/match.py -Q 2 +python3 ./test.py -f 2-query/match.py -Q 3 +python3 ./test.py -f 2-query/match.py -Q 4 python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False