other: merge 3.0

This commit is contained in:
Haojun Liao 2024-07-15 19:41:54 +08:00
commit 25f5c9dc5e
39 changed files with 872 additions and 219 deletions

View File

@ -152,7 +152,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @return * @return
*/ */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion, int32_t idx); int32_t* tversion, int32_t idx, bool* tbGet);
/** /**
* The main task execution function, including query on both table and multiple tables, * The main task execution function, including query on both table and multiple tables,

View File

@ -626,6 +626,7 @@ bool nodesIsArithmeticOp(const SOperatorNode* pOp);
bool nodesIsComparisonOp(const SOperatorNode* pOp); bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp); bool nodesIsRegularOp(const SOperatorNode* pOp);
bool nodesIsMatchRegularOp(const SOperatorNode* pOp);
bool nodesIsBitwiseOp(const SOperatorNode* pOp); bool nodesIsBitwiseOp(const SOperatorNode* pOp);
bool nodesExprHasColumn(SNode* pNode); bool nodesExprHasColumn(SNode* pNode);

View File

@ -837,6 +837,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E) #define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E)
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) #define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
#define TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2681)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner //planner

View File

@ -45,7 +45,10 @@ typedef struct SPatternCompareInfo {
TdUcs4 umatchOne; // unicode version matchOne TdUcs4 umatchOne; // unicode version matchOne
} SPatternCompareInfo; } SPatternCompareInfo;
int32_t InitRegexCache();
void DestroyRegexCache();
int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo); int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t ssize, const SPatternCompareInfo *pInfo);
int32_t checkRegexPattern(const char *pPattern);
int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo); int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str, size_t ssize, const SPatternCompareInfo *pInfo);
@ -83,7 +86,6 @@ int32_t compareLenBinaryVal(const void *pLeft, const void *pRight);
int32_t comparestrRegexMatch(const void *pLeft, const void *pRight); int32_t comparestrRegexMatch(const void *pLeft, const void *pRight);
int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight); int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight);
void DestoryThreadLocalRegComp();
int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight); int32_t comparewcsRegexMatch(const void *pLeft, const void *pRight);
int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight); int32_t comparewcsRegexNMatch(const void *pLeft, const void *pRight);

View File

@ -56,7 +56,7 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
int32_t parseCfgReal(const char* str, double* out); int32_t parseCfgReal(const char *str, double *out);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context; T_MD5_CTX context;
@ -84,9 +84,9 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
static FORCE_INLINE int32_t taosCreateMD5Hash(char *pBuf, int32_t len) { static FORCE_INLINE int32_t taosCreateMD5Hash(char *pBuf, int32_t len) {
T_MD5_CTX ctx; T_MD5_CTX ctx;
tMD5Init(&ctx); tMD5Init(&ctx);
tMD5Update(&ctx, (uint8_t*)pBuf, len); tMD5Update(&ctx, (uint8_t *)pBuf, len);
tMD5Final(&ctx); tMD5Final(&ctx);
char* p = pBuf; char *p = pBuf;
int32_t resLen = 0; int32_t resLen = 0;
for (uint8_t i = 0; i < tListLen(ctx.digest); ++i) { for (uint8_t i = 0; i < tListLen(ctx.digest); ++i) {
resLen += snprintf(p, 3, "%02x", ctx.digest[i]); resLen += snprintf(p, 3, "%02x", ctx.digest[i]);
@ -147,6 +147,30 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
#define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member)))
#define TAOS_RETURN(code) \
do { \
return (terrno = (code)); \
} while (0)
#define TAOS_CHECK_RETURN(CMD) \
do { \
int32_t code = (CMD); \
if (code != TSDB_CODE_SUCCESS) { \
TAOS_RETURN(code); \
} \
} while (0)
#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \
do { \
code = (CMD); \
if (code != TSDB_CODE_SUCCESS) { \
if (LINO) { \
*((int32_t *)(LINO)) = __LINE__; \
} \
goto LABEL; \
} \
} while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1398,10 +1398,8 @@ void hbMgrCleanUp() {
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);
appHbMgrCleanup(); appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs); taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = NULL; clientHbMgr.appHbMgrs = NULL;
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
} }

View File

@ -9507,7 +9507,6 @@ static void tDeleteMqDataRspCommon(void *rsp) {
SMqDataRspCommon *pRsp = rsp; SMqDataRspCommon *pRsp = rsp;
taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL; pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL; pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
@ -9562,7 +9561,6 @@ void tDeleteSTaosxRsp(void *rsp) {
STaosxRsp *pRsp = (STaosxRsp *)rsp; STaosxRsp *pRsp = (STaosxRsp *)rsp;
taosArrayDestroy(pRsp->createTableLen); taosArrayDestroy(pRsp->createTableLen);
pRsp->createTableLen = NULL; pRsp->createTableLen = NULL;
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
pRsp->createTableReq = NULL; pRsp->createTableReq = NULL;
} }

View File

@ -18,6 +18,7 @@
#include "audit.h" #include "audit.h"
#include "libs/function/tudf.h" #include "libs/function/tudf.h"
#include "tgrant.h" #include "tgrant.h"
#include "tcompare.h"
#define DM_INIT_AUDIT() \ #define DM_INIT_AUDIT() \
do { \ do { \
@ -163,6 +164,7 @@ int32_t dmInit() {
if (dmInitMonitor() != 0) return -1; if (dmInitMonitor() != 0) return -1;
if (dmInitAudit() != 0) return -1; if (dmInitAudit() != 0) return -1;
if (dmInitDnode(dmInstance()) != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1;
if (InitRegexCache() != 0) return -1;
#if defined(USE_S3) #if defined(USE_S3)
if (s3Begin() != 0) return -1; if (s3Begin() != 0) return -1;
#endif #endif
@ -192,6 +194,7 @@ void dmCleanup() {
udfStopUdfd(); udfStopUdfd();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dmDiskClose(); dmDiskClose();
DestroyRegexCache();
#if defined(USE_S3) #if defined(USE_S3)
s3End(); s3End();

View File

@ -195,7 +195,9 @@ void *freeStreamTasks(SArray *pTaskLevel) {
taosArrayDestroy(pLevel); taosArrayDestroy(pLevel);
} }
return taosArrayDestroy(pTaskLevel); taosArrayDestroy(pTaskLevel);
return NULL;
} }
void tFreeStreamObj(SStreamObj *pStream) { void tFreeStreamObj(SStreamObj *pStream) {

View File

@ -251,7 +251,6 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag); tTagNew(tagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
tagArray = NULL; tagArray = NULL;
if (pCreateTbReq->ctb.pTag == NULL) { if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -509,7 +508,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
if (pTableData->aRowP == NULL || pVals == NULL) { if (pTableData->aRowP == NULL || pVals == NULL) {
taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = NULL; pTableData->aRowP = NULL;
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code)); tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
@ -536,7 +534,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
ts, earlyTs); ts, earlyTs);
taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pTableData->aRowP);
pTableData->aRowP = NULL; pTableData->aRowP = NULL;
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -310,7 +310,6 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
taosArrayDestroy(pInfo->delSkyline); taosArrayDestroy(pInfo->delSkyline);
pInfo->delSkyline = NULL; pInfo->delSkyline = NULL;
pInfo->lastProcKey.ts = ts; pInfo->lastProcKey.ts = ts;
// todo check the nextProcKey info // todo check the nextProcKey info
pInfo->sttKeyInfo.nextProcKey.ts = ts + step; pInfo->sttKeyInfo.nextProcKey.ts = ts + step;

View File

@ -479,12 +479,14 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
} }
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion, int32_t idx) { int32_t* tversion, int32_t idx, bool* tbGet) {
*tbGet = false;
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) { if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) {
return -1; return TSDB_CODE_SUCCESS;
} }
SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx); SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx);
@ -502,6 +504,8 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
tableName[0] = 0; tableName[0] = 0;
} }
*tbGet = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -129,7 +129,6 @@ static void destroyStreamFillOperatorInfo(void* param) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
taosArrayDestroy(pInfo->matchInfo.pList); taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->matchInfo.pList = NULL; pInfo->matchInfo.pList = NULL;
taosMemoryFree(pInfo); taosMemoryFree(pInfo);

View File

@ -1138,7 +1138,6 @@ void destroyIntervalOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pInterpCols); taosArrayDestroy(pInfo->pInterpCols);
pInfo->pInterpCols = NULL; pInfo->pInterpCols = NULL;
taosArrayDestroyEx(pInfo->pPrevValues, freeItem); taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL; pInfo->pPrevValues = NULL;

View File

@ -1441,6 +1441,9 @@ _stddev_over:
} }
static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) { static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
if (IS_NULL_TYPE(pInput->type)) {
return;
}
pOutput->type = pInput->type; pOutput->type = pInput->type;
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) { if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->quadraticISum += pInput->quadraticISum; pOutput->quadraticISum += pInput->quadraticISum;

View File

@ -2203,6 +2203,17 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) {
return false; return false;
} }
bool nodesIsMatchRegularOp(const SOperatorNode* pOp) {
switch (pOp->opType) {
case OP_TYPE_MATCH:
case OP_TYPE_NMATCH:
return true;
default:
break;
}
return false;
}
bool nodesIsBitwiseOp(const SOperatorNode* pOp) { bool nodesIsBitwiseOp(const SOperatorNode* pOp) {
switch (pOp->opType) { switch (pOp->opType) {
case OP_TYPE_BIT_AND: case OP_TYPE_BIT_AND:

View File

@ -223,6 +223,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Tag name:%s duplicated"; return "Tag name:%s duplicated";
case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC: case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC:
return "Some functions cannot appear in the select list at the same time"; return "Some functions cannot appear in the select list at the same time";
case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR:
return "Syntax error in regular expression";
default: default:
return "Unknown error"; return "Unknown error";
} }

View File

@ -391,7 +391,7 @@ void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask);
int32_t qwDropTask(QW_FPARAMS_DEF); int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
int32_t qwOpenRef(void); int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam); void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);

View File

@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
if (pEpSet) { if (pEpSet) {
contLen = tSerializeSEpSet(NULL, 0, pEpSet); contLen = tSerializeSEpSet(NULL, 0, pEpSet);
if (contLen < 0) {
qError("tSerializeSEpSet failed, code:%x", terrno);
return terrno;
}
rsp = rpcMallocCont(contLen); rsp = rpcMallocCont(contLen);
tSerializeSEpSet(rsp, contLen, pEpSet); if (NULL == rsp) {
qError("rpcMallocCont %d failed, code:%x", contLen, terrno);
return terrno;
}
contLen = tSerializeSEpSet(rsp, contLen, pEpSet);
if (contLen < 0) {
qError("tSerializeSEpSet second failed, code:%x", terrno);
return terrno;
}
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
epSet.eps[2].port = 7300; epSet.eps[2].port = 7300;
ctx->phase = QW_PHASE_POST_QUERY; ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error
*rsped = true; *rsped = true;
return; return;
} }
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
*rsped = true; *rsped = true;
return; return;
} }
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
*rsped = true; *rsped = true;
return; return;
} }
@ -260,8 +273,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
if (++ignoreTime > 10 && 0 == taosRand() % 9) { if (++ignoreTime > 10 && 0 == taosRand() % 9) {
if (ctx->fetchMsgType == TDMT_SCH_FETCH) { if (ctx->fetchMsgType == TDMT_SCH_FETCH) {
qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); (void)qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); // ignore error
qwBuildAndSendErrorRsp(ctx->fetchMsgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); (void)qwBuildAndSendErrorRsp(ctx->fetchMsgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); // ignore error
*rsped = true; *rsped = true;
taosSsleep(3); taosSsleep(3);

View File

@ -105,8 +105,19 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) {
SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo}; SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
if (contLen < 0) {
qError("tSerializeSExplainRsp failed, error: %x", terrno);
QW_RET(terrno);
}
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSExplainRsp(pRsp, contLen, &rsp); if (NULL == pRsp) {
QW_RET(terrno);
}
contLen = tSerializeSExplainRsp(pRsp, contLen, &rsp);
if (contLen < 0) {
qError("tSerializeSExplainRsp second failed, error: %x", terrno);
QW_RET(terrno);
}
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_SCH_EXPLAIN_RSP, .msgType = TDMT_SCH_EXPLAIN_RSP,
@ -123,8 +134,20 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) {
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
if (contLen < 0) {
qError("tSerializeSSchedulerHbRsp failed, error: %x", terrno);
QW_RET(terrno);
}
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); if (NULL == pRsp) {
QW_RET(terrno);
}
contLen = tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
if (contLen < 0) {
qError("tSerializeSSchedulerHbRsp second failed, error: %x", terrno);
QW_RET(terrno);
}
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP, .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
@ -143,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
int32_t code) { int32_t code) {
if (NULL == pRsp) { if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
if (NULL == pRsp) {
QW_RET(terrno);
}
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
dataLength = 0; dataLength = 0;
} }
@ -164,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
#if 0 #if 0
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
if (NULL == pRsp) {
QW_RET(terrno);
}
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
@ -180,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
if (NULL == pRsp) {
QW_RET(terrno);
}
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
@ -405,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
SSubQueryMsg msg = {0}; SSubQueryMsg msg = {0};
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
@ -419,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t eId = msg.execId; int32_t eId = msg.execId;
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
qwAbortPrerocessQuery(QW_FPARAMS()); code = qwAbortPrerocessQuery(QW_FPARAMS());
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);
tFreeSSubQueryMsg(&msg); tFreeSSubQueryMsg(&msg);
@ -435,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t code = 0; int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
SSubQueryMsg msg = {0}; SSubQueryMsg msg = {0};
@ -477,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
SQWTaskCtx *handles = NULL; SQWTaskCtx *handles = NULL;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
@ -495,9 +528,9 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); code = qwProcessCQuery(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processCQuery end, node:%p", node); QW_SCH_TASK_DLOG("processCQuery end, node:%p, code:0x%x", node, code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -510,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SResFetchReq req = {0}; SResFetchReq req = {0};
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
@ -528,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processFetch end, node:%p", node); QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -538,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
if (mgmt) { if (mgmt) {
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
} }
@ -557,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t code = 0; int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont; STaskCancelReq *msg = pMsg->pCont;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
@ -598,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t code = 0; int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
STaskDropReq msg = {0}; STaskDropReq msg = {0};
@ -621,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); code = qwProcessDrop(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processDrop end, node:%p", node); QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -636,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t code = 0; int32_t code = 0;
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1);
STaskNotifyReq msg = {0}; STaskNotifyReq msg = {0};
@ -655,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg)); code = qwProcessNotify(QW_FPARAMS(), &qwMsg);
QW_SCH_TASK_DLOG("processNotify end, node:%p", node); QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -672,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
SSchedulerHbReq req = {0}; SSchedulerHbReq req = {0};
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
if (NULL == pMsg->pCont) { if (NULL == pMsg->pCont) {
@ -694,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); code = qwProcessHb(mgmt, &qwMsg, &req);
QW_SCH_DLOG("processHb end, node:%p", node); QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -712,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req));
uint64_t sId = req.sId; uint64_t sId = req.sId;
uint64_t qId = req.queryId; uint64_t qId = req.queryId;

View File

@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) {
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = TSDB_CODE_SUCCESS;
qTaskInfo_t taskHandle = ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
ctx->explainRsped = true; ctx->explainRsped = true;
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); if (NULL == execInfoList) {
QW_ERR_JRET(terrno);
}
QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList));
if (ctx->localExec) { if (ctx->localExec) {
SExplainLocalRsp localRsp = {0}; SExplainLocalRsp localRsp = {0};
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
if (NULL == pExec) {
QW_ERR_JRET(terrno);
}
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
localRsp.rsp.subplanInfo = pExec; localRsp.rsp.subplanInfo = pExec;
localRsp.qId = qId; localRsp.qId = qId;
localRsp.tId = tId; localRsp.tId = tId;
localRsp.rId = rId; localRsp.rId = rId;
localRsp.eId = eId; localRsp.eId = eId;
taosArrayPush(ctx->explainRes, &localRsp); if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) {
QW_ERR_JRET(terrno);
}
taosArrayDestroy(execInfoList); taosArrayDestroy(execInfoList);
execInfoList = NULL;
} else { } else {
SRpcHandleInfo connInfo = ctx->ctrlConnInfo; SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL; connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeExplainExecItem); taosArrayDestroyEx(execInfoList, freeExplainExecItem);
execInfoList = NULL;
QW_ERR_RET(code); QW_ERR_RET(code);
} }
return TSDB_CODE_SUCCESS; _return:
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
return code;
} }
@ -503,14 +521,18 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
*pParam = &gQwMgmt.param[paramIdx]; *pParam = &gQwMgmt.param[paramIdx];
} }
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
STbVerInfo tbInfo; STbVerInfo tbInfo;
int32_t i = 0; int32_t i = 0;
int32_t code = TSDB_CODE_SUCCESS;
bool tbGet = false;
while (true) { while (true) {
if (qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i) < 0) { tbGet = false;
code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i, &tbGet);
if (TSDB_CODE_SUCCESS != code || !tbGet) {
break; break;
} }
@ -522,18 +544,25 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
if (NULL == ctx->tbInfo) { if (NULL == ctx->tbInfo) {
ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo)); ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo));
if (NULL == ctx->tbInfo) {
QW_ERR_RET(terrno);
}
} }
taosArrayPush(ctx->tbInfo, &tbInfo); if (NULL == taosArrayPush(ctx->tbInfo, &tbInfo)) {
QW_ERR_RET(terrno);
}
i++; i++;
} }
QW_RET(code);
} }
void qwCloseRef(void) { void qwCloseRef(void) {
taosWLockLatch(&gQwMgmt.lock); taosWLockLatch(&gQwMgmt.lock);
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
taosCloseRef(gQwMgmt.qwRef); (void)taosCloseRef(gQwMgmt.qwRef); // ignore error
gQwMgmt.qwRef = -1; gQwMgmt.qwRef = -1;
} }
taosWUnLockLatch(&gQwMgmt.lock); taosWUnLockLatch(&gQwMgmt.lock);
@ -550,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) {
int32_t schStatusCount = 0; int32_t schStatusCount = 0;
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
taosTmrStop(mgmt->hbTimer); (void)taosTmrStop(mgmt->hbTimer); //ignore error
mgmt->hbTimer = NULL; mgmt->hbTimer = NULL;
taosTmrCleanUp(mgmt->timer); taosTmrCleanUp(mgmt->timer);
@ -641,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
return pStat->num ? (pStat->total / pStat->num) : 0; return pStat->num ? (pStat->total / pStat->num) : 0;
default: default:
qError("unsupported queue type %d", type); qError("unsupported queue type %d", type);
break;
} }
return -1; return -1;
} }
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t num = taosArrayGetSize(pExpiredSch); int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i); uint64_t *sId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL; SQWSchStatus *pSch = NULL;
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { if (NULL == sId) {
qError("get the %dth sch failed, code:%x", i, terrno);
break;
}
code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch);
if (TSDB_CODE_SUCCESS != code) {
qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code);
continue; continue;
} }
if (taosHashGetSize(pSch->tasksHash) <= 0) { if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch); qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qDebug("sch %" PRIx64 " destroyed", *sId); qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code);
} }
qwReleaseScheduler(QW_WRITE, mgmt); qwReleaseScheduler(QW_WRITE, mgmt);

View File

@ -18,10 +18,11 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
int32_t qwStopAllTasks(SQWorker *mgmt) { void qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId; uint64_t qId, tId, sId;
int32_t eId; int32_t eId;
int64_t rId = 0; int64_t rId = 0;
int32_t code = TSDB_CODE_SUCCESS;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) { while (pIter) {
@ -44,22 +45,29 @@ int32_t qwStopAllTasks(SQWorker *mgmt) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task running, async kill failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task running, async killed"); QW_TASK_DLOG_E("task running, async killed");
}
} else if (QW_FETCH_RUNNING(ctx)) { } else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received"); QW_TASK_DLOG_E("task fetching, update drop received");
} else { } else {
qwDropTask(QW_FPARAMS()); code = qwDropTask(QW_FPARAMS());
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task drop failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task dropped");
}
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter); pIter = taosHashIterate(mgmt->ctxHash, pIter);
} }
return TSDB_CODE_SUCCESS;
} }
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
@ -111,7 +119,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) {
if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) { if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) {
if (!ctx->localExec) { if (!ctx->localExec) {
qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx); QW_ERR_RET(qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode));
} }
@ -140,6 +148,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
SArray *pResList = taosArrayInit(4, POINTER_BYTES); SArray *pResList = taosArrayInit(4, POINTER_BYTES);
if (NULL == pResList) {
QW_ERR_RET(terrno);
}
while (true) { while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
@ -165,6 +177,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
size_t numOfResBlock = taosArrayGetSize(pResList); size_t numOfResBlock = taosArrayGetSize(pResList);
for (int32_t j = 0; j < numOfResBlock; ++j) { for (int32_t j = 0; j < numOfResBlock; ++j) {
SSDataBlock *pRes = taosArrayGetP(pResList, j); SSDataBlock *pRes = taosArrayGetP(pResList, j);
if (NULL == pRes) {
QW_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SInputData inputData = {.pData = pRes}; SInputData inputData = {.pData = pRes};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
@ -226,7 +241,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
_return: _return:
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
QW_RET(code); QW_RET(code);
} }
@ -241,6 +258,7 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) {
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
int32_t taskNum = 0; int32_t taskNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
hbInfo->connInfo = sch->hbConnInfo; hbInfo->connInfo = sch->hbConnInfo;
hbInfo->rsp.epId = sch->hbEpId; hbInfo->rsp.epId = sch->hbEpId;
@ -272,7 +290,11 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
status.status = taskStatus->status; status.status = taskStatus->status;
status.refId = taskStatus->refId; status.refId = taskStatus->refId;
taosArrayPush(hbInfo->rsp.taskStatus, &status); if (NULL == taosArrayPush(hbInfo->rsp.taskStatus, &status)) {
taosHashCancelIterate(sch->tasksHash, pIter);
code = terrno;
break;
}
++i; ++i;
pIter = taosHashIterate(sch->tasksHash, pIter); pIter = taosHashIterate(sch->tasksHash, pIter);
@ -280,7 +302,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
QW_UNLOCK(QW_READ, &sch->tasksLock); QW_UNLOCK(QW_READ, &sch->tasksLock);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg, int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg,
@ -320,7 +342,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->numOfRows); pOutput->numOfRows);
if (!ctx->dynamicTask) { if (!ctx->dynamicTask) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
} }
if (NULL == pRsp) { if (NULL == pRsp) {
@ -375,7 +397,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
break; break;
} }
@ -464,7 +486,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); QW_ERR_RET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
@ -650,7 +672,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
_return: _return:
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); code = qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask);
ctx->queryGotData = true; ctx->queryGotData = true;
} }
@ -660,7 +682,10 @@ _return:
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) { if (!rsped) {
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); int32_t newCode = qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
if (TSDB_CODE_SUCCESS != newCode && TSDB_CODE_SUCCESS == code) {
code = newCode;
}
} }
} }
@ -672,7 +697,7 @@ _return:
} }
if (code) { if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // already in error, ignore new error
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
@ -687,11 +712,11 @@ _return:
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
QW_ERR_RET(qwDropTask(QW_FPARAMS())); QW_ERR_RET(qwDropTask(QW_FPARAMS()));
QW_RET(TSDB_CODE_SUCCESS); return TSDB_CODE_SUCCESS;
} }
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
@ -706,7 +731,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); QW_ERR_JRET(qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true));
_return: _return:
@ -715,7 +740,7 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
QW_RET(TSDB_CODE_SUCCESS); return TSDB_CODE_SUCCESS;
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
@ -761,7 +786,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
qwSaveTbVersionInfo(pTaskInfo, ctx); QW_ERR_JRET(qwSaveTbVersionInfo(pTaskInfo, ctx));
if (!ctx->dynamicTask) { if (!ctx->dynamicTask) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
@ -778,7 +803,7 @@ _return:
input.msgType = qwMsg->msgType; input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code); QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
@ -829,7 +854,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
@ -851,10 +876,15 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
rsp = NULL; rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0);
} else {
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0); 0);
} }
}
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (atomic_load_8((int8_t *)&ctx->queryEnd) || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || if (atomic_load_8((int8_t *)&ctx->queryEnd) || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) ||
@ -869,7 +899,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} while (true); } while (true);
input.code = code; input.code = code;
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL); QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
@ -922,7 +952,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} else if (QW_QUERY_RUNNING(ctx)) { } else if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1); atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask));
atomic_store_8((int8_t *)&ctx->queryInQueue, 1); atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
@ -952,9 +982,14 @@ _return:
} }
if (!rsped) { if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else {
QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen); qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
}
} else { } else {
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
@ -985,7 +1020,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask));
} else { } else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropped = true; dropped = true;
@ -1001,7 +1036,7 @@ _return:
if (code) { if (code) {
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling
} else { } else {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
} }
@ -1035,7 +1070,7 @@ int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
} }
switch (qwMsg->msgType) { switch (qwMsg->msgType) {
@ -1055,7 +1090,7 @@ _return:
if (code) { if (code) {
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling
} }
} }
@ -1104,7 +1139,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) { if (code) {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
@ -1125,7 +1160,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
int64_t refId = hbParam->refId; int64_t refId = hbParam->refId;
SQWorker *mgmt = qwAcquire(refId); SQWorker *mgmt = qwAcquire(refId);
if (NULL == mgmt) { if (NULL == mgmt) {
QW_DLOG("qwAcquire %" PRIx64 "failed", refId); QW_DLOG("qwAcquire %" PRIx64 "failed, code:0x%x", refId, terrno);
return; return;
} }
@ -1137,7 +1172,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
qwDbgDumpMgmtInfo(mgmt); qwDbgDumpMgmtInfo(mgmt);
if (gQWDebug.forceStop) { if (gQWDebug.forceStop) {
(void)qwStopAllTasks(mgmt); qwStopAllTasks(mgmt);
} }
QW_LOCK(QW_READ, &mgmt->schLock); QW_LOCK(QW_READ, &mgmt->schLock);
@ -1145,8 +1180,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
int32_t schNum = taosHashGetSize(mgmt->schHash); int32_t schNum = taosHashGetSize(mgmt->schHash);
if (schNum <= 0) { if (schNum <= 0) {
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
qwRelease(refId); (void)qwRelease(refId); // ignore error
return; return;
} }
@ -1156,9 +1191,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
taosMemoryFree(rspList); taosMemoryFree(rspList);
taosArrayDestroy(pExpiredSch); taosArrayDestroy(pExpiredSch);
QW_ELOG("calloc %d SQWHbInfo failed", schNum); QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
qwRelease(refId); (void)qwRelease(refId); // ignore error
return; return;
} }
@ -1174,7 +1209,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
taosHashGetSize(sch1->tasksHash) <= 0) { taosHashGetSize(sch1->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId); if (NULL == taosArrayPush(pExpiredSch, sId)) {
QW_ELOG("add sId 0x%" PRIx64 " to expiredSch failed, code:%x", *sId, terrno);
taosHashCancelIterate(mgmt->schHash, pIter);
break;
}
} }
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
@ -1196,7 +1235,7 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); (void)qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); // ignore error
/*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
/*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
tFreeSSchedulerHbRsp(&rspList[j].rsp); tFreeSSchedulerHbRsp(&rspList[j].rsp);
@ -1209,8 +1248,8 @@ _return:
taosMemoryFreeClear(rspList); taosMemoryFreeClear(rspList);
taosArrayDestroy(pExpiredSch); taosArrayDestroy(pExpiredSch);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
qwRelease(refId); (void)qwRelease(refId); // ignore error
} }
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
@ -1333,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
_return: _return:
if (mgmt->refId >= 0) { if (mgmt->refId >= 0) {
qwRelease(mgmt->refId); (void)qwRelease(mgmt->refId); // ignore error
} else { } else {
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
taosHashCleanup(mgmt->ctxHash); taosHashCleanup(mgmt->ctxHash);
@ -1353,7 +1392,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
atomic_store_8(&mgmt->nodeStopped, 1); atomic_store_8(&mgmt->nodeStopped, 1);
(void)qwStopAllTasks(mgmt); qwStopAllTasks(mgmt);
} }
void qWorkerDestroy(void **qWorkerMgmt) { void qWorkerDestroy(void **qWorkerMgmt) {
@ -1383,7 +1422,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
SDataSinkStat sinkStat = {0}; SDataSinkStat sinkStat = {0};
dsDataSinkGetCacheSize(&sinkStat); QW_ERR_RET(dsDataSinkGetCacheSize(&sinkStat));
pStat->cacheDataSize = sinkStat.cachedSize; pStat->cacheDataSize = sinkStat.cachedSize;
pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed); pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
@ -1427,6 +1466,10 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
ctx->explainRes = explainRes; ctx->explainRes = explainRes;
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
if (NULL == rHandle.pMsgCb) {
QW_ERR_JRET(terrno);
}
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);

View File

@ -130,30 +130,32 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchRpc->contLen = sizeof(SResFetchReq); fetchRpc->contLen = sizeof(SResFetchReq);
} }
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = 1; dropMsg->sId = 1;
dropMsg->queryId = atomic_load_64(&qwtTestQueryId); dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
dropMsg->taskId = 1; dropMsg->taskId = 1;
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg); int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
if (msgSize < 0) { if (msgSize < 0) {
return; return terrno;
} }
char *msg = (char*)taosMemoryCalloc(1, msgSize); char *msg = (char*)taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
return; return terrno;
} }
if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) { if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
taosMemoryFree(msg); taosMemoryFree(msg);
return; return terrno;
} }
dropRpc->msgType = TDMT_SCH_DROP_TASK; dropRpc->msgType = TDMT_SCH_DROP_TASK;
dropRpc->pCont = msg; dropRpc->pCont = msg;
dropRpc->contLen = msgSize; dropRpc->contLen = msgSize;
return TSDB_CODE_SUCCESS;
} }
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
@ -164,6 +166,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestFetchQueueLock); taosWLockLatch(&qwtTestFetchQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
if (NULL == newMsg) {
printf("malloc failed");
assert(0);
}
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
@ -178,7 +184,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
} }
taosWUnLockLatch(&qwtTestFetchQueueLock); taosWUnLockLatch(&qwtTestFetchQueueLock);
tsem_post(&qwtTestFetchSem); if (tsem_post(&qwtTestFetchSem) < 0) {
printf("tsem_post failed, errno:%d", errno);
assert(0);
}
return 0; return 0;
} }
@ -186,6 +195,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestQueryQueueLock); taosWLockLatch(&qwtTestQueryQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
if (NULL == newMsg) {
printf("malloc failed");
assert(0);
}
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
@ -200,22 +213,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
} }
taosWUnLockLatch(&qwtTestQueryQueueLock); taosWUnLockLatch(&qwtTestQueryQueueLock);
tsem_post(&qwtTestQuerySem); if (tsem_post(&qwtTestQuerySem) < 0) {
printf("tsem_post failed, errno:%d", errno);
assert(0);
}
return 0; return 0;
} }
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {} void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
void qwtRpcSendResponse(const SRpcMsg *pRsp) { int qwtRpcSendResponse(const SRpcMsg *pRsp) {
int32_t code = 0;
switch (pRsp->msgType) { switch (pRsp->msgType) {
case TDMT_SCH_QUERY_RSP: case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: { case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
if (pRsp->code) { if (pRsp->code) {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); if (code) {
assert(0);
return code;
}
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
} }
rpcFreeCont(rsp); rpcFreeCont(rsp);
@ -227,13 +252,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (0 == pRsp->code && 0 == rsp->completed) { if (0 == pRsp->code && 0 == rsp->completed) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
if (code) {
assert(0);
return code;
}
rpcFreeCont(rsp); rpcFreeCont(rsp);
return; return code;
} }
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); if (code) {
assert(0);
return code;
}
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
rpcFreeCont(rsp); rpcFreeCont(rsp);
break; break;
@ -245,9 +282,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
qwtTestCaseFinished = true; qwtTestCaseFinished = true;
break; break;
} }
default:
break;
} }
return; return code;
} }
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo, int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
@ -292,6 +331,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
if (endExec) { if (endExec) {
*pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
if (NULL == *pRes) {
return terrno;
}
(*pRes)->info.rows = taosRand() % 1000 + 1; (*pRes)->info.rows = taosRand() % 1000 + 1;
} else { } else {
*pRes = NULL; *pRes = NULL;
@ -631,7 +673,7 @@ void *queryThread(void *param) {
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5); taosUsleep(taosRand() % 5);
} }
@ -653,7 +695,7 @@ void *fetchThread(void *param) {
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5); taosUsleep(taosRand() % 5);
} }
@ -674,8 +716,11 @@ void *dropThread(void *param) {
STaskDropReq dropMsg = {0}; STaskDropReq dropMsg = {0};
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc); if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) {
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); break;
}
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5); taosUsleep(taosRand() % 5);
} }
@ -700,7 +745,7 @@ void *qwtclientThread(void *param) {
qwtTestCaseFinished = false; qwtTestCaseFinished = false;
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); (void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error
while (!qwtTestCaseFinished) { while (!qwtTestCaseFinished) {
taosUsleep(1); taosUsleep(1);
@ -752,9 +797,9 @@ void *queryQueueThread(void *param) {
} }
if (TDMT_SCH_QUERY == queryRpc->msgType) { if (TDMT_SCH_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); (void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) { } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); (void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
} else { } else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0); assert(0);
@ -810,16 +855,16 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) { switch (fetchRpc->msgType) {
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH: case TDMT_SCH_MERGE_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); (void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break; break;
case TDMT_SCH_CANCEL_TASK: case TDMT_SCH_CANCEL_TASK:
//qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0); //qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break; break;
case TDMT_SCH_DROP_TASK: case TDMT_SCH_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); (void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break; break;
case TDMT_SCH_TASK_NOTIFY: case TDMT_SCH_TASK_NOTIFY:
qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); (void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break; break;
default: default:
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
@ -853,7 +898,7 @@ TEST(seqTest, normalCase) {
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse(); stubSetRpcSendResponse();
@ -898,7 +943,7 @@ TEST(seqTest, cancelFirst) {
qwtInitLogFile(); qwtInitLogFile();
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse(); stubSetRpcSendResponse();
@ -954,7 +999,7 @@ TEST(seqTest, randCase) {
if (r >= 0 && r < maxr / 5) { if (r >= 0 && r < maxr / 5) {
printf("Query,%d\n", t++); printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error
} else if (r >= maxr / 5 && r < maxr * 2 / 5) { } else if (r >= maxr / 5 && r < maxr * 2 / 5) {
// printf("Ready,%d\n", t++); // printf("Ready,%d\n", t++);
// qwtBuildReadyReqMsg(&readyMsg, &readyRpc); // qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
@ -965,14 +1010,14 @@ TEST(seqTest, randCase) {
} else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) { } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
printf("Fetch,%d\n", t++); printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(1); taosUsleep(1);
} }
} else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) { } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
printf("Drop,%d\n", t++); printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc); (void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
taosUsleep(1); taosUsleep(1);
} }
@ -1018,14 +1063,14 @@ TEST(seqTest, multithreadRand) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); (void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5, t6; TdThread t1, t2, t3, t4, t5, t6;
taosThreadCreate(&(t1), &thattr, queryThread, mgmt); (void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error
// taosThreadCreate(&(t2), &thattr, readyThread, NULL); // (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error
taosThreadCreate(&(t3), &thattr, fetchThread, NULL); (void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error
taosThreadCreate(&(t4), &thattr, dropThread, NULL); (void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error
taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); (void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) { while (true) {
if (qwtTestDeadLoop) { if (qwtTestDeadLoop) {
@ -1083,16 +1128,16 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 0; qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0); (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
tsem_init(&qwtTestFetchSem, 0, 0); (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); (void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5; TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) { while (true) {
if (qwtTestDeadLoop) { if (qwtTestDeadLoop) {
@ -1114,8 +1159,8 @@ TEST(rcTest, shortExecshortDelay) {
if (qwtTestCaseFinished) { if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) { if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem); (void)tsem_post(&qwtTestQuerySem); //ignore error
tsem_post(&qwtTestFetchSem); (void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10); taosUsleep(10);
} }
@ -1166,16 +1211,16 @@ TEST(rcTest, longExecshortDelay) {
qwtTestMaxExecTaskUsec = 1000000; qwtTestMaxExecTaskUsec = 1000000;
qwtTestReqMaxDelayUsec = 0; qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0); (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
tsem_init(&qwtTestFetchSem, 0, 0); (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); (void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5; TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) { while (true) {
if (qwtTestDeadLoop) { if (qwtTestDeadLoop) {
@ -1197,8 +1242,8 @@ TEST(rcTest, longExecshortDelay) {
if (qwtTestCaseFinished) { if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) { if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem); (void)tsem_post(&qwtTestQuerySem); //ignore error
tsem_post(&qwtTestFetchSem); (void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10); taosUsleep(10);
} }
@ -1249,16 +1294,16 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 1000000; qwtTestReqMaxDelayUsec = 1000000;
tsem_init(&qwtTestQuerySem, 0, 0); (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
tsem_init(&qwtTestFetchSem, 0, 0); (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); (void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5; TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) { while (true) {
if (qwtTestDeadLoop) { if (qwtTestDeadLoop) {
@ -1280,8 +1325,8 @@ TEST(rcTest, shortExeclongDelay) {
if (qwtTestCaseFinished) { if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) { if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem); (void)tsem_post(&qwtTestQuerySem); //ignore error
tsem_post(&qwtTestFetchSem); (void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10); taosUsleep(10);
} }
@ -1327,16 +1372,16 @@ TEST(rcTest, dropTest) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
tsem_init(&qwtTestQuerySem, 0, 0); (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
tsem_init(&qwtTestFetchSem, 0, 0); (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); (void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5; TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) { while (true) {
if (qwtTestDeadLoop) { if (qwtTestDeadLoop) {

View File

@ -1654,6 +1654,12 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
(!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
if (nodesIsMatchRegularOp(pOp)) {
SValueNode* node = (SValueNode*)(pOp->pRight);
if(checkRegexPattern(node->literal) != TSDB_CODE_SUCCESS){
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
}
} }
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;

View File

@ -1675,10 +1675,8 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
pRes[i] = false; pRes[i] = false;
continue; continue;
} }
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex); char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex); char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData); pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
if (pRes[i]) { if (pRes[i]) {
++num; ++num;
@ -1714,7 +1712,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
if (!pLeftData || !pRightData) { if (!pLeftData || !pRightData) {
result = false; result = false;
} }
if (!result) { if (!result) {
colDataSetInt8(pOut->columnData, i, (int8_t *)&result); colDataSetInt8(pOut->columnData, i, (int8_t *)&result);
} else { } else {

View File

@ -43,6 +43,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "tvariant.h" #include "tvariant.h"
#include "tcompare.h"
#define _DEBUG_PRINT_ 0 #define _DEBUG_PRINT_ 0
@ -52,6 +53,12 @@
#define PRINTF(...) #define PRINTF(...)
#endif #endif
class constantTest {
public:
constantTest() { InitRegexCache(); }
~constantTest() { DestroyRegexCache(); }
};
static constantTest test;
namespace { namespace {
SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) { SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) {

View File

@ -1059,10 +1059,8 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosThreadMutexDestroy(&pInfo->lock); taosThreadMutexDestroy(&pInfo->lock);
taosArrayDestroy(pInfo->pDispatchTriggerList); taosArrayDestroy(pInfo->pDispatchTriggerList);
pInfo->pDispatchTriggerList = NULL; pInfo->pDispatchTriggerList = NULL;
taosArrayDestroy(pInfo->pReadyMsgList); taosArrayDestroy(pInfo->pReadyMsgList);
pInfo->pReadyMsgList = NULL; pInfo->pReadyMsgList = NULL;
taosArrayDestroy(pInfo->pCheckpointReadyRecvList); taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
pInfo->pCheckpointReadyRecvList = NULL; pInfo->pCheckpointReadyRecvList = NULL;

View File

@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) {
SArray* pArray = taosMemoryMalloc(sizeof(SArray)); SArray* pArray = taosMemoryMalloc(sizeof(SArray));
if (pArray == NULL) { if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pArray->size = 0; pArray->size = 0;
pArray->pData = taosMemoryCalloc(size, elemSize); pArray->pData = taosMemoryCalloc(size, elemSize);
if (pArray->pData == NULL) { if (pArray->pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pArray); taosMemoryFree(pArray);
return NULL; return NULL;
} }

View File

@ -24,6 +24,7 @@
#include "tutil.h" #include "tutil.h"
#include "types.h" #include "types.h"
#include "osString.h" #include "osString.h"
#include "ttimer.h"
int32_t setChkInBytes1(const void *pLeft, const void *pRight) { int32_t setChkInBytes1(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0; return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
@ -1203,54 +1204,153 @@ int32_t comparestrRegexNMatch(const void *pLeft, const void *pRight) {
return comparestrRegexMatch(pLeft, pRight) ? 0 : 1; return comparestrRegexMatch(pLeft, pRight) ? 0 : 1;
} }
static threadlocal regex_t pRegex; typedef struct UsingRegex {
static threadlocal char *pOldPattern = NULL; regex_t pRegex;
static regex_t *threadGetRegComp(const char *pPattern) { int32_t lastUsedTime;
if (NULL != pOldPattern) { } UsingRegex;
if( strcmp(pOldPattern, pPattern) == 0) {
return &pRegex; typedef struct RegexCache {
} else { SHashObj *regexHash;
DestoryThreadLocalRegComp(); void *regexCacheTmr;
void *timer;
} RegexCache;
static RegexCache sRegexCache;
#define MAX_REGEX_CACHE_SIZE 20
#define REGEX_CACHE_CLEAR_TIME 30
static void checkRegexCache(void* param, void* tmrId) {
taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId);
if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) {
return;
}
if (taosHashGetSize(sRegexCache.regexHash) >= MAX_REGEX_CACHE_SIZE) {
UsingRegex **ppUsingRegex = taosHashIterate(sRegexCache.regexHash, NULL);
while ((ppUsingRegex != NULL)) {
if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) {
size_t len = 0;
char* key = (char*)taosHashGetKey(ppUsingRegex, &len);
taosHashRemove(sRegexCache.regexHash, key, len);
}
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
} }
} }
pOldPattern = taosMemoryMalloc(strlen(pPattern) + 1); }
if (NULL == pOldPattern) {
void regexCacheFree(void *ppUsingRegex) {
regfree(&(*(UsingRegex **)ppUsingRegex)->pRegex);
taosMemoryFree(*(UsingRegex **)ppUsingRegex);
}
int32_t InitRegexCache() {
sRegexCache.regexHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (sRegexCache.regexHash == NULL) {
uError("failed to create RegexCache");
return -1;
}
taosHashSetFreeFp(sRegexCache.regexHash, regexCacheFree);
sRegexCache.regexCacheTmr = taosTmrInit(0, 0, 0, "REGEXCACHE");
if (sRegexCache.regexCacheTmr == NULL) {
uError("failed to create regex cache check timer");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sRegexCache.timer = taosTmrStart(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, NULL, sRegexCache.regexCacheTmr);
if (sRegexCache.timer == NULL) {
uError("failed to start regex cache timer");
return -1;
}
return 0;
}
void DestroyRegexCache(){
uInfo("[regex cache] destory regex cache");
taosTmrStopA(&sRegexCache.timer);
taosHashCleanup(sRegexCache.regexHash);
taosTmrCleanUp(sRegexCache.regexCacheTmr);
}
int32_t checkRegexPattern(const char *pPattern) {
if (pPattern == NULL) {
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
regex_t regex;
int32_t cflags = REG_EXTENDED;
int32_t ret = regcomp(&regex, pPattern, cflags);
if (ret != 0) {
char msgbuf[256] = {0};
regerror(ret, &regex, msgbuf, tListLen(msgbuf));
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
return TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR;
}
regfree(&regex);
return TSDB_CODE_SUCCESS;
}
static UsingRegex **getRegComp(const char *pPattern) {
UsingRegex **ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
if (ppUsingRegex != NULL) {
(*ppUsingRegex)->lastUsedTime = taosGetTimestampSec();
return ppUsingRegex;
}
UsingRegex *pUsingRegex = taosMemoryMalloc(sizeof(UsingRegex));
if (pUsingRegex == NULL) {
uError("Failed to Malloc when compile regex pattern %s.", pPattern); uError("Failed to Malloc when compile regex pattern %s.", pPattern);
return NULL; return NULL;
} }
strcpy(pOldPattern, pPattern);
int32_t cflags = REG_EXTENDED; int32_t cflags = REG_EXTENDED;
int32_t ret = regcomp(&pRegex, pPattern, cflags); int32_t ret = regcomp(&pUsingRegex->pRegex, pPattern, cflags);
if (ret != 0) { if (ret != 0) {
char msgbuf[256] = {0}; char msgbuf[256] = {0};
regerror(ret, &pRegex, msgbuf, tListLen(msgbuf)); regerror(ret, &pUsingRegex->pRegex, msgbuf, tListLen(msgbuf));
uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf); uError("Failed to compile regex pattern %s. reason %s", pPattern, msgbuf);
DestoryThreadLocalRegComp(); taosMemoryFree(pUsingRegex);
return NULL; return NULL;
} }
return &pRegex;
while (true) {
int code = taosHashPut(sRegexCache.regexHash, pPattern, strlen(pPattern), &pUsingRegex, sizeof(UsingRegex *));
if (code != 0 && code != TSDB_CODE_DUP_KEY) {
regexCacheFree(&pUsingRegex);
uError("Failed to put regex pattern %s into cache, exception internal error.", pPattern);
return NULL;
}
ppUsingRegex = (UsingRegex **)taosHashAcquire(sRegexCache.regexHash, pPattern, strlen(pPattern));
if (ppUsingRegex) {
if (*ppUsingRegex != pUsingRegex) {
regexCacheFree(&pUsingRegex);
}
pUsingRegex = (*ppUsingRegex);
break;
} else {
continue;
}
}
pUsingRegex->lastUsedTime = taosGetTimestampSec();
return ppUsingRegex;
} }
void DestoryThreadLocalRegComp() { void releaseRegComp(UsingRegex **regex){
if (NULL != pOldPattern) { taosHashRelease(sRegexCache.regexHash, regex);
regfree(&pRegex);
taosMemoryFree(pOldPattern);
pOldPattern = NULL;
}
} }
static int32_t doExecRegexMatch(const char *pString, const char *pPattern) { static int32_t doExecRegexMatch(const char *pString, const char *pPattern) {
int32_t ret = 0; int32_t ret = 0;
char msgbuf[256] = {0}; char msgbuf[256] = {0};
regex_t *regex = threadGetRegComp(pPattern); UsingRegex **pUsingRegex = getRegComp(pPattern);
if (regex == NULL) { if (pUsingRegex == NULL) {
return 1; return 1;
} }
regmatch_t pmatch[1]; regmatch_t pmatch[1];
ret = regexec(regex, pString, 1, pmatch, 0); ret = regexec(&(*pUsingRegex)->pRegex, pString, 1, pmatch, 0);
releaseRegComp(pUsingRegex);
if (ret != 0 && ret != REG_NOMATCH) { if (ret != 0 && ret != REG_NOMATCH) {
regerror(ret, regex, msgbuf, sizeof(msgbuf)); regerror(ret, &(*pUsingRegex)->pRegex, msgbuf, sizeof(msgbuf));
uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf) uDebug("Failed to match %s with pattern %s, reason %s", pString, pPattern, msgbuf)
} }

View File

@ -684,6 +684,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname n
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner //planner

View File

@ -104,7 +104,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
} }
destroyThreadLocalGeosCtx(); destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp();
return NULL; return NULL;
} }
@ -224,7 +223,6 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1); taosUpdateItemSize(qinfo.queue, 1);
} }
DestoryThreadLocalRegComp();
return NULL; return NULL;
} }
@ -636,7 +634,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
} }
destroyThreadLocalGeosCtx(); destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp();
return NULL; return NULL;
} }

View File

@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
self.dbname = "cast_db"
self._datetime_epoch = datetime.datetime.fromtimestamp(0)
def cast_without_from(self):
# int
int_num = 2147483648
tdSql.query(f"select cast({int_num} as int) re;")
tdSql.checkData(0, 0, -int_num)
tdSql.query(f"select cast(2147483647 as int) re;")
tdSql.checkData(0, 0, 2147483647)
tdSql.query(f"select cast({int_num} as int unsigned) re;")
tdSql.checkData(0, 0, int_num)
tdSql.query(f"select cast({int_num} as bigint) re;")
tdSql.checkData(0, 0, int_num)
tdSql.query(f"select cast({int_num} as bigint unsigned) re;")
tdSql.checkData(0, 0, int_num)
tdSql.query(f"select cast({int_num} as smallint) re;")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast({int_num} as smallint unsigned) re;")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast({int_num} as tinyint) re;")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast({int_num} as tinyint unsigned) re;")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast({int_num} as float) re;")
tdSql.checkData(0, 0, '2147483648.0')
tdSql.query(f"select cast({int_num} as double) re;")
tdSql.checkData(0, 0, '2147483648.0')
tdSql.query(f"select cast({int_num} as bool) as re;")
tdSql.checkData(0, 0, True)
tdSql.query(f"select cast({int_num} as timestamp) as re;")
tdSql.checkData(0, 0, self._datetime_epoch + datetime.timedelta(seconds=int(int_num) / 1000))
tdSql.query(f"select cast({int_num} as varchar(10)) as re;")
tdSql.checkData(0, 0, int_num)
tdSql.query(f"select cast({int_num} as binary(10)) as re;")
tdSql.checkData(0, 0, int_num)
sql = f"select cast({int_num} as nchar(10));"
tdSql.query(sql)
tdSql.checkData(0, 0, int_num)
# float
float_1001 = 3.14159265358979323846264338327950288419716939937510582097494459230781640628620899862803482534211706798214808651328230664709384460955058223172535940812848111745028410270193852110555964462294895493038196442881097566593344612847564823378678316527120190914564856692346034861045432664821339360726024914127372458700660631558817488152092096282925409171536436789259036001133053054882046652138414695194151160943305727036575959195309218611738193261179310511854807446237996274956735188575272489122793818301194912983367336244065664308602139494639522473719070217986094370277053921717629317675238467481846766940513200056812714526356082778577134275778960917363717872146844090122495343014654958537105079227968925892354201995611212902196086403441815981362977477130996051870721134999999837297804995105973173281609631859502445945534690830264252230825334468503526193118817101000313783875288658753320838142061717766914730359825349042875546873115956286388235378759375195778185778053217122680661300192787661119590921642019
tdSql.query(f"select cast({float_1001} as int) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as int unsigned) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as tinyint) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as tinyint unsigned) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as smallint) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as smallint unsigned) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as bigint) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as bigint unsigned) as re;")
tdSql.checkData(0, 0, 3)
tdSql.query(f"select cast({float_1001} as double) as re;")
tdSql.checkData(0, 0, 3.141592653589793)
tdSql.query(f"select cast({float_1001} as float) as re;")
tdSql.checkData(0, 0, 3.1415927)
tdSql.query(f"select cast({float_1001} as bool) as re;")
tdSql.checkData(0, 0, True)
tdSql.query(f"select cast({float_1001} as timestamp) as re;")
tdSql.checkData(0, 0, self._datetime_epoch + datetime.timedelta(seconds=int(float_1001) / 1000))
sql = f"select cast({float_1001} as varchar(5)) as re;"
tdSql.query(sql)
tdSql.checkData(0, 0, 3.141)
sql = f"select cast({float_1001} as binary(10)) as re;"
tdSql.query(sql)
tdSql.checkData(0, 0, 3.141593)
tdSql.query(f"select cast({float_1001} as nchar(5));")
tdSql.checkData(0, 0, 3.141)
# str
str_410 = "bcdefghigk" * 41
big_str = "bcdefghigk" * 6552
tdSql.query(f"select cast('{str_410}' as binary(3)) as re;")
tdSql.checkData(0, 0, "bcd")
tdSql.query(f"select cast('{str_410}' as varchar(2)) as re;")
tdSql.checkData(0, 0, "bc")
tdSql.query(f"select cast('{str_410}' as nchar(10));")
tdSql.checkData(0, 0, "bcdefghigk")
tdSql.query(f"select cast('北京' as nchar(10));")
tdSql.checkData(0, 0, "北京")
tdSql.query(f"select cast('{str_410}' as int) as re;")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as int unsigned) as re;")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as tinyint) as re")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as tinyint unsigned) as re")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as smallint) as re")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as smallint unsigned) as re")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as bigint) as re")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as bigint unsigned) as re")
tdSql.checkData(0, 0, 0)
tdSql.query(f"select cast('{str_410}' as float) as re;")
tdSql.checkData(0, 0, 0.0000000)
tdSql.query(f"select cast('{str_410}' as double) as re;")
tdSql.checkData(0, 0, 0.000000000000000)
tdSql.query(f"select cast('{str_410}' as bool) as re")
tdSql.checkData(0, 0, False)
tdSql.query( f"select cast('{str_410}' as timestamp) as re")
tdSql.checkData(0, 0, "1970-01-01 08:00:00.000")
def run(self):
# self.prepare_data()
# self.all_test()
# tdSql.execute(f"flush database {self.dbname}")
# self.all_test()
self.cast_without_from()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -36,7 +36,7 @@ class TDTestCase(TBase):
"create table db.st(ts timestamp, age int) tags(area tinyint);", "create table db.st(ts timestamp, age int) tags(area tinyint);",
"insert into db.t1 using db.st tags(100) values('2024-01-01 10:00:01', 1);", "insert into db.t1 using db.st tags(100) values('2024-01-01 10:00:01', 1);",
"insert into db.t2 using db.st tags(110) values('2024-01-01 10:00:02', 2);", "insert into db.t2 using db.st tags(110) values('2024-01-01 10:00:02', 2);",
"insert into db.t3 using db.st tags(3) values('2024-01-01 10:00:03', 3);" "insert into db.t3 using db.st tags(3) values('2024-01-01 10:00:03', 3);",
] ]
tdSql.executes(sqls) tdSql.executes(sqls)
@ -44,7 +44,7 @@ class TDTestCase(TBase):
results = [ results = [
["2024-01-01 10:00:01", 1, 100], ["2024-01-01 10:00:01", 1, 100],
["2024-01-01 10:00:02", 2, 110], ["2024-01-01 10:00:02", 2, 110],
["2024-01-01 10:00:03", 3, 3] ["2024-01-01 10:00:03", 3, 3],
] ]
tdSql.checkDataMem(sql, results) tdSql.checkDataMem(sql, results)
@ -99,11 +99,12 @@ class TDTestCase(TBase):
for i in range(1, 10): for i in range(1, 10):
new_ts = base_ts + i * 1000 new_ts = base_ts + i * 1000
num = i * 100 num = i * 100
v1, v2 = i * 10, i * 11
sqls = [ sqls = [
f"insert into ntb1 values({new_ts}, 'nihao{num}', {10*i}, {10*i}, {10*i});", f"insert into ntb1 values({new_ts}, 'nihao{num}', {v1}, {v1}, {v1});",
f"insert into ntb1 values({new_ts + 1}, 'nihao{num + 1}', NULL, NULL, NULL);", f"insert into ntb1 values({new_ts + 1}, 'nihao{num + 1}', NULL, NULL, NULL);",
f"delete from ntb1 where ts = {new_ts};", f"delete from ntb1 where ts = {new_ts};",
f"insert into ntb1 values({new_ts + 2}, 'nihao{num + 2}', {11*i}, {11*i}, {11*i});", f"insert into ntb1 values({new_ts + 2}, 'nihao{num + 2}', {v2}, {v2}, {v2});",
] ]
tdSql.executes(sqls) tdSql.executes(sqls)

View File

@ -36,7 +36,7 @@
"insert_rows": 2000000, "insert_rows": 2000000,
"childtable_prefix": "d", "childtable_prefix": "d",
"insert_mode": "taosc", "insert_mode": "taosc",
"timestamp_step": 1000, "timestamp_step": 100,
"start_timestamp": 1600000000000, "start_timestamp": 1600000000000,
"columns": [ "columns": [
{ "type": "bool", "name": "bc"}, { "type": "bool", "name": "bc"},

View File

@ -73,7 +73,7 @@ class TDTestCase(TBase):
# come from s3_basic.json # come from s3_basic.json
self.childtable_count = 6 self.childtable_count = 6
self.insert_rows = 2000000 self.insert_rows = 2000000
self.timestamp_step = 1000 self.timestamp_step = 100
def createStream(self, sname): def createStream(self, sname):
sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
@ -262,7 +262,7 @@ class TDTestCase(TBase):
# come from s3_basic.json # come from s3_basic.json
self.insert_rows += self.insert_rows/4 self.insert_rows += self.insert_rows/4
self.timestamp_step = 500 self.timestamp_step = 50
# delete # delete
def checkDelete(self): def checkDelete(self):

View File

@ -36,7 +36,7 @@
"insert_rows": 1000000, "insert_rows": 1000000,
"childtable_prefix": "d", "childtable_prefix": "d",
"insert_mode": "taosc", "insert_mode": "taosc",
"timestamp_step": 500, "timestamp_step": 50,
"start_timestamp": 1600000000000, "start_timestamp": 1600000000000,
"columns": [ "columns": [
{ "type": "bool", "name": "bc"}, { "type": "bool", "name": "bc"},

View File

@ -15,6 +15,7 @@
,,n,army,python3 ./test.py -f s3/s3Basic.py -N 3 ,,n,army,python3 ./test.py -f s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py ,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f query/function/cast.py
,,y,army,./pytest.sh python3 ./test.py -f query/test_join.py ,,y,army,./pytest.sh python3 ./test.py -f query/test_join.py
,,y,army,./pytest.sh python3 ./test.py -f query/test_compare.py ,,y,army,./pytest.sh python3 ./test.py -f query/test_compare.py
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py ,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
@ -192,6 +193,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/match.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-28068.py -Q 3

View File

@ -0,0 +1,141 @@
import taos
import sys
import datetime
import inspect
import threading
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def initConnection(self):
self.records = 10000000
self.numOfTherads = 50
self.ts = 1537146000000
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/home/xp/git/TDengine/sim/dnode1/cfg"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def initDB(self):
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db")
def stopTest(self):
tdSql.execute("drop database if exists db")
def threadTest(self, threadID):
print(f"Thread {threadID} starting...")
tdsqln = tdCom.newTdSql()
for i in range(2, 50):
tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}dx'")
tdsqln.checkRows(0)
for i in range(100):
tdsqln.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'")
tdsqln.checkRows(2)
tdsqln.query("select * from db.t1x")
tdsqln.checkRows(5)
tdsqln.query("select * from db.t1x where c1 match '_c'")
tdsqln.checkRows(2)
tdsqln.query("select * from db.t1x where c1 match '%__c'")
tdsqln.checkRows(0)
tdsqln.error("select * from db.t1x where c1 match '*d'")
print(f"Thread {threadID} finished.")
def match_test(self):
tdSql.execute("create table db.t1x (ts timestamp, c1 varchar(100))")
tdSql.execute("create table db.t_1x (ts timestamp, c1 varchar(100))")
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'")
tdSql.checkRows(2)
for i in range(2, 50):
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'")
tdSql.checkRows(0)
tdSql.error("select * from db.t1x where c1 match '*d'")
tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')")
tdSql.query("select * from db.t1x")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 match '_c'")
tdSql.checkRows(2)
tdSql.query("select * from db.t1x where c1 match '%__c'")
tdSql.checkRows(0)
tdSql.error("select * from db.t1x where c1 match '*d'")
threads = []
for i in range(10):
t = threading.Thread(target=self.threadTest, args=(i,))
threads.append(t)
t.start()
time.sleep(31)
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*1x'")
tdSql.checkRows(2)
for i in range(2, 50):
tdSql.query(f"select distinct table_name from information_schema.ins_columns where table_name match 't.*{i}x'")
tdSql.checkRows(0)
tdSql.query("select * from db.t1x")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 match '_c'")
tdSql.checkRows(2)
tdSql.query("select * from db.t1x where c1 match '%__c'")
tdSql.checkRows(0)
tdSql.execute("create table db.t3x (ts timestamp, c1 varchar(100))")
tdSql.execute("insert into db.t3x values(now, '我是中文'), (now+1s, '我是_中文'), (now+2s, '我是%中文'), (now+3s, '%中文'),(now+4s, '_中文')")
tdSql.query("select * from db.t3x where c1 match '%中文'")
tdSql.checkRows(2)
tdSql.query("select * from db.t3x where c1 match '中文'")
tdSql.checkRows(5)
tdSql.error("select * from db.t1x where c1 match '*d'")
for thread in threads:
print(f"Thread waitting for finish...")
thread.join()
print(f"Mutithread test finished.")
def run(self):
tdLog.printNoPrefix("==========start match_test run ...............")
tdSql.prepare(replica = self.replicaVar)
self.initConnection()
self.initDB()
self.match_test()
self.stopTest()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -104,6 +104,10 @@ python3 ./test.py -f 2-query/like.py
python3 ./test.py -f 2-query/like.py -Q 2 python3 ./test.py -f 2-query/like.py -Q 2
python3 ./test.py -f 2-query/like.py -Q 3 python3 ./test.py -f 2-query/like.py -Q 3
python3 ./test.py -f 2-query/like.py -Q 4 python3 ./test.py -f 2-query/like.py -Q 4
python3 ./test.py -f 2-query/match.py
python3 ./test.py -f 2-query/match.py -Q 2
python3 ./test.py -f 2-query/match.py -Q 3
python3 ./test.py -f 2-query/match.py -Q 4
python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False