Merge pull request #5068 from taosdata/feature/TD-2863
[TD-2863]increase query buffer size to 64 bits
This commit is contained in:
commit
bf48eef8c7
|
@ -270,3 +270,9 @@
|
||||||
|
|
||||||
# in retrieve blocking model, only in 50% query threads will be used in query processing in dnode
|
# in retrieve blocking model, only in 50% query threads will be used in query processing in dnode
|
||||||
# retrieveBlockingModel 0
|
# retrieveBlockingModel 0
|
||||||
|
|
||||||
|
# the maximum allowed query buffer size in MB during query processing for each data node
|
||||||
|
# -1 no limit (default)
|
||||||
|
# 0 no query allowed, queries are disabled
|
||||||
|
# queryBufferSize -1
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,8 @@ extern int32_t tsCompressMsgSize;
|
||||||
extern char tsTempDir[];
|
extern char tsTempDir[];
|
||||||
|
|
||||||
//query buffer management
|
//query buffer management
|
||||||
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
|
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
||||||
|
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node during query processing
|
||||||
extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked
|
extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked
|
||||||
|
|
||||||
extern int8_t tsKeepOriginalColumnName;
|
extern int8_t tsKeepOriginalColumnName;
|
||||||
|
|
|
@ -105,6 +105,7 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
|
||||||
// 0 no query allowed, queries are disabled
|
// 0 no query allowed, queries are disabled
|
||||||
// positive value (in MB)
|
// positive value (in MB)
|
||||||
int32_t tsQueryBufferSize = -1;
|
int32_t tsQueryBufferSize = -1;
|
||||||
|
int64_t tsQueryBufferSizeBytes = -1;
|
||||||
|
|
||||||
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
|
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
|
||||||
int32_t tsRetrieveBlockingModel = 0;
|
int32_t tsRetrieveBlockingModel = 0;
|
||||||
|
@ -283,7 +284,7 @@ bool taosCfgDynamicOptions(char *msg) {
|
||||||
int32_t cfgLen = (int32_t)strlen(cfg->option);
|
int32_t cfgLen = (int32_t)strlen(cfg->option);
|
||||||
if (cfgLen != olen) continue;
|
if (cfgLen != olen) continue;
|
||||||
if (strncasecmp(option, cfg->option, olen) != 0) continue;
|
if (strncasecmp(option, cfg->option, olen) != 0) continue;
|
||||||
if (cfg->valType != TAOS_CFG_VTYPE_INT32) {
|
if (cfg->valType == TAOS_CFG_VTYPE_INT32) {
|
||||||
*((int32_t *)cfg->ptr) = vint;
|
*((int32_t *)cfg->ptr) = vint;
|
||||||
} else {
|
} else {
|
||||||
*((int8_t *)cfg->ptr) = (int8_t)vint;
|
*((int8_t *)cfg->ptr) = (int8_t)vint;
|
||||||
|
@ -1488,6 +1489,10 @@ int32_t taosCheckGlobalCfg() {
|
||||||
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
|
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
|
||||||
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
|
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
|
||||||
|
|
||||||
|
if (tsQueryBufferSize >= 0) {
|
||||||
|
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||||
|
}
|
||||||
|
|
||||||
taosPrintGlobalCfg();
|
taosPrintGlobalCfg();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -7730,15 +7730,15 @@ static int64_t getQuerySupportBufSize(size_t numOfTables) {
|
||||||
|
|
||||||
int32_t checkForQueryBuf(size_t numOfTables) {
|
int32_t checkForQueryBuf(size_t numOfTables) {
|
||||||
int64_t t = getQuerySupportBufSize(numOfTables);
|
int64_t t = getQuerySupportBufSize(numOfTables);
|
||||||
if (tsQueryBufferSize < 0) {
|
if (tsQueryBufferSizeBytes < 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (tsQueryBufferSize > 0) {
|
} else if (tsQueryBufferSizeBytes > 0) {
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
int64_t s = tsQueryBufferSize;
|
int64_t s = tsQueryBufferSizeBytes;
|
||||||
int64_t remain = s - t;
|
int64_t remain = s - t;
|
||||||
if (remain >= 0) {
|
if (remain >= 0) {
|
||||||
if (atomic_val_compare_exchange_64(&tsQueryBufferSize, s, remain) == s) {
|
if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -7752,14 +7752,14 @@ int32_t checkForQueryBuf(size_t numOfTables) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void releaseQueryBuf(size_t numOfTables) {
|
void releaseQueryBuf(size_t numOfTables) {
|
||||||
if (tsQueryBufferSize <= 0) {
|
if (tsQueryBufferSizeBytes < 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t t = getQuerySupportBufSize(numOfTables);
|
int64_t t = getQuerySupportBufSize(numOfTables);
|
||||||
|
|
||||||
// restore value is not enough buffer available
|
// restore value is not enough buffer available
|
||||||
atomic_add_fetch_64(&tsQueryBufferSize, t);
|
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* qGetResultRetrieveMsg(qinfo_t qinfo) {
|
void* qGetResultRetrieveMsg(qinfo_t qinfo) {
|
||||||
|
|
Loading…
Reference in New Issue