[td-1739]
This commit is contained in:
parent
36ae97a0e5
commit
5b23931687
|
@ -44,14 +44,17 @@ extern int32_t tsMaxShellConns;
|
|||
extern int32_t tsShellActivityTimer;
|
||||
extern uint32_t tsMaxTmrCtrl;
|
||||
extern float tsNumOfThreadsPerCore;
|
||||
extern float tsRatioOfQueryThreads;
|
||||
extern float tsRatioOfQueryThreads; // todo remove it
|
||||
extern int8_t tsDaylight;
|
||||
extern char tsTimezone[];
|
||||
extern char tsLocale[];
|
||||
extern char tsCharset[]; // default encode string
|
||||
extern char tsCharset[]; // default encode string
|
||||
extern int32_t tsEnableCoreFile;
|
||||
extern int32_t tsCompressMsgSize;
|
||||
|
||||
//query buffer management
|
||||
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
|
||||
|
||||
// client
|
||||
extern int32_t tsTableMetaKeepTimer;
|
||||
extern int32_t tsMaxSQLStringLen;
|
||||
|
|
|
@ -45,14 +45,14 @@ int32_t tsEnableTelemetryReporting = 1;
|
|||
char tsEmail[TSDB_FQDN_LEN] = {0};
|
||||
|
||||
// common
|
||||
int32_t tsRpcTimer = 1000;
|
||||
int32_t tsRpcMaxTime = 600; // seconds;
|
||||
int32_t tsMaxShellConns = 5000;
|
||||
int32_t tsRpcTimer = 1000;
|
||||
int32_t tsRpcMaxTime = 600; // seconds;
|
||||
int32_t tsMaxShellConns = 5000;
|
||||
int32_t tsMaxConnections = 5000;
|
||||
int32_t tsShellActivityTimer = 3; // second
|
||||
float tsNumOfThreadsPerCore = 1.0;
|
||||
float tsRatioOfQueryThreads = 0.5;
|
||||
int8_t tsDaylight = 0;
|
||||
int32_t tsShellActivityTimer = 3; // second
|
||||
float tsNumOfThreadsPerCore = 1.0f;
|
||||
float tsRatioOfQueryThreads = 0.5f;
|
||||
int8_t tsDaylight = 0;
|
||||
char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
|
||||
char tsLocale[TSDB_LOCALE_LEN] = {0};
|
||||
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
|
||||
|
@ -99,6 +99,12 @@ float tsStreamComputDelayRatio = 0.1f;
|
|||
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
|
||||
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
|
||||
|
||||
// the maximum allowed query buffer size during query processing for each data node.
|
||||
// -1 no limit (default)
|
||||
// 0 no query allowed, queries are disabled
|
||||
// positive value (in MB)
|
||||
int32_t tsQueryBufferSize = -1;
|
||||
|
||||
// db parameters
|
||||
int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
|
||||
int32_t tsBlocksPerVnode = TSDB_DEFAULT_TOTAL_BLOCKS;
|
||||
|
@ -676,7 +682,7 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.minValue = TSDB_MIN_CACHE_BLOCK_SIZE;
|
||||
cfg.maxValue = TSDB_MAX_CACHE_BLOCK_SIZE;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_Mb;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_MB;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "blocks";
|
||||
|
@ -839,6 +845,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "queryBufferSize";
|
||||
cfg.ptr = &tsQueryBufferSize;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = -1;
|
||||
cfg.maxValue = 10000000000000; //10TB
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
// locale & charset
|
||||
cfg.option = "timezone";
|
||||
cfg.ptr = tsTimezone;
|
||||
|
|
|
@ -78,7 +78,6 @@ int32_t qKillQuery(qinfo_t qinfo);
|
|||
|
||||
int32_t qQueryCompleted(qinfo_t qinfo);
|
||||
|
||||
|
||||
/**
|
||||
* destroy query info structure
|
||||
* @param qHandle
|
||||
|
|
|
@ -230,6 +230,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, 0, 0x070A, "Too many time window in query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached")
|
||||
|
||||
// grant
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired")
|
||||
|
|
|
@ -194,6 +194,8 @@ static void buildTagQueryResult(SQInfo *pQInfo);
|
|||
|
||||
static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo);
|
||||
static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo);
|
||||
static int32_t checkForQueryBuf(int32_t numOfTables);
|
||||
static void releaseQueryBuf(int32_t numOfTables);
|
||||
|
||||
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||
|
@ -6492,6 +6494,8 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
|
||||
qDebug("QInfo:%p start to free QInfo", pQInfo);
|
||||
|
||||
releaseQueryBuf(pQInfo->tableqinfoGroupInfo.numOfTables);
|
||||
|
||||
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
|
||||
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
@ -6726,6 +6730,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
assert(0);
|
||||
}
|
||||
|
||||
code = checkForQueryBuf(tableGroupInfo.numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
|
||||
goto _over;
|
||||
}
|
||||
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery);
|
||||
pExprs = NULL;
|
||||
pGroupbyExpr = NULL;
|
||||
|
@ -7127,6 +7136,48 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
|
||||
static int64_t getQuerySupportBufSize(int32_t numOfTables) {
|
||||
size_t s1 = sizeof(STableQueryInfo);
|
||||
size_t s2 = sizeof(SHashNode);
|
||||
|
||||
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
|
||||
return (s1 + s2) * 1.5 * numOfTables;
|
||||
}
|
||||
|
||||
int32_t checkForQueryBuf(int32_t numOfTables) {
|
||||
int64_t t = getQuerySupportBufSize(numOfTables);
|
||||
if (tsQueryBufferSize < 0) {
|
||||
return true;
|
||||
} else if (tsQueryBufferSize > 0) {
|
||||
|
||||
while(1) {
|
||||
int64_t s = tsQueryBufferSize;
|
||||
int64_t remain = s - t;
|
||||
if (remain >= 0) {
|
||||
if (atomic_val_compare_exchange_64(&tsQueryBufferSize, s, remain) == s) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// disable query processing if the value of tsQueryBufferSize is zero.
|
||||
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
|
||||
}
|
||||
|
||||
void releaseQueryBuf(int32_t numOfTables) {
|
||||
if (tsQueryBufferSize <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t t = getQuerySupportBufSize(numOfTables);
|
||||
|
||||
// restore value is not enough buffer available
|
||||
atomic_add_fetch_64(&tsQueryBufferSize, t);
|
||||
}
|
||||
|
||||
void* qGetResultRetrieveMsg(qinfo_t qinfo) {
|
||||
SQInfo* pQInfo = (SQInfo*) qinfo;
|
||||
assert(pQInfo != NULL);
|
||||
|
|
|
@ -53,7 +53,7 @@ enum {
|
|||
TAOS_CFG_UTYPE_NONE,
|
||||
TAOS_CFG_UTYPE_PERCENT,
|
||||
TAOS_CFG_UTYPE_GB,
|
||||
TAOS_CFG_UTYPE_Mb,
|
||||
TAOS_CFG_UTYPE_MB,
|
||||
TAOS_CFG_UTYPE_BYTE,
|
||||
TAOS_CFG_UTYPE_SECOND,
|
||||
TAOS_CFG_UTYPE_MS
|
||||
|
|
Loading…
Reference in New Issue