From fa810a383a1252bb13dc8b09eaa57f38f2dffbd1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 3 Dec 2020 17:25:59 +0800 Subject: [PATCH 1/5] [TD-2265]: add the total number of rows in submit block during parse insert sql. --- src/client/src/tscParseInsert.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6de8195d73..6ff1cdcb79 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -630,11 +630,17 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 return TSDB_CODE_SUCCESS; } -static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { +static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { pBlocks->tid = pTableMeta->id.tid; pBlocks->uid = pTableMeta->id.uid; pBlocks->sversion = pTableMeta->sversion; - pBlocks->numOfRows += numOfRows; + + if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { + return TSDB_CODE_TSC_INVALID_SQL; + } else { + pBlocks->numOfRows += numOfRows; + return TSDB_CODE_SUCCESS; + } } // data block is disordered, sort it in ascending order @@ -722,7 +728,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st } SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); - tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); + code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); + if (code != TSDB_CODE_SUCCESS) { + tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", *str); + return code; + } dataBuf->vgId = pTableMeta->vgroupInfo.vgId; dataBuf->numOfTables = 1; From a3fb9a1a8e13c83b783cede3460650743f0e53e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 3 Dec 2020 17:28:02 +0800 Subject: [PATCH 2/5] [TD-2265] --- src/client/src/tscParseInsert.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6ff1cdcb79..08935adf8b 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1394,7 +1394,10 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData); - tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); + code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); + if (code != TSDB_CODE_SUCCESS) { + return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", ""); + } if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { return code; From 5015365172cfd9cc91344eeadc2a266e0df19be4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 3 Dec 2020 17:28:38 +0800 Subject: [PATCH 3/5] [TD-2265] --- src/client/src/tscParseInsert.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 08935adf8b..18e5b6f074 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1396,7 +1396,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData); code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); if (code != TSDB_CODE_SUCCESS) { - return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", ""); + return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL); } if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { From 0fe731af0fccb4eb1cd31865d19b591a81a8f0cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 3 Dec 2020 22:01:55 +0800 Subject: [PATCH 4/5] [TD-2325]: enable configure if 50% cpu will be used in query processing. --- packaging/cfg/taos.cfg | 5 ++- src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 15 +++++++- src/util/src/tconfig.c | 2 -- src/vnode/src/vnodeRead.c | 75 ++++++++++++++++++++------------------- 5 files changed, 57 insertions(+), 41 deletions(-) diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 7662d49280..014ebc37d2 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -251,7 +251,7 @@ # cqDebugFlag 131 # enable/disable recording the SQL in taos client -# tscEnableRecordSql 0 +# enableRecordSql 0 # generate core file when service crash # enableCoreFile 1 @@ -264,3 +264,6 @@ # enable/disable stream (continuous query) # stream 1 + +# only 50% CPU resources will be used in query processing +# halfCoresForQuery 0 diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 6e4274b358..10bcfcb82b 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -56,6 +56,7 @@ extern char tsTempDir[]; //query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing +extern int32_t tsHalfCoresForQuery; // only 50% will be used in query processing // client extern int32_t tsTableMetaKeepTimer; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 18aa0ae6e3..026cd67a81 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -107,6 +107,9 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance // positive value (in MB) int32_t tsQueryBufferSize = -1; +// only 50% cpu will be used in query processing in dnode +int32_t tsHalfCoresForQuery = 0; + // db parameters int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; int32_t tsBlocksPerVnode = TSDB_DEFAULT_TOTAL_BLOCKS; @@ -884,6 +887,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_BYTE; taosInitConfigOption(cfg); + cfg.option = "halfCoresForQuery"; + cfg.ptr = &tsHalfCoresForQuery; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + // locale & charset cfg.option = "timezone"; cfg.ptr = tsTimezone; @@ -1290,7 +1303,7 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "tscEnableRecordSql"; + cfg.option = "enableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; diff --git a/src/util/src/tconfig.c b/src/util/src/tconfig.c index e89dea5a24..173de294cf 100644 --- a/src/util/src/tconfig.c +++ b/src/util/src/tconfig.c @@ -19,9 +19,7 @@ #include "taoserror.h" #include "tconfig.h" #include "tglobal.h" -#include "tkey.h" #include "tulog.h" -#include "tsocket.h" #include "tsystem.h" #include "tutil.h" diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 5ef79cfbf0..ed6d29505f 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -275,41 +275,40 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); - -#if _NON_BLOCKING_RETRIEVE - bool freehandle = false; - bool buildRes = qTableQuery(*qhandle); // do execute query - - // build query rsp, the retrieve request has reached here already - if (buildRes) { - // update the connection info according to the retrieve connection - pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle); - assert(pRead->rpcHandle != NULL); - - vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, - pRead->rpcHandle); - - // set the real rsp error code - pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle); - - // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client - code = TSDB_CODE_QRY_HAS_RSP; + // In the retrieve blocking model, only 50% CPU will be used in query processing + if (tsHalfCoresForQuery) { + qTableQuery(*qhandle); // do execute query + qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false); } else { - void* h1 = qGetResultRetrieveMsg(*qhandle); - assert(h1 == NULL); + bool freehandle = false; + bool buildRes = qTableQuery(*qhandle); // do execute query - freehandle = qQueryCompleted(*qhandle); - } + // build query rsp, the retrieve request has reached here already + if (buildRes) { + // update the connection info according to the retrieve connection + pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle); + assert(pRead->rpcHandle != NULL); - // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. - // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle - if (freehandle || (!buildRes)) { - qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); + vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, + pRead->rpcHandle); + + // set the real rsp error code + pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle); + + // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client + code = TSDB_CODE_QRY_HAS_RSP; + } else { + void *h1 = qGetResultRetrieveMsg(*qhandle); + assert(h1 == NULL); + freehandle = qQueryCompleted(*qhandle); + } + + // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. + // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle + if (freehandle || (!buildRes)) { + qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); + } } -#else - qTableQuery(*qhandle); // do execute query - qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false); -#endif } return code; @@ -375,14 +374,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { freeHandle = true; } else { // result is not ready, return immediately assert(buildRes == true); -#if _NON_BLOCKING_RETRIEVE - if (!buildRes) { - assert(pRead->rpcHandle != NULL); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); - return TSDB_CODE_QRY_NOT_READY; + // Only effects in the non-blocking model + if (!tsHalfCoresForQuery) { + if (!buildRes) { + assert(pRead->rpcHandle != NULL); + + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); + return TSDB_CODE_QRY_NOT_READY; + } } -#endif // ahandle is the sqlObj pointer code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pRead->rpcHandle); From 7090b3feb0bea56433e4d10565b18c74bf1f5e7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 3 Dec 2020 22:52:17 +0800 Subject: [PATCH 5/5] [TD-2169]: fix memory leaks in group by query processing. --- src/query/src/qExecutor.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0c07149e8e..bcce7c50a8 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1372,8 +1372,12 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - pResultRow->key = malloc(varDataTLen(pData)); - varDataCopy(pResultRow->key, pData); + if (pResultRow->key == NULL) { + pResultRow->key = malloc(varDataTLen(pData)); + varDataCopy(pResultRow->key, pData); + } else { + assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); + } } else { pResultRow->win.skey = v; pResultRow->win.ekey = v;