diff --git a/include/common/tglobal.h b/include/common/tglobal.h index ce29c60f86..833094fb8f 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -79,6 +79,7 @@ extern int8_t tsMemPoolFullFunc; //extern int32_t tsQueryBufferPoolSize; extern int32_t tsMinReservedMemorySize; extern int64_t tsCurrentAvailMemorySize; +extern int32_t tsQueryNoFetchTimeoutSec; extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 51d9e752a4..1b274a3f06 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -63,7 +63,7 @@ typedef struct SFuncExecFuncs { processFuncByRow processFuncByRow; } SFuncExecFuncs; -#define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results +#define MAX_INTERVAL_TIME_WINDOW 1000000000 // maximum allowed time windows in final results #define TOP_BOTTOM_QUERY_LIMIT 100 #define FUNCTIONS_NAME_MAX_LENGTH 32 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b0a08d8eea..3d514057d5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -644,6 +644,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_QRY_REACH_QMEM_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0738) #define TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED TAOS_DEF_ERROR_CODE(0, 0x0739) #define TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM TAOS_DEF_ERROR_CODE(0, 0x073A) +#define TSDB_CODE_QRY_NO_FETCH_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x073B) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/include/util/tconfig.h b/include/util/tconfig.h index 3fc247982f..89a3561096 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -118,6 +118,7 @@ void cfgUnLock(SConfig *pCfg); // clang-format off int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope); +int32_t cfgAddInt32Ex(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope); int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope); int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope); int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope, int8_t dynScope); diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index 71cee62d2e..045716d3ca 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -50,7 +50,7 @@ typedef struct SDiskbasedBufStatis { * @param handle * @return */ -int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int64_t inMemBufSize, const char* id, const char* dir); /** diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index d941493d99..b3a1543668 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -682,7 +682,7 @@ void doDestroyRequest(void *p) { SRequestObj *pRequest = (SRequestObj *)p; uint64_t reqId = pRequest->requestId; - tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); + tscDebug("begin to destroy request 0x%" PRIx64 " p:%p", reqId, pRequest); int64_t nextReqRefId = pRequest->relation.nextRefId; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b708536d3b..f60b30823f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,13 +58,14 @@ int8_t tsMemPoolFullFunc = 0; int8_t tsQueryUseMemoryPool = 1; int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB -int32_t tsMinReservedMemorySize = MIN_RESERVE_MEM_SIZE; //MB +int32_t tsMinReservedMemorySize = 0; //MB int64_t tsCurrentAvailMemorySize = 0; // queue & threads int32_t tsQueryMinConcurrentTaskNum = 1; int32_t tsQueryMaxConcurrentTaskNum = 0; int32_t tsQueryConcurrentTaskNum = 0; +int32_t tsQueryNoFetchTimeoutSec = 3600 * 5; int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; @@ -750,9 +751,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); //TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minReservedMemorySize", tsMinReservedMemorySize, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); + TAOS_CHECK_RETURN(cfgAddInt32Ex(pCfg, "minReservedMemorySize", 0, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryNoFetchTimeoutSec", tsQueryNoFetchTimeoutSec, 60, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -2089,7 +2091,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"experimental", &tsExperimental}, {"maxTsmaNum", &tsMaxTsmaNum}, {"safetyCheckLevel", &tsSafetyCheckLevel}, - {"bypassFlag", &tsBypassFlag}}; + {"bypassFlag", &tsBypassFlag}, + {"queryNoFetchTimeoutSec", &tsQueryNoFetchTimeoutSec}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 039c0fa68b..fa2ad3d948 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -938,7 +938,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3 struct SOperatorInfo* pOperator); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); -int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz); extern void doDestroyExchangeOperatorInfo(void* param); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 829ca6da50..efb19574e5 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -573,7 +573,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n } uint32_t defaultPgsz = 0; - uint32_t defaultBufsz = 0; + int64_t defaultBufsz = 0; code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); if (code) { qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index af8e01be5e..c6c715a3c3 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -972,7 +972,7 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { } } -int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) { +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz) { *defaultPgsz = 4096; uint32_t last = *defaultPgsz; while (*defaultPgsz < rowSize * 4) { @@ -986,7 +986,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul // The default buffer for each operator in query is 10MB. // at least four pages need to be in buffer // TODO: make this variable to be configurable. - *defaultBufsz = 4096 * 2560; +// *defaultBufsz = 4096 * 2560; + *defaultBufsz = 4096UL * 2560000; if ((*defaultBufsz) <= (*defaultPgsz)) { (*defaultBufsz) = (*defaultPgsz) * 4; if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d6e3d26267..2346d578dd 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1161,7 +1161,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo } uint32_t defaultPgsz = 0; - uint32_t defaultBufsz = 0; + int64_t defaultBufsz = 0; pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index aa0eb3d7b3..a0f33c3763 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -47,6 +47,8 @@ extern "C" { #define QW_RETIRE_JOB_BATCH_NUM 5 +#define QW_DEFAULT_TIMEOUT_INTERVAL_SECS 3600 + enum { QW_CONC_TASK_LEVEL_LOW = 1, QW_CONC_TASK_LEVEL_MIDDLE, @@ -186,6 +188,8 @@ typedef struct SQWTaskCtx { void *sinkHandle; SArray *tbInfo; // STbVerInfo + int64_t lastAckTs; + void *memPoolSession; SQWJobInfo *pJobInfo; } SQWTaskCtx; @@ -243,7 +247,8 @@ typedef struct SQWorker { SQWStat stat; int32_t *destroyed; - int8_t nodeStopped; + int8_t nodeStopped; + int32_t lastChkTs; } SQWorker; typedef struct SQWorkerMgmt { @@ -529,6 +534,7 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx); +void qwDbgDumpJobsInfo(void); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore, bool dynamicTask); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); @@ -540,6 +546,8 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); void qwDestroyJobInfo(void* job); bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errCode); +void qwStopAllTasks(SQWorker *mgmt); +void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs); bool qwRetireJob(SQWJobInfo* pJob); void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index d640bae822..366b9d09f1 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -36,7 +36,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, +int32_t qwBuildAndSendFetchRsp(SQWTaskCtx *ctx, int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 95e0f0ddba..d0035dd54a 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -168,6 +168,36 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { qwDbgDumpTasksInfo(mgmt); } +void qwDbgDumpJobsInfo(void) { + if (!gQWDebug.dumpEnable) { + return; + } + + qDebug("total remain job num %d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, + taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), + atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum)); + + size_t keyLen = 0; + char* id = NULL; + int32_t jobIdx = 0; + SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); + while (NULL != pJob) { + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " the %dth remain job", pJob->memInfo->jobId, pJob->memInfo->clientId, jobIdx++); + + int32_t sessionIdx = 0; + SQWSessionInfo* pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, NULL); + while (NULL != pSession) { + qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d the %dth remain session", + pSession->qId, pSession->sId, pSession->cId, pSession->tId, pSession->eId, sessionIdx++); + + pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, pSession); + } + + pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + } +} + + int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) { int32_t contLen = 0; char *rsp = NULL; diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 6f076a8f69..0d71746583 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -85,9 +85,11 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) { int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1); + QW_TASK_DLOG("task session destoryed, remainSessions:%d", remainSessions); + if (0 == remainSessions) { QW_LOCK(QW_WRITE, &pJobInfo->lock); - if (0 == taosHashGetSize(pJobInfo->pSessions) && 0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) { + if (/*0 == taosHashGetSize(pJobInfo->pSessions) && */0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) { atomic_store_8(&pJobInfo->destroyed, 1); QW_UNLOCK(QW_WRITE, &pJobInfo->lock); @@ -96,6 +98,7 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) { (void)taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2)); QW_TASK_DLOG_E("the whole query job removed"); } else { + QW_TASK_DLOG("job not removed, remainSessions:%d, %d", taosHashGetSize(pJobInfo->pSessions), pJobInfo->memInfo->remainSession); QW_UNLOCK(QW_WRITE, &pJobInfo->lock); } } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 69965cd78c..ec42f330af 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -162,7 +162,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, +int32_t qwBuildAndSendFetchRsp(SQWTaskCtx *ctx, int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -184,6 +184,10 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve rpcRsp.info.compressed = pRsp->compressed; tmsgSendRsp(&rpcRsp); + if (NULL != ctx) { + ctx->lastAckTs = taosGetTimestampSec(); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 8ee0f1528c..8bcf36c767 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -317,7 +317,7 @@ int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (ctx->ctrlConnInfo.handle) { - tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, 0); + tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, ctx->rspCode); } ctx->ctrlConnInfo.handle = NULL; @@ -395,6 +395,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, cId, tId, eId); SQWTaskCtx octx; + int32_t code = TSDB_CODE_SUCCESS; SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == ctx) { @@ -404,14 +405,20 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { octx = *ctx; + if (ctx->pJobInfo && TSDB_CODE_SUCCESS != ctx->pJobInfo->errCode) { + QW_UPDATE_RSP_CODE(ctx, ctx->pJobInfo->errCode); + } + atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); + atomic_store_ptr(&ctx->pJobInfo, NULL); + atomic_store_ptr(&ctx->memPoolSession, NULL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); - QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt)); + code = QW_CTX_NOT_EXISTS_ERR_CODE(mgmt); } qwFreeTaskCtx(QW_FPARAMS(), &octx); @@ -419,7 +426,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { QW_TASK_DLOG_E("task ctx dropped"); - return TSDB_CODE_SUCCESS; + return code; } int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { @@ -747,7 +754,7 @@ bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errC QW_LOCK(QW_WRITE, &ctx->lock); - QW_TASK_DLOG("start to stop task, forceStop:%d", forceStop); + QW_TASK_DLOG("start to stop task, forceStop:%d, error:%s", forceStop, tstrerror(errCode)); if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); @@ -830,3 +837,49 @@ bool qwRetireJob(SQWJobInfo *pJob) { return retired; } + + +void qwStopAllTasks(SQWorker *mgmt) { + uint64_t qId, cId, tId, sId; + int32_t eId; + int64_t rId = 0; + + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, cId, tId, eId); + + sId = ctx->sId; + + (void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } +} + + +void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs) { + uint64_t qId, cId, tId, sId; + int32_t eId; + int64_t rId = 0; + + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + if ((ctx->lastAckTs <= 0) || (currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec) { + pIter = taosHashIterate(mgmt->ctxHash, pIter); + continue; + } + + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, cId, tId, eId); + + sId = ctx->sId; + + (void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_QRY_NO_FETCH_TIMEOUT); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } +} + diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 19f7538c00..a11cfb250a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -21,24 +21,6 @@ SQWorkerMgmt gQwMgmt = { TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; SQueryMgmt gQueryMgmt = {0}; -void qwStopAllTasks(SQWorker *mgmt) { - uint64_t qId, cId, tId, sId; - int32_t eId; - int64_t rId = 0; - - void *pIter = taosHashIterate(mgmt->ctxHash, NULL); - while (pIter) { - SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; - void *key = taosHashGetKey(pIter, NULL); - QW_GET_QTID(key, qId, cId, tId, eId); - - sId = ctx->sId; - - (void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED); - - pIter = taosHashIterate(mgmt->ctxHash, pIter); - } -} int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; @@ -102,6 +84,7 @@ int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode)); } + ctx->lastAckTs = taosGetTimestampSec(); ctx->queryRsped = true; } @@ -502,7 +485,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32 qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - QW_ERR_RET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); + QW_ERR_RET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), @@ -899,7 +882,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); + QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, @@ -922,7 +905,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwMsg->connInfo = ctx->dataConnInfo; - code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); + code = qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); if (TSDB_CODE_SUCCESS != code) { QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); @@ -1031,7 +1014,7 @@ _return: } if (!rsped) { - code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + code = qwBuildAndSendFetchRsp(ctx, qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); if (TSDB_CODE_SUCCESS != code) { QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen); @@ -1088,7 +1071,7 @@ _return: (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling } else { - tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0); + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, code); } } @@ -1220,13 +1203,20 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWHbInfo *rspList = NULL; SArray *pExpiredSch = NULL; int32_t code = 0; + int32_t currTs = taosGetTimestampSec(); qwDbgDumpMgmtInfo(mgmt); + qwDbgDumpJobsInfo(); if (gQWDebug.forceStop) { qwStopAllTasks(mgmt); } + if (mgmt->lastChkTs > 0 && (currTs - mgmt->lastChkTs) >= QW_DEFAULT_TIMEOUT_INTERVAL_SECS) { + qwChkDropTimeoutQuery(mgmt, currTs); + mgmt->lastChkTs = currTs; + } + QW_LOCK(QW_READ, &mgmt->schLock); int32_t schNum = taosHashGetSize(mgmt->schHash); @@ -1434,6 +1424,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S QW_ERR_JRET(qExecutorInit()); + mgmt->lastChkTs = taosGetTimestampSec(); + *qWorkerMgmt = mgmt; qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); @@ -1671,8 +1663,7 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { continue; } - if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && - 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); bool retired = qwRetireJob(pJob); @@ -1689,6 +1680,8 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob); } + taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); + qDebug("job retire in batch done, [prev:%d, curr:%d, total:%d] jobs, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64 ", task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, alreadyJobNum, jobNum, taosHashGetSize(gQueryMgmt.pJobInfo), retiredSize, retireSize, diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index ae99dff10c..db39880483 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -218,6 +218,7 @@ typedef struct SSchRedirectCtx { } SSchRedirectCtx; typedef struct SSchTimerParam { + int8_t exit; int64_t rId; uint64_t queryId; uint64_t taskId; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 9fb901800b..588af5f16e 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1047,7 +1047,10 @@ int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, boo SCH_UNLOCK_TASK(pTask); SCH_RET(code); } + SCH_LOCK(SCH_WRITE, &pTask->planLock); qClearSubplanExecutionNode(pTask->plan); + SCH_UNLOCK(SCH_WRITE, &pTask->planLock); + schResetTaskForRetry(pJob, pTask); SCH_UNLOCK_TASK(pTask); } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index bd737d6c89..6c19667f98 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -426,10 +426,11 @@ _return: void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { pTask->waitRetry = true; - schDropTaskOnExecNode(pJob, pTask); if (pTask->delayTimer) { - schStopTaskDelayTimer(pJob, pTask, false); + taosTmrStop(pTask->delayTimer); } + + schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); (void)schRemoveTaskFromExecList(pJob, pTask); // ignore error schDeregisterTaskHb(pJob, pTask); @@ -746,6 +747,10 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + if (pTask->delayTimer) { + taosTmrStop(pTask->delayTimer); + } + (void)schRemoveTaskFromExecList(pJob, pTask); // ignore error SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); @@ -1291,6 +1296,8 @@ void schHandleTimerEvent(void *param, void *tmrId) { SSchJob *pJob = NULL; int32_t code = 0; + qDebug("delayTimer %" PRIuPTR " is launched", (uintptr_t)tmrId); + int64_t rId = pTimerParam->rId; uint64_t queryId = pTimerParam->queryId; uint64_t taskId = pTimerParam->taskId; @@ -1299,7 +1306,11 @@ void schHandleTimerEvent(void *param, void *tmrId) { return; } - code = schLaunchTask(pJob, pTask); + if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) { + code = schLaunchTask(pJob, pTask); + } else { + SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status); + } schProcessOnCbEnd(pJob, pTask, code); } @@ -1310,10 +1321,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { pTask->delayLaunchPar.queryId = pJob->queryId; pTask->delayLaunchPar.taskId = pTask->taskId; - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { - SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); - } + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); if (NULL == pTask->delayTimer) { pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer); @@ -1322,6 +1331,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + SCH_TASK_DLOG("task delayTimer %" PRIuPTR " is started", (uintptr_t)pTask->delayTimer); + return TSDB_CODE_SUCCESS; } @@ -1357,10 +1368,11 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; - SCH_LOCK_TASK(pTask); if (pTask->delayTimer) { schStopTaskDelayTimer(pJob, pTask, true); } + + SCH_LOCK_TASK(pTask); schDropTaskOnExecNode(pJob, pTask); SCH_UNLOCK_TASK(pTask); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 4e04b0210b..1eb7dd5281 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -424,13 +424,22 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan *pSubplan, int32_t level, int } void schStopTaskDelayTimer(SSchJob *pJob, SSchTask *pTask, bool syncOp) { + SCH_TASK_DLOG("try to stop task delayTimer %" PRIuPTR, (uintptr_t)pTask->delayTimer); + tmr_h delayTimer = pTask->delayTimer; + + atomic_store_8(&pTask->delayLaunchPar.exit, 1); + if (!taosTmrStopA(&pTask->delayTimer)) { + SCH_TASK_DLOG("task delayTimer %" PRIuPTR " not stopped", (uintptr_t)delayTimer); + if (syncOp) { - while (!taosTmrIsStopped(&pTask->delayTimer)) { + while (!taosTmrIsStopped(&delayTimer)) { taosMsleep(1); } + + SCH_TASK_DLOG("task delayTimer %" PRIuPTR " is stopped", (uintptr_t)delayTimer); } else { - SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status); + SCH_TASK_WLOG("stop task delayTimer %" PRIuPTR " failed, may stopped, status:%d", (uintptr_t)delayTimer, pTask->status); } } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f26106cf5b..edf402012e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1293,8 +1293,8 @@ static void cliHandleException(SCliConn* conn) { if (conn->registered) { int8_t ref = transGetRefCount(conn); if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { - tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, - conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); +// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, +// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); uv_close((uv_handle_t*)conn->stream, cliDestroy); } } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 6f8826cedb..70e0425244 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -44,7 +44,7 @@ extern "C" { #define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL) #define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL) -#define MP_DEFAULT_RESERVE_MEM_PERCENT 1 // TODO 20 !!!!!!!!!!!!!!!!!!!!! +#define MP_DEFAULT_RESERVE_MEM_PERCENT 20 #define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576UL) #define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576UL) diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index d2a2b1fb9a..a35179bdc6 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -538,6 +538,18 @@ int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scop return cfgAddItem(pCfg, &item, name); } +int32_t cfgAddInt32Ex(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, + int8_t dynScope) { + SConfigItem item = {.dtype = CFG_DTYPE_INT32, + .i32 = defaultVal, + .imin = minval, + .imax = maxval, + .scope = scope, + .dynScope = dynScope}; + return cfgAddItem(pCfg, &item, name); +} + + int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope) { if (defaultVal < minval || defaultVal > maxval) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 67bdffa01c..bc293178ec 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -509,12 +509,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_WINDOW_CONDITION, "The time pseudo colum TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_JOIN_CONDITION, "Not supported join on condition") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_NOT_SUPPORT_TYPE, "Not supported range type") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_WRONG_OPTR_TYPE, "Wrong operator type") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_RANGE_ERROR, "Wrong filter range") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_INVALID_TYPE, "Invalid filter type") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_FETCH_TIMEOUT, "Timeout for long time no fetch") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 19585435c5..18fa2f2be5 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -1051,7 +1051,7 @@ void* mpMgmtThreadFunc(void* param) { while (0 == atomic_load_8(&gMPMgmt.modExit)) { mpUpdateSystemAvailableMemorySize(); - retireSize = pPool->cfg.reserveSize - tsCurrentAvailMemorySize; + retireSize = pPool->cfg.reserveSize - atomic_load_64(&tsCurrentAvailMemorySize); if (retireSize > 0) { (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); @@ -1723,8 +1723,10 @@ int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) { uInfo("total memory size: %" PRId64 "KB", tsTotalMemoryKB); - tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100); - + if (0 == tsMinReservedMemorySize) { + tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100); + } + SMemPoolCfg cfg = {0}; int64_t sysAvailSize = 0; diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index fb8e597163..0f9e2bbb3e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -347,7 +347,7 @@ static SPageInfo* getPageInfoFromPayload(void* page) { return ppi; } -int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int64_t inMemBufSize, const char* id, const char* dir) { *pBuf = NULL; SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index ec33a61d40..86b14f12e3 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -454,10 +454,6 @@ void mptInit() { memset(mptCtx.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1); mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; - void* p = taosMemMalloc(1048576UL * 20000); - - mptWriteMem(p, (1048576UL * 20000)); - } void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) { @@ -1576,7 +1572,7 @@ TEST(PerfTest, GetSysAvail) { memset(p, 0, msize); int64_t totalUs = taosGetTimestampUs() - st; printf("memset %" PRId64 " used time:%" PRId64 "us, speed:%dMB/ms\n", msize, totalUs, msize/1048576UL/(totalUs/1000UL)); - + int64_t freeSize = 0; int32_t loopTimes = 1000000; st = taosGetTimestampUs(); @@ -1584,7 +1580,7 @@ TEST(PerfTest, GetSysAvail) { for (int32_t i = 0; i < loopTimes; ++i) { code = taosGetSysAvailMemory(&freeSize); assert(0 == code); - //taosMsleep(10); + //taosMsleep(1); } totalUs = taosGetTimestampUs() - st; @@ -1592,6 +1588,53 @@ TEST(PerfTest, GetSysAvail) { } #endif +#if 1 +TEST(MiscTest, monSysAvailSize) { + char* caseName = "MiscTest:monSysAvailSize"; + int32_t code = 0; + + int64_t freeSize = 0; + int32_t loopTimes = 1000000000; + for (int32_t i = 0; i < loopTimes; ++i) { + code = taosGetSysAvailMemory(&freeSize); + assert(0 == code); + printf(" %" PRId64, freeSize); + if (i && 0 == (i % 10)) { + struct tm Tm, *ptm; + struct timeval timeSecs; + + TAOS_UNUSED(taosGetTimeOfDay(&timeSecs)); + time_t curTime = timeSecs.tv_sec; + ptm = taosLocalTime(&curTime, &Tm, NULL, 0); + + printf("- %02d/%02d %02d:%02d:%02d.%06d \n", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec); + } + taosMsleep(1); + } +} +#endif + + +#if 0 +TEST(MiscTest, simNonPoolAct) { + char* caseName = "MiscTest:simNonPoolAct"; + int64_t msize = 1048576UL*1024, asize = 0; + int32_t loopTimes = 1000000; + + for (int32_t i = 0; i < loopTimes; ++i) { + asize = taosRand() % msize; + void* p = taosMemMalloc(asize); + mptWriteMem(p, asize); + + taosMsleep(100); + taosMemFree(p); + + printf("sim %dth alloc/free %" PRId64 " bytes\n", i, asize); + } +} +#endif + + #if 0 TEST(PerfTest, allocLatency) { char* caseName = "PerfTest:allocLatency"; diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 29fb52e124..bfa982d556 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -48,7 +48,8 @@ class TDSimClient: "telemetryReporting": "0", "tqDebugflag": "135", "stDebugflag":"135", - "safetyCheckLevel":"2" + "safetyCheckLevel":"2", + "minReservedMemorySize":"1024" } def getLogDir(self): @@ -889,4 +890,4 @@ class TDDnodes: def getAsan(self): return self.asan -tdDnodes = TDDnodes() \ No newline at end of file +tdDnodes = TDDnodes() diff --git a/tests/script/api/asyncQuery.c b/tests/script/api/asyncQuery.c index 23e4aced1d..ef1218570b 100644 --- a/tests/script/api/asyncQuery.c +++ b/tests/script/api/asyncQuery.c @@ -74,16 +74,16 @@ void selectCallback(void* param, TAOS_RES* res, int32_t code) { } static void verifyQueryAsync(TAOS* taos) { + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); + taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL); taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL); - taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL); - taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); - taos_query_a(taos, "select * from stb", selectCallback, NULL); } int main(int argc, char* argv[]) { @@ -99,7 +99,7 @@ int main(int argc, char* argv[]) { for (int64_t i = 0; i < 1000000000; ++i) { verifyQueryAsync(taos); printf("%llu queries launched, errTimes:%lld \n", i * 10, errTimes); - while ((i * 10 - atomic_load_64(&finQueries)) > 1000) { + while ((i * 10 - atomic_load_64(&finQueries)) > 100) { printf("left queries:%llu\n", (i * 10 - atomic_load_64(&finQueries))); taosMsleep(2000); } diff --git a/tests/script/test.sh b/tests/script/test.sh index b10865dd65..26c01a6c09 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -111,6 +111,7 @@ echo "wal 0" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG echo "enableCoreFile 1" >> $TAOS_CFG +echo "minReservedMemorySize 1024" >> $TAOS_CFG echo " " >> $TAOS_CFG ulimit -n 600000