fix: scheduler dead lock issue

This commit is contained in:
dapan1121 2024-12-05 18:07:37 +08:00
parent e40daae3fc
commit 011ff04234
32 changed files with 266 additions and 83 deletions

View File

@ -79,6 +79,7 @@ extern int8_t tsMemPoolFullFunc;
//extern int32_t tsQueryBufferPoolSize;
extern int32_t tsMinReservedMemorySize;
extern int64_t tsCurrentAvailMemorySize;
extern int32_t tsQueryNoFetchTimeoutSec;
extern int32_t tsNumOfQueryThreads;
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;

View File

@ -63,7 +63,7 @@ typedef struct SFuncExecFuncs {
processFuncByRow processFuncByRow;
} SFuncExecFuncs;
#define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results
#define MAX_INTERVAL_TIME_WINDOW 1000000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 32

View File

@ -644,6 +644,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_QRY_REACH_QMEM_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0738)
#define TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED TAOS_DEF_ERROR_CODE(0, 0x0739)
#define TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM TAOS_DEF_ERROR_CODE(0, 0x073A)
#define TSDB_CODE_QRY_NO_FETCH_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x073B)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)

View File

@ -118,6 +118,7 @@ void cfgUnLock(SConfig *pCfg);
// clang-format off
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope);
int32_t cfgAddInt32Ex(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope);
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope);
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope);
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope, int8_t dynScope);

View File

@ -50,7 +50,7 @@ typedef struct SDiskbasedBufStatis {
* @param handle
* @return
*/
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int64_t inMemBufSize, const char* id,
const char* dir);
/**

View File

@ -682,7 +682,7 @@ void doDestroyRequest(void *p) {
SRequestObj *pRequest = (SRequestObj *)p;
uint64_t reqId = pRequest->requestId;
tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
tscDebug("begin to destroy request 0x%" PRIx64 " p:%p", reqId, pRequest);
int64_t nextReqRefId = pRequest->relation.nextRefId;

View File

@ -58,13 +58,14 @@ int8_t tsMemPoolFullFunc = 0;
int8_t tsQueryUseMemoryPool = 1;
int32_t tsQueryBufferPoolSize = 0; //MB
int32_t tsSingleQueryMaxMemorySize = 0; //MB
int32_t tsMinReservedMemorySize = MIN_RESERVE_MEM_SIZE; //MB
int32_t tsMinReservedMemorySize = 0; //MB
int64_t tsCurrentAvailMemorySize = 0;
// queue & threads
int32_t tsQueryMinConcurrentTaskNum = 1;
int32_t tsQueryMaxConcurrentTaskNum = 0;
int32_t tsQueryConcurrentTaskNum = 0;
int32_t tsQueryNoFetchTimeoutSec = 3600 * 5;
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000;
@ -750,9 +751,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
//TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minReservedMemorySize", tsMinReservedMemorySize, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32Ex(pCfg, "minReservedMemorySize", 0, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryNoFetchTimeoutSec", tsQueryNoFetchTimeoutSec, 60, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -2089,7 +2091,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"bypassFlag", &tsBypassFlag}};
{"bypassFlag", &tsBypassFlag},
{"queryNoFetchTimeoutSec", &tsQueryNoFetchTimeoutSec}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -938,7 +938,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3
struct SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz);
extern void doDestroyExchangeOperatorInfo(void* param);

View File

@ -573,7 +573,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
}
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
int64_t defaultBufsz = 0;
code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (code) {
qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);

View File

@ -972,7 +972,7 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
}
}
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz) {
*defaultPgsz = 4096;
uint32_t last = *defaultPgsz;
while (*defaultPgsz < rowSize * 4) {
@ -986,7 +986,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
// The default buffer for each operator in query is 10MB.
// at least four pages need to be in buffer
// TODO: make this variable to be configurable.
*defaultBufsz = 4096 * 2560;
// *defaultBufsz = 4096 * 2560;
*defaultBufsz = 4096UL * 2560000;
if ((*defaultBufsz) <= (*defaultPgsz)) {
(*defaultBufsz) = (*defaultPgsz) * 4;
if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) {

View File

@ -1161,7 +1161,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
}
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
int64_t defaultBufsz = 0;
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);

View File

@ -47,6 +47,8 @@ extern "C" {
#define QW_RETIRE_JOB_BATCH_NUM 5
#define QW_DEFAULT_TIMEOUT_INTERVAL_SECS 3600
enum {
QW_CONC_TASK_LEVEL_LOW = 1,
QW_CONC_TASK_LEVEL_MIDDLE,
@ -186,6 +188,8 @@ typedef struct SQWTaskCtx {
void *sinkHandle;
SArray *tbInfo; // STbVerInfo
int64_t lastAckTs;
void *memPoolSession;
SQWJobInfo *pJobInfo;
} SQWTaskCtx;
@ -243,7 +247,8 @@ typedef struct SQWorker {
SQWStat stat;
int32_t *destroyed;
int8_t nodeStopped;
int8_t nodeStopped;
int32_t lastChkTs;
} SQWorker;
typedef struct SQWorkerMgmt {
@ -529,6 +534,7 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQ
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwDbgDumpJobsInfo(void);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore, bool dynamicTask);
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
@ -540,6 +546,8 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwInitQueryPool(void);
void qwDestroyJobInfo(void* job);
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errCode);
void qwStopAllTasks(SQWorker *mgmt);
void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs);
bool qwRetireJob(SQWJobInfo* pJob);
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session);
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession);

View File

@ -36,7 +36,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes);
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t qwBuildAndSendFetchRsp(SQWTaskCtx *ctx, int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);

View File

@ -168,6 +168,36 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
qwDbgDumpTasksInfo(mgmt);
}
void qwDbgDumpJobsInfo(void) {
if (!gQWDebug.dumpEnable) {
return;
}
qDebug("total remain job num %d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum));
size_t keyLen = 0;
char* id = NULL;
int32_t jobIdx = 0;
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
while (NULL != pJob) {
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " the %dth remain job", pJob->memInfo->jobId, pJob->memInfo->clientId, jobIdx++);
int32_t sessionIdx = 0;
SQWSessionInfo* pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, NULL);
while (NULL != pSession) {
qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d the %dth remain session",
pSession->qId, pSession->sId, pSession->cId, pSession->tId, pSession->eId, sessionIdx++);
pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, pSession);
}
pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
}
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
int32_t contLen = 0;
char *rsp = NULL;

View File

@ -85,9 +85,11 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1);
QW_TASK_DLOG("task session destoryed, remainSessions:%d", remainSessions);
if (0 == remainSessions) {
QW_LOCK(QW_WRITE, &pJobInfo->lock);
if (0 == taosHashGetSize(pJobInfo->pSessions) && 0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) {
if (/*0 == taosHashGetSize(pJobInfo->pSessions) && */0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) {
atomic_store_8(&pJobInfo->destroyed, 1);
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
@ -96,6 +98,7 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
(void)taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2));
QW_TASK_DLOG_E("the whole query job removed");
} else {
QW_TASK_DLOG("job not removed, remainSessions:%d, %d", taosHashGetSize(pJobInfo->pSessions), pJobInfo->memInfo->remainSession);
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
}
}

View File

@ -162,7 +162,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t qwBuildAndSendFetchRsp(SQWTaskCtx *ctx, int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
@ -184,6 +184,10 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
rpcRsp.info.compressed = pRsp->compressed;
tmsgSendRsp(&rpcRsp);
if (NULL != ctx) {
ctx->lastAckTs = taosGetTimestampSec();
}
return TSDB_CODE_SUCCESS;
}

View File

@ -317,7 +317,7 @@ int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, 0);
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, ctx->rspCode);
}
ctx->ctrlConnInfo.handle = NULL;
@ -395,6 +395,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, cId, tId, eId);
SQWTaskCtx octx;
int32_t code = TSDB_CODE_SUCCESS;
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == ctx) {
@ -404,14 +405,20 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
octx = *ctx;
if (ctx->pJobInfo && TSDB_CODE_SUCCESS != ctx->pJobInfo->errCode) {
QW_UPDATE_RSP_CODE(ctx, ctx->pJobInfo->errCode);
}
atomic_store_ptr(&ctx->taskHandle, NULL);
atomic_store_ptr(&ctx->sinkHandle, NULL);
atomic_store_ptr(&ctx->pJobInfo, NULL);
atomic_store_ptr(&ctx->memPoolSession, NULL);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
code = QW_CTX_NOT_EXISTS_ERR_CODE(mgmt);
}
qwFreeTaskCtx(QW_FPARAMS(), &octx);
@ -419,7 +426,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
QW_TASK_DLOG_E("task ctx dropped");
return TSDB_CODE_SUCCESS;
return code;
}
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
@ -747,7 +754,7 @@ bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errC
QW_LOCK(QW_WRITE, &ctx->lock);
QW_TASK_DLOG("start to stop task, forceStop:%d", forceStop);
QW_TASK_DLOG("start to stop task, forceStop:%d, error:%s", forceStop, tstrerror(errCode));
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
@ -830,3 +837,49 @@ bool qwRetireJob(SQWJobInfo *pJob) {
return retired;
}
void qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, cId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, cId, tId, eId);
sId = ctx->sId;
(void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
}
void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs) {
uint64_t qId, cId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
if ((ctx->lastAckTs <= 0) || (currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec) {
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, cId, tId, eId);
sId = ctx->sId;
(void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_QRY_NO_FETCH_TIMEOUT);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
}

View File

@ -21,24 +21,6 @@ SQWorkerMgmt gQwMgmt = {
TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
SQueryMgmt gQueryMgmt = {0};
void qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, cId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, cId, tId, eId);
sId = ctx->sId;
(void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
@ -102,6 +84,7 @@ int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode));
}
ctx->lastAckTs = taosGetTimestampSec();
ctx->queryRsped = true;
}
@ -502,7 +485,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
QW_ERR_RET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
QW_ERR_RET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
@ -899,7 +882,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
@ -922,7 +905,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwMsg->connInfo = ctx->dataConnInfo;
code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
code = qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
tstrerror(code), 0);
@ -1031,7 +1014,7 @@ _return:
}
if (!rsped) {
code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
code = qwBuildAndSendFetchRsp(ctx, qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d",
TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
@ -1088,7 +1071,7 @@ _return:
(void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL,
ctx->dynamicTask); // task already failed, no more error handling
} else {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0);
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, code);
}
}
@ -1220,13 +1203,20 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWHbInfo *rspList = NULL;
SArray *pExpiredSch = NULL;
int32_t code = 0;
int32_t currTs = taosGetTimestampSec();
qwDbgDumpMgmtInfo(mgmt);
qwDbgDumpJobsInfo();
if (gQWDebug.forceStop) {
qwStopAllTasks(mgmt);
}
if (mgmt->lastChkTs > 0 && (currTs - mgmt->lastChkTs) >= QW_DEFAULT_TIMEOUT_INTERVAL_SECS) {
qwChkDropTimeoutQuery(mgmt, currTs);
mgmt->lastChkTs = currTs;
}
QW_LOCK(QW_READ, &mgmt->schLock);
int32_t schNum = taosHashGetSize(mgmt->schHash);
@ -1434,6 +1424,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
QW_ERR_JRET(qExecutorInit());
mgmt->lastChkTs = taosGetTimestampSec();
*qWorkerMgmt = mgmt;
qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
@ -1671,8 +1663,7 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
continue;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) &&
0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
bool retired = qwRetireJob(pJob);
@ -1689,6 +1680,8 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
qDebug("job retire in batch done, [prev:%d, curr:%d, total:%d] jobs, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64
", task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
alreadyJobNum, jobNum, taosHashGetSize(gQueryMgmt.pJobInfo), retiredSize, retireSize,

View File

@ -218,6 +218,7 @@ typedef struct SSchRedirectCtx {
} SSchRedirectCtx;
typedef struct SSchTimerParam {
int8_t exit;
int64_t rId;
uint64_t queryId;
uint64_t taskId;

View File

@ -1047,7 +1047,10 @@ int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, boo
SCH_UNLOCK_TASK(pTask);
SCH_RET(code);
}
SCH_LOCK(SCH_WRITE, &pTask->planLock);
qClearSubplanExecutionNode(pTask->plan);
SCH_UNLOCK(SCH_WRITE, &pTask->planLock);
schResetTaskForRetry(pJob, pTask);
SCH_UNLOCK_TASK(pTask);
}

View File

@ -426,10 +426,11 @@ _return:
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask);
if (pTask->delayTimer) {
schStopTaskDelayTimer(pJob, pTask, false);
taosTmrStop(pTask->delayTimer);
}
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
schDeregisterTaskHb(pJob, pTask);
@ -746,6 +747,10 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
(void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
if (pTask->delayTimer) {
taosTmrStop(pTask->delayTimer);
}
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
@ -1291,6 +1296,8 @@ void schHandleTimerEvent(void *param, void *tmrId) {
SSchJob *pJob = NULL;
int32_t code = 0;
qDebug("delayTimer %" PRIuPTR " is launched", (uintptr_t)tmrId);
int64_t rId = pTimerParam->rId;
uint64_t queryId = pTimerParam->queryId;
uint64_t taskId = pTimerParam->taskId;
@ -1299,7 +1306,11 @@ void schHandleTimerEvent(void *param, void *tmrId) {
return;
}
code = schLaunchTask(pJob, pTask);
if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
code = schLaunchTask(pJob, pTask);
} else {
SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
}
schProcessOnCbEnd(pJob, pTask, code);
}
@ -1310,10 +1321,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
pTask->delayLaunchPar.queryId = pJob->queryId;
pTask->delayLaunchPar.taskId = pTask->taskId;
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
if (NULL == pTask->delayTimer) {
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
@ -1322,6 +1331,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task delayTimer %" PRIuPTR " is started", (uintptr_t)pTask->delayTimer);
return TSDB_CODE_SUCCESS;
}
@ -1357,10 +1368,11 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
while (pIter) {
SSchTask *pTask = *(SSchTask **)pIter;
SCH_LOCK_TASK(pTask);
if (pTask->delayTimer) {
schStopTaskDelayTimer(pJob, pTask, true);
}
SCH_LOCK_TASK(pTask);
schDropTaskOnExecNode(pJob, pTask);
SCH_UNLOCK_TASK(pTask);

View File

@ -424,13 +424,22 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan *pSubplan, int32_t level, int
}
void schStopTaskDelayTimer(SSchJob *pJob, SSchTask *pTask, bool syncOp) {
SCH_TASK_DLOG("try to stop task delayTimer %" PRIuPTR, (uintptr_t)pTask->delayTimer);
tmr_h delayTimer = pTask->delayTimer;
atomic_store_8(&pTask->delayLaunchPar.exit, 1);
if (!taosTmrStopA(&pTask->delayTimer)) {
SCH_TASK_DLOG("task delayTimer %" PRIuPTR " not stopped", (uintptr_t)delayTimer);
if (syncOp) {
while (!taosTmrIsStopped(&pTask->delayTimer)) {
while (!taosTmrIsStopped(&delayTimer)) {
taosMsleep(1);
}
SCH_TASK_DLOG("task delayTimer %" PRIuPTR " is stopped", (uintptr_t)delayTimer);
} else {
SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status);
SCH_TASK_WLOG("stop task delayTimer %" PRIuPTR " failed, may stopped, status:%d", (uintptr_t)delayTimer, pTask->status);
}
}
}

View File

@ -1293,8 +1293,8 @@ static void cliHandleException(SCliConn* conn) {
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}

View File

@ -44,7 +44,7 @@ extern "C" {
#define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL)
#define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL)
#define MP_DEFAULT_RESERVE_MEM_PERCENT 1 // TODO 20 !!!!!!!!!!!!!!!!!!!!!
#define MP_DEFAULT_RESERVE_MEM_PERCENT 20
#define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576UL)
#define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576UL)

View File

@ -538,6 +538,18 @@ int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scop
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddInt32Ex(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
int8_t dynScope) {
SConfigItem item = {.dtype = CFG_DTYPE_INT32,
.i32 = defaultVal,
.imin = minval,
.imax = maxval,
.scope = scope,
.dynScope = dynScope};
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
int8_t dynScope) {
if (defaultVal < minval || defaultVal > maxval) {

View File

@ -509,12 +509,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_WINDOW_CONDITION, "The time pseudo colum
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_JOIN_CONDITION, "Not supported join on condition")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_NOT_SUPPORT_TYPE, "Not supported range type")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_WRONG_OPTR_TYPE, "Wrong operator type")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_RANGE_ERROR, "Wrong filter range")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_INVALID_TYPE, "Invalid filter type")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_FETCH_TIMEOUT, "Timeout for long time no fetch")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")

View File

@ -1051,7 +1051,7 @@ void* mpMgmtThreadFunc(void* param) {
while (0 == atomic_load_8(&gMPMgmt.modExit)) {
mpUpdateSystemAvailableMemorySize();
retireSize = pPool->cfg.reserveSize - tsCurrentAvailMemorySize;
retireSize = pPool->cfg.reserveSize - atomic_load_64(&tsCurrentAvailMemorySize);
if (retireSize > 0) {
(*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
@ -1723,8 +1723,10 @@ int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
uInfo("total memory size: %" PRId64 "KB", tsTotalMemoryKB);
tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100);
if (0 == tsMinReservedMemorySize) {
tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100);
}
SMemPoolCfg cfg = {0};
int64_t sysAvailSize = 0;

View File

@ -347,7 +347,7 @@ static SPageInfo* getPageInfoFromPayload(void* page) {
return ppi;
}
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int64_t inMemBufSize, const char* id,
const char* dir) {
*pBuf = NULL;
SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));

View File

@ -454,10 +454,6 @@ void mptInit() {
memset(mptCtx.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1);
mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
void* p = taosMemMalloc(1048576UL * 20000);
mptWriteMem(p, (1048576UL * 20000));
}
void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
@ -1576,7 +1572,7 @@ TEST(PerfTest, GetSysAvail) {
memset(p, 0, msize);
int64_t totalUs = taosGetTimestampUs() - st;
printf("memset %" PRId64 " used time:%" PRId64 "us, speed:%dMB/ms\n", msize, totalUs, msize/1048576UL/(totalUs/1000UL));
int64_t freeSize = 0;
int32_t loopTimes = 1000000;
st = taosGetTimestampUs();
@ -1584,7 +1580,7 @@ TEST(PerfTest, GetSysAvail) {
for (int32_t i = 0; i < loopTimes; ++i) {
code = taosGetSysAvailMemory(&freeSize);
assert(0 == code);
//taosMsleep(10);
//taosMsleep(1);
}
totalUs = taosGetTimestampUs() - st;
@ -1592,6 +1588,53 @@ TEST(PerfTest, GetSysAvail) {
}
#endif
#if 1
TEST(MiscTest, monSysAvailSize) {
char* caseName = "MiscTest:monSysAvailSize";
int32_t code = 0;
int64_t freeSize = 0;
int32_t loopTimes = 1000000000;
for (int32_t i = 0; i < loopTimes; ++i) {
code = taosGetSysAvailMemory(&freeSize);
assert(0 == code);
printf(" %" PRId64, freeSize);
if (i && 0 == (i % 10)) {
struct tm Tm, *ptm;
struct timeval timeSecs;
TAOS_UNUSED(taosGetTimeOfDay(&timeSecs));
time_t curTime = timeSecs.tv_sec;
ptm = taosLocalTime(&curTime, &Tm, NULL, 0);
printf("- %02d/%02d %02d:%02d:%02d.%06d \n", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec);
}
taosMsleep(1);
}
}
#endif
#if 0
TEST(MiscTest, simNonPoolAct) {
char* caseName = "MiscTest:simNonPoolAct";
int64_t msize = 1048576UL*1024, asize = 0;
int32_t loopTimes = 1000000;
for (int32_t i = 0; i < loopTimes; ++i) {
asize = taosRand() % msize;
void* p = taosMemMalloc(asize);
mptWriteMem(p, asize);
taosMsleep(100);
taosMemFree(p);
printf("sim %dth alloc/free %" PRId64 " bytes\n", i, asize);
}
}
#endif
#if 0
TEST(PerfTest, allocLatency) {
char* caseName = "PerfTest:allocLatency";

View File

@ -48,7 +48,8 @@ class TDSimClient:
"telemetryReporting": "0",
"tqDebugflag": "135",
"stDebugflag":"135",
"safetyCheckLevel":"2"
"safetyCheckLevel":"2",
"minReservedMemorySize":"1024"
}
def getLogDir(self):
@ -889,4 +890,4 @@ class TDDnodes:
def getAsan(self):
return self.asan
tdDnodes = TDDnodes()
tdDnodes = TDDnodes()

View File

@ -74,16 +74,16 @@ void selectCallback(void* param, TAOS_RES* res, int32_t code) {
}
static void verifyQueryAsync(TAOS* taos) {
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select * from (select cast(count(*) as binary(100)) a, rand() b from tbx where ts >= '2023-01-01' and ts <= '2024-12-01' interval(1s) fill(value, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')) t1 group by t1.a, t1.b;;", selectCallback, NULL);
taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL);
taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL);
taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
}
int main(int argc, char* argv[]) {
@ -99,7 +99,7 @@ int main(int argc, char* argv[]) {
for (int64_t i = 0; i < 1000000000; ++i) {
verifyQueryAsync(taos);
printf("%llu queries launched, errTimes:%lld \n", i * 10, errTimes);
while ((i * 10 - atomic_load_64(&finQueries)) > 1000) {
while ((i * 10 - atomic_load_64(&finQueries)) > 100) {
printf("left queries:%llu\n", (i * 10 - atomic_load_64(&finQueries)));
taosMsleep(2000);
}

View File

@ -111,6 +111,7 @@ echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
echo "enableCoreFile 1" >> $TAOS_CFG
echo "minReservedMemorySize 1024" >> $TAOS_CFG
echo " " >> $TAOS_CFG
ulimit -n 600000