commit
8fc2bb8dd5
|
@ -251,7 +251,7 @@
|
|||
# cqDebugFlag 131
|
||||
|
||||
# enable/disable recording the SQL in taos client
|
||||
# tscEnableRecordSql 0
|
||||
# enableRecordSql 0
|
||||
|
||||
# generate core file when service crash
|
||||
# enableCoreFile 1
|
||||
|
@ -264,3 +264,6 @@
|
|||
|
||||
# enable/disable stream (continuous query)
|
||||
# stream 1
|
||||
|
||||
# only 50% CPU resources will be used in query processing
|
||||
# halfCoresForQuery 0
|
||||
|
|
|
@ -630,11 +630,17 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||
pBlocks->tid = pTableMeta->id.tid;
|
||||
pBlocks->uid = pTableMeta->id.uid;
|
||||
pBlocks->sversion = pTableMeta->sversion;
|
||||
pBlocks->numOfRows += numOfRows;
|
||||
|
||||
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
} else {
|
||||
pBlocks->numOfRows += numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
// data block is disordered, sort it in ascending order
|
||||
|
@ -722,7 +728,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
|
|||
}
|
||||
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
|
||||
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str);
|
||||
return code;
|
||||
}
|
||||
|
||||
dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
|
||||
dataBuf->numOfTables = 1;
|
||||
|
@ -1384,7 +1394,10 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
|
|||
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
|
||||
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
|
||||
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
|
||||
}
|
||||
|
||||
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
|
|
@ -56,6 +56,7 @@ extern char tsTempDir[];
|
|||
|
||||
//query buffer management
|
||||
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
|
||||
extern int32_t tsHalfCoresForQuery; // only 50% will be used in query processing
|
||||
|
||||
// client
|
||||
extern int32_t tsTableMetaKeepTimer;
|
||||
|
|
|
@ -107,6 +107,9 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
|
|||
// positive value (in MB)
|
||||
int32_t tsQueryBufferSize = -1;
|
||||
|
||||
// only 50% cpu will be used in query processing in dnode
|
||||
int32_t tsHalfCoresForQuery = 0;
|
||||
|
||||
// db parameters
|
||||
int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
|
||||
int32_t tsBlocksPerVnode = TSDB_DEFAULT_TOTAL_BLOCKS;
|
||||
|
@ -884,6 +887,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "halfCoresForQuery";
|
||||
cfg.ptr = &tsHalfCoresForQuery;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 1;
|
||||
cfg.ptrLength = 1;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
// locale & charset
|
||||
cfg.option = "timezone";
|
||||
cfg.ptr = tsTimezone;
|
||||
|
@ -1290,7 +1303,7 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "tscEnableRecordSql";
|
||||
cfg.option = "enableRecordSql";
|
||||
cfg.ptr = &tsTscEnableRecordSql;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
|
||||
|
|
|
@ -1372,8 +1372,12 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
|
|||
}
|
||||
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
pResultRow->key = malloc(varDataTLen(pData));
|
||||
varDataCopy(pResultRow->key, pData);
|
||||
if (pResultRow->key == NULL) {
|
||||
pResultRow->key = malloc(varDataTLen(pData));
|
||||
varDataCopy(pResultRow->key, pData);
|
||||
} else {
|
||||
assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
||||
}
|
||||
} else {
|
||||
pResultRow->win.skey = v;
|
||||
pResultRow->win.ekey = v;
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
#include "taoserror.h"
|
||||
#include "tconfig.h"
|
||||
#include "tglobal.h"
|
||||
#include "tkey.h"
|
||||
#include "tulog.h"
|
||||
#include "tsocket.h"
|
||||
#include "tsystem.h"
|
||||
#include "tutil.h"
|
||||
|
||||
|
|
|
@ -275,41 +275,40 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
|
||||
|
||||
|
||||
#if _NON_BLOCKING_RETRIEVE
|
||||
bool freehandle = false;
|
||||
bool buildRes = qTableQuery(*qhandle); // do execute query
|
||||
|
||||
// build query rsp, the retrieve request has reached here already
|
||||
if (buildRes) {
|
||||
// update the connection info according to the retrieve connection
|
||||
pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(pRead->rpcHandle != NULL);
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
|
||||
pRead->rpcHandle);
|
||||
|
||||
// set the real rsp error code
|
||||
pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle);
|
||||
|
||||
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
|
||||
code = TSDB_CODE_QRY_HAS_RSP;
|
||||
// In the retrieve blocking model, only 50% CPU will be used in query processing
|
||||
if (tsHalfCoresForQuery) {
|
||||
qTableQuery(*qhandle); // do execute query
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
|
||||
} else {
|
||||
void* h1 = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(h1 == NULL);
|
||||
bool freehandle = false;
|
||||
bool buildRes = qTableQuery(*qhandle); // do execute query
|
||||
|
||||
freehandle = qQueryCompleted(*qhandle);
|
||||
}
|
||||
// build query rsp, the retrieve request has reached here already
|
||||
if (buildRes) {
|
||||
// update the connection info according to the retrieve connection
|
||||
pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(pRead->rpcHandle != NULL);
|
||||
|
||||
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
|
||||
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
|
||||
if (freehandle || (!buildRes)) {
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
|
||||
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
|
||||
pRead->rpcHandle);
|
||||
|
||||
// set the real rsp error code
|
||||
pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle);
|
||||
|
||||
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
|
||||
code = TSDB_CODE_QRY_HAS_RSP;
|
||||
} else {
|
||||
void *h1 = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(h1 == NULL);
|
||||
freehandle = qQueryCompleted(*qhandle);
|
||||
}
|
||||
|
||||
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
|
||||
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
|
||||
if (freehandle || (!buildRes)) {
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
|
||||
}
|
||||
}
|
||||
#else
|
||||
qTableQuery(*qhandle); // do execute query
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
|
||||
#endif
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -375,14 +374,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
freeHandle = true;
|
||||
} else { // result is not ready, return immediately
|
||||
assert(buildRes == true);
|
||||
#if _NON_BLOCKING_RETRIEVE
|
||||
if (!buildRes) {
|
||||
assert(pRead->rpcHandle != NULL);
|
||||
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
|
||||
return TSDB_CODE_QRY_NOT_READY;
|
||||
// Only effects in the non-blocking model
|
||||
if (!tsHalfCoresForQuery) {
|
||||
if (!buildRes) {
|
||||
assert(pRead->rpcHandle != NULL);
|
||||
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
|
||||
return TSDB_CODE_QRY_NOT_READY;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// ahandle is the sqlObj pointer
|
||||
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pRead->rpcHandle);
|
||||
|
|
Loading…
Reference in New Issue