Merge branch 'enh/3.0_planner_optimize' into refact/query_opt

This commit is contained in:
Xiaoyu Wang 2022-09-21 13:56:34 +08:00
commit 71749cf6c6
35 changed files with 558 additions and 199 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 125c77a GIT_TAG 509ec72
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -3,7 +3,11 @@ sidebar_label: Docker
title: Quick Install on Docker title: Quick Install on Docker
--- ---
This document describes how to install TDengine in a Docker container and perform queries and inserts. To get started with TDengine in a non-containerized environment, see [Quick Install](../../get-started/package). If you want to view the source code, build TDengine yourself, or contribute to the project, see the [TDengine GitHub repository](https://github.com/taosdata/TDengine). This document describes how to install TDengine in a Docker container and perform queries and inserts.
- To get started with TDengine in a non-containerized environment, see [Quick Install from Package](../../get-started/package).
- For a fully managed solution, see the [TDengine Cloud documentation](/cloud/).
- If you want to view the source code, build TDengine yourself, or contribute to the project, see the [TDengine GitHub repository](https://github.com/taosdata/TDengine).
## Run TDengine ## Run TDengine

View File

@ -7,7 +7,11 @@ import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem"; import TabItem from "@theme/TabItem";
import PkgListV3 from "/components/PkgListV3"; import PkgListV3 from "/components/PkgListV3";
For information about installing TDengine on Docker, see [Quick Install on Docker](../../get-started/docker). If you want to view the source code, build TDengine yourself, or contribute to the project, see the [TDengine GitHub repository](https://github.com/taosdata/TDengine). This document describes how to install TDengine on Linux and Windows and perform queries and inserts.
- To get started with TDengine on Docker, see [Quick Install on Docker](../../get-started/docker).
- For a fully managed solution, see the [TDengine Cloud documentation](/cloud/).
- If you want to view the source code, build TDengine yourself, or contribute to the project, see the [TDengine GitHub repository](https://github.com/taosdata/TDengine).
The full package of TDengine includes the TDengine Server (`taosd`), TDengine Client (`taosc`), taosAdapter for connecting with third-party systems and providing a RESTful interface, a command-line interface (CLI, taos), and some tools. Note that taosAdapter supports Linux only. In addition to connectors for multiple languages, TDengine also provides a [REST API](../../reference/rest-api) through [taosAdapter](../../reference/taosadapter). The full package of TDengine includes the TDengine Server (`taosd`), TDengine Client (`taosc`), taosAdapter for connecting with third-party systems and providing a RESTful interface, a command-line interface (CLI, taos), and some tools. Note that taosAdapter supports Linux only. In addition to connectors for multiple languages, TDengine also provides a [REST API](../../reference/rest-api) through [taosAdapter](../../reference/taosadapter).

View File

@ -3,9 +3,9 @@ title: Get Started
description: This article describes how to install TDengine and test its performance. description: This article describes how to install TDengine and test its performance.
--- ---
The full package of TDengine includes the TDengine Server (`taosd`), TDengine Client (`taosc`), taosAdapter for connecting with third-party systems and providing a RESTful interface, a command-line interface, and some tools. In addition to connectors for multiple languages, TDengine also provides a [RESTful interface](/reference/rest-api) through [taosAdapter](/reference/taosadapter). You can install and run TDengine on Linux and Windows machines as well as Docker containers. You can also deploy TDengine as a managed service with TDengine Cloud.
You can install and run TDengine on Linux and Windows machines as well as Docker containers. The full package of TDengine includes the TDengine Server (`taosd`), TDengine Client (`taosc`), taosAdapter for connecting with third-party systems and providing a RESTful interface, a command-line interface, and some tools. In addition to connectors for multiple languages, TDengine also provides a [RESTful interface](/reference/rest-api) through [taosAdapter](/reference/taosadapter).
```mdx-code-block ```mdx-code-block
import DocCardList from '@theme/DocCardList'; import DocCardList from '@theme/DocCardList';

View File

@ -16,6 +16,8 @@ INSERT INTO
[(field1_name, ...)] [(field1_name, ...)]
VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
...]; ...];
INSERT INTO tb_name [(field1_name, ...)] subquery
``` ```
**Timestamps** **Timestamps**
@ -37,7 +39,7 @@ INSERT INTO
4. The FILE clause inserts tags or data from a comma-separates values (CSV) file. Do not include headers in your CSV files. 4. The FILE clause inserts tags or data from a comma-separates values (CSV) file. Do not include headers in your CSV files.
5. A single INSERT statement can write data to multiple tables. 5. A single `INSERT ... VALUES` statement and `INSERT ... FILE` statement can write data to multiple tables.
6. The INSERT statement is fully parsed before being executed, so that if any element of the statement fails, the entire statement will fail. For example, the following statement will not create a table because the latter part of the statement is invalid: 6. The INSERT statement is fully parsed before being executed, so that if any element of the statement fails, the entire statement will fail. For example, the following statement will not create a table because the latter part of the statement is invalid:
@ -47,6 +49,8 @@ INSERT INTO
7. However, an INSERT statement that writes data to multiple subtables can succeed for some tables and fail for others. This situation is caused because vnodes perform write operations independently of each other. One vnode failing to write data does not affect the ability of other vnodes to write successfully. 7. However, an INSERT statement that writes data to multiple subtables can succeed for some tables and fail for others. This situation is caused because vnodes perform write operations independently of each other. One vnode failing to write data does not affect the ability of other vnodes to write successfully.
8. Data from TDengine can be inserted into a specified table using the `INSERT ... subquery` statement. Arbitrary query statements are supported. This syntax can only be used for subtables and normal tables, and does not support automatic table creation.
## Insert a Record ## Insert a Record
Single row or multiple rows specified with VALUES can be inserted into a specific table. A single row is inserted using the below statement. Single row or multiple rows specified with VALUES can be inserted into a specific table. A single row is inserted using the below statement.

View File

@ -17,6 +17,8 @@ INSERT INTO
[(field1_name, ...)] [(field1_name, ...)]
VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
...]; ...];
INSERT INTO tb_name [(field1_name, ...)] subquery
``` ```
**关于时间戳** **关于时间戳**
@ -38,7 +40,7 @@ INSERT INTO
4. FILE 语法表示数据来自于 CSV 文件英文逗号分隔、英文单引号括住每个值CSV 文件无需表头。 4. FILE 语法表示数据来自于 CSV 文件英文逗号分隔、英文单引号括住每个值CSV 文件无需表头。
5. 无论使用哪种语法,均可以在一条 INSERT 语句中同时向多个表插入数据。 5. `INSERT ... VALUES` 语句和 `INSERT ... FILE` 语句均可以在一条 INSERT 语句中同时向多个表插入数据。
6. INSERT 语句是完整解析后再执行的,对如下语句,不会再出现数据错误但建表成功的情况: 6. INSERT 语句是完整解析后再执行的,对如下语句,不会再出现数据错误但建表成功的情况:
@ -48,6 +50,8 @@ INSERT INTO
7. 对于向多个子表插入数据的情况,依然会有部分数据写入失败,部分数据写入成功的情况。这是因为多个子表可能分布在不同的 VNODE 上,客户端将 INSERT 语句完整解析后,将数据发往各个涉及的 VNODE 上,每个 VNODE 独立进行写入操作。如果某个 VNODE 因为某些原因(比如网络问题或磁盘故障)导致写入失败,并不会影响其他 VNODE 节点的写入。 7. 对于向多个子表插入数据的情况,依然会有部分数据写入失败,部分数据写入成功的情况。这是因为多个子表可能分布在不同的 VNODE 上,客户端将 INSERT 语句完整解析后,将数据发往各个涉及的 VNODE 上,每个 VNODE 独立进行写入操作。如果某个 VNODE 因为某些原因(比如网络问题或磁盘故障)导致写入失败,并不会影响其他 VNODE 节点的写入。
8. 可以使用 `INSERT ... subquery` 语句将 TDengine 中的数据插入到指定表中。subquery 可以是任意的查询语句。此语法只能用于子表和普通表,且不支持自动建表。
## 插入一条记录 ## 插入一条记录
指定已经创建好的数据子表的表名,并通过 VALUES 关键字提供一行或多行数据,即可向数据库写入这些数据。例如,执行如下语句可以写入一行记录: 指定已经创建好的数据子表的表名,并通过 VALUES 关键字提供一行或多行数据,即可向数据库写入这些数据。例如,执行如下语句可以写入一行记录:

View File

@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;
extern bool tsQueryPlannerTrace; extern bool tsQueryPlannerTrace;
extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;

View File

@ -275,6 +275,18 @@ typedef struct SNodeList {
SListCell* pTail; SListCell* pTail;
} SNodeList; } SNodeList;
typedef struct SNodeAllocator SNodeAllocator;
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator);
void nodesDestroyNodeAllocator(void* pAllocator);
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator);
int32_t nodesAllocatorInit();
int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId);
void nodesDestroyAllocator(int64_t refId);
void nodesResetAllocator(int64_t refId);
int64_t nodesIncAllocatorRefCount(int64_t refId);
SNode* nodesMakeNode(ENodeType type); SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode); void nodesDestroyNode(SNode* pNode);

View File

@ -67,6 +67,7 @@ typedef struct SSchedulerReq {
SRequestConnInfo *pConn; SRequestConnInfo *pConn;
SArray *pNodeList; SArray *pNodeList;
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId;
const char *sql; const char *sql;
int64_t startTs; int64_t startTs;
schedulerExecFp execFp; schedulerExecFp execFp;

View File

@ -250,6 +250,7 @@ typedef struct SRequestObj {
bool inRetry; bool inRetry;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId;
} SRequestObj; } SRequestObj;
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {

View File

@ -288,6 +288,7 @@ void *createRequest(uint64_t connId, int32_t type) {
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type; pRequest->type = type;
pRequest->allocatorRefId = -1;
pRequest->pDb = getDbOfConnection(pTscObj); pRequest->pDb = getDbOfConnection(pTscObj);
pRequest->pTscObj = pTscObj; pRequest->pTscObj = pTscObj;
@ -349,6 +350,7 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList); taosArrayDestroy(pRequest->targetTableList);
nodesDestroyAllocator(pRequest->allocatorRefId);
destroyQueryExecRes(&pRequest->body.resInfo.execRes); destroyQueryExecRes(&pRequest->body.resInfo.execRes);
@ -411,6 +413,7 @@ void taos_init_imp(void) {
initTaskQueue(); initTaskQueue();
fmFuncMgtInit(); fmFuncMgtInit();
nodesAllocatorInit();
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);

View File

@ -194,6 +194,18 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pRequest)->allocatorRefId = -1;
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
if (TSDB_CODE_SUCCESS != nodesCreateAllocator(tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1036,6 +1048,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
pRequest->body.subplanNum = pDag->numOfSubplans; pRequest->body.subplanNum = pDag->numOfSubplans;
} }
nodesResetAllocator(-1);
pRequest->metric.planEnd = taosGetTimestampUs(); pRequest->metric.planEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self, tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self,
@ -1053,6 +1067,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pConn = &conn, .pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pDag, .pDag = pDag,
.allocatorRefId = pRequest->allocatorRefId,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.execFp = schedulerExecCb, .execFp = schedulerExecCb,

View File

@ -700,6 +700,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->metric.ctgEnd = taosGetTimestampUs(); pRequest->metric.ctgEnd = taosGetTimestampUs();
nodesResetAllocator(pRequest->allocatorRefId);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
@ -729,6 +731,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} else { } else {
destorySqlParseWrapper(pWrapper); destorySqlParseWrapper(pWrapper);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
nodesResetAllocator(-1);
if (NEED_CLIENT_HANDLE_ERROR(code)) { if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
@ -801,11 +804,13 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
} }
SQuery *pQuery = NULL; SQuery *pQuery = NULL;
nodesResetAllocator(pRequest->allocatorRefId);
pRequest->metric.syntaxStart = taosGetTimestampUs(); pRequest->metric.syntaxStart = taosGetTimestampUs();
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
nodesResetAllocator(-1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }

View File

@ -227,8 +227,8 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db1", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db2", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stable", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},

View File

@ -92,6 +92,8 @@ bool tsSmlDataFormat =
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -284,6 +286,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
@ -385,9 +389,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
@ -527,7 +531,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
/* /*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2; tsNumOfQnodeFetchThreads = numOfCores / 2;
@ -535,7 +539,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->i32 = tsNumOfQnodeFetchThreads; pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype; pItem->stype = stype;
} }
*/ */
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
@ -643,6 +647,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
return 0; return 0;
} }
@ -693,7 +699,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
@ -941,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
/* /*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/ */
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
@ -976,6 +982,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
} else if (strcasecmp("queryPlannerTrace", name) == 0) { } else if (strcasecmp("queryPlannerTrace", name) == 0) {
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
} else if (strcasecmp("queryNodeChunkSize", name) == 0) {
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
} else if (strcasecmp("queryUseNodeAllocator", name) == 0) {
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
} }
break; break;
} }

View File

@ -165,8 +165,8 @@ typedef struct {
SEpSet lastEpset; SEpSet lastEpset;
tmsg_t lastMsgType; tmsg_t lastMsgType;
tmsg_t originRpcType; tmsg_t originRpcType;
char dbname1[TSDB_TABLE_FNAME_LEN]; char dbname[TSDB_TABLE_FNAME_LEN];
char dbname2[TSDB_TABLE_FNAME_LEN]; char stbname[TSDB_TABLE_FNAME_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;

View File

@ -71,7 +71,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper); void mndTransSetOper(STrans *pTrans, EOperType oper);

View File

@ -671,7 +671,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER; goto _OVER;
} }
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// create stb for stream // create stb for stream

View File

@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, 0, _OVER) SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
@ -289,8 +289,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->oper = oper; pTrans->oper = oper;
SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER) SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
@ -706,7 +706,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
if (pIter == NULL) break; if (pIter == NULL) break;
if (pTrans->oper == oper) { if (pTrans->oper == oper) {
if (strcasecmp(dbname, pTrans->dbname1) == 0) { if (strcasecmp(dbname, pTrans->dbname) == 0) {
mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper); mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
if (pTrans->pRpcArray == NULL) { if (pTrans->pRpcArray == NULL) {
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
@ -725,12 +725,12 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
return code; return code;
} }
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) { void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) {
if (dbname1 != NULL) { if (dbname != NULL) {
tstrncpy(pTrans->dbname1, dbname1, TSDB_TABLE_FNAME_LEN); tstrncpy(pTrans->dbname, dbname, TSDB_TABLE_FNAME_LEN);
} }
if (dbname2 != NULL) { if (stbname != NULL) {
tstrncpy(pTrans->dbname2, dbname2, TSDB_TABLE_FNAME_LEN); tstrncpy(pTrans->stbname, stbname, TSDB_TABLE_FNAME_LEN);
} }
} }
@ -759,9 +759,9 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return 0; return 0;
} }
static bool mndCheckDbConflict(const char *db, STrans *pTrans) { static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
if (db[0] == 0) return false; if (conflict[0] == 0) return false;
if (strcasecmp(db, pTrans->dbname1) == 0 || strcasecmp(db, pTrans->dbname2) == 0) return true; if (strcasecmp(conflict, pTrans->dbname) == 0 || strcasecmp(conflict, pTrans->stbname) == 0) return true;
return false; return false;
} }
@ -780,28 +780,28 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_DB) { if (pNew->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
} }
} }
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) { if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB) { if (pTrans->conflict == TRN_CONFLICT_DB) {
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
} }
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; // for stb if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
} }
} }
if (conflict) { if (conflict) {
mError("trans:%d, db1:%s db2:%s type:%d, can't execute since conflict with trans:%d db1:%s db2:%s type:%d", mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
pNew->id, pNew->dbname1, pNew->dbname2, pNew->conflict, pTrans->id, pTrans->dbname1, pTrans->dbname2, pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
pTrans->conflict); pTrans->conflict);
} else { } else {
mDebug("trans:%d, db1:%s db2:%s type:%d, not conflict with trans:%d db1:%s db2:%s type:%d", pNew->id, mDebug("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id,
pNew->dbname1, pNew->dbname2, pNew->conflict, pTrans->id, pTrans->dbname1, pTrans->dbname2, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
pTrans->conflict); pTrans->conflict);
} }
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
@ -812,7 +812,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) { if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare conflict db not set", pTrans->id); mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
return -1; return -1;
@ -913,12 +913,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) { if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType)); mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1); SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname);
if (pDb != NULL) { if (pDb != NULL) {
for (int32_t j = 0; j < 12; j++) { for (int32_t j = 0; j < 12; j++) {
bool ready = mndIsDbReady(pMnode, pDb); bool ready = mndIsDbReady(pMnode, pDb);
if (!ready) { if (!ready) {
mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j); mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname, j);
taosMsleep(1000); taosMsleep(1000);
} else { } else {
break; break;
@ -929,7 +929,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) { } else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
void *pCont = NULL; void *pCont = NULL;
int32_t contLen = 0; int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) { if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen); mndTransSetRpcRsp(pTrans, pCont, contLen);
} }
} }
@ -1599,15 +1599,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)stage, false); colDataAppend(pColInfo, numOfRows, (const char *)stage, false);
char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false); colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char stbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(stbname, mndGetDbStr(pTrans->stbname), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false); colDataAppend(pColInfo, numOfRows, (const char *)stbname, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);

View File

@ -17,6 +17,18 @@
#include "plannodes.h" #include "plannodes.h"
#include "tdatablock.h" #include "tdatablock.h"
#ifndef htonll
#define htonll(x) \
(((int64_t)x & 0x00000000000000ff) << 7 * 8) | (((int64_t)x & 0x000000000000ff00) << 5 * 8) | \
(((int64_t)x & 0x0000000000ff0000) << 3 * 8) | (((int64_t)x & 0x00000000ff000000) << 1 * 8) | \
(((int64_t)x & 0x000000ff00000000) >> 1 * 8) | (((int64_t)x & 0x0000ff0000000000) >> 3 * 8) | \
(((int64_t)x & 0x00ff000000000000) >> 5 * 8) | (((int64_t)x & 0xff00000000000000) >> 7 * 8)
#define ntohll(x) htonll(x)
#endif
#define NODES_MSG_DEFAULT_LEN 1024 #define NODES_MSG_DEFAULT_LEN 1024
#define TLV_TYPE_ARRAY_ELEM 0 #define TLV_TYPE_ARRAY_ELEM 0
@ -86,8 +98,8 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV
pEncoder->allocSize = pEncoder->allocSize * 2; pEncoder->allocSize = pEncoder->allocSize * 2;
} }
STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset);
pTlv->type = type; pTlv->type = htons(type);
pTlv->len = len; pTlv->len = htonl(len);
memcpy(pTlv->value, pValue, len); memcpy(pTlv->value, pValue, len);
pEncoder->offset += tlvLen; pEncoder->offset += tlvLen;
++(pEncoder->tlvCount); ++(pEncoder->tlvCount);
@ -117,26 +129,32 @@ static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) {
} }
static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) { static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) {
value = htons(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI16(STlvEncoder* pEncoder, int16_t value) { static int32_t tlvEncodeValueI16(STlvEncoder* pEncoder, int16_t value) {
value = htons(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) { static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) {
value = htonl(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI32(STlvEncoder* pEncoder, int32_t value) { static int32_t tlvEncodeValueI32(STlvEncoder* pEncoder, int32_t value) {
value = htonl(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) { static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) {
value = htonll(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI64(STlvEncoder* pEncoder, int64_t value) { static int32_t tlvEncodeValueI64(STlvEncoder* pEncoder, int64_t value) {
value = htonll(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
@ -149,34 +167,44 @@ static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) {
} }
static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) { static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) {
value = htons(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU16(STlvEncoder* pEncoder, uint16_t value) { static int32_t tlvEncodeValueU16(STlvEncoder* pEncoder, uint16_t value) {
value = htons(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) { static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) {
value = htonll(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU64(STlvEncoder* pEncoder, uint64_t value) { static int32_t tlvEncodeValueU64(STlvEncoder* pEncoder, uint64_t value) {
value = htonll(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) { static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); int64_t temp = *(int64_t*)&value;
temp = htonll(temp);
return tlvEncodeImpl(pEncoder, type, &temp, sizeof(temp));
} }
static int32_t tlvEncodeValueDouble(STlvEncoder* pEncoder, double value) { static int32_t tlvEncodeValueDouble(STlvEncoder* pEncoder, double value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); int64_t temp = *(int64_t*)&value;
temp = htonll(temp);
return tlvEncodeValueImpl(pEncoder, &temp, sizeof(temp));
} }
static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) { static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) {
value = htonl(value);
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueEnum(STlvEncoder* pEncoder, int32_t value) { static int32_t tlvEncodeValueEnum(STlvEncoder* pEncoder, int32_t value) {
value = htonl(value);
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
} }
@ -197,7 +225,7 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV
static int32_t tlvEncodeValueCStr(STlvEncoder* pEncoder, const char* pValue) { static int32_t tlvEncodeValueCStr(STlvEncoder* pEncoder, const char* pValue) {
int16_t len = strlen(pValue); int16_t len = strlen(pValue);
int32_t code = tlvEncodeValueImpl(pEncoder, &len, sizeof(len)); int32_t code = tlvEncodeValueI16(pEncoder, len);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueImpl(pEncoder, pValue, len); code = tlvEncodeValueImpl(pEncoder, pValue, len);
} }
@ -218,8 +246,8 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
int32_t code = func(pObj, pEncoder); int32_t code = func(pObj, pEncoder);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STlv* pTlv = (STlv*)(pEncoder->pBuf + start); STlv* pTlv = (STlv*)(pEncoder->pBuf + start);
pTlv->type = type; pTlv->type = htons(type);
pTlv->len = pEncoder->offset - start - sizeof(STlv); pTlv->len = htonl(pEncoder->offset - start - sizeof(STlv));
} }
++(pEncoder->tlvCount); ++(pEncoder->tlvCount);
return code; return code;
@ -236,8 +264,8 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STlv* pTlv = (STlv*)(pEncoder->pBuf + start); STlv* pTlv = (STlv*)(pEncoder->pBuf + start);
pTlv->type = type; pTlv->type = htons(type);
pTlv->len = pEncoder->offset - start - sizeof(STlv); pTlv->len = htonl(pEncoder->offset - start - sizeof(STlv));
} }
} }
return code; return code;
@ -259,6 +287,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
} }
*pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset); *pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset);
(*pTlv)->type = ntohs((*pTlv)->type);
(*pTlv)->len = ntohl((*pTlv)->len);
if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -291,22 +321,52 @@ static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
} }
static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
}
static int32_t tlvDecodeValueI16(STlvDecoder* pDecoder, int16_t* pValue) { static int32_t tlvDecodeValueI16(STlvDecoder* pDecoder, int16_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
} }
static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohl(*pValue);
}
return code;
}
static int32_t tlvDecodeValueI32(STlvDecoder* pDecoder, int32_t* pValue) { static int32_t tlvDecodeValueI32(STlvDecoder* pDecoder, int32_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohl(*pValue);
}
return code;
} }
static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
}
static int32_t tlvDecodeValueI64(STlvDecoder* pDecoder, int64_t* pValue) { static int32_t tlvDecodeValueI64(STlvDecoder* pDecoder, int64_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
} }
static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
@ -315,22 +375,54 @@ static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
} }
static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
}
static int32_t tlvDecodeValueU16(STlvDecoder* pDecoder, uint16_t* pValue) { static int32_t tlvDecodeValueU16(STlvDecoder* pDecoder, uint16_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohs(*pValue);
}
return code;
} }
static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) {
int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
}
static int32_t tlvDecodeValueU64(STlvDecoder* pDecoder, uint64_t* pValue) { static int32_t tlvDecodeValueU64(STlvDecoder* pDecoder, uint64_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
if (TSDB_CODE_SUCCESS == code) {
*pValue = ntohll(*pValue);
}
return code;
} }
static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) {
int64_t temp = 0;
int32_t code = tlvDecodeI64(pTlv, &temp);
if (TSDB_CODE_SUCCESS == code) {
*pValue = *(double*)&temp;
}
return code;
}
static int32_t tlvDecodeValueDouble(STlvDecoder* pDecoder, double* pValue) { static int32_t tlvDecodeValueDouble(STlvDecoder* pDecoder, double* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); int64_t temp = 0;
int32_t code = tlvDecodeValueI64(pDecoder, &temp);
if (TSDB_CODE_SUCCESS == code) {
*pValue = *(double*)&temp;
}
return code;
} }
static int32_t convertIntegerType(int32_t value, void* pValue, int16_t len) { static int32_t convertIntegerType(int32_t value, void* pValue, int16_t len) {
@ -2462,33 +2554,54 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { enum { PHY_INTERVAL_CODE_WINDOW = 1, PHY_INTERVAL_CODE_INLINE_ATTRS };
PHY_INTERVAL_CODE_WINDOW = 1,
PHY_INTERVAL_CODE_INTERVAL, static int32_t physiIntervalNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
PHY_INTERVAL_CODE_OFFSET, const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
PHY_INTERVAL_CODE_SLIDING,
PHY_INTERVAL_CODE_INTERVAL_UNIT, int32_t code = tlvEncodeValueI64(pEncoder, pNode->interval);
PHY_INTERVAL_CODE_SLIDING_UNIT if (TSDB_CODE_SUCCESS == code) {
}; code = tlvEncodeValueI64(pEncoder, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI64(pEncoder, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->slidingUnit);
}
return code;
}
static int32_t physiIntervalNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiIntervalNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); int32_t code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_INTERVAL, pNode->interval); code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_INLINE_ATTRS, physiIntervalNodeInlineToMsg, pNode);
}
return code;
}
static int32_t msgToPhysiIntervalNodeInline(STlvDecoder* pDecoder, void* pObj) {
SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj;
int32_t code = tlvDecodeValueI64(pDecoder, &pNode->interval);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI64(pDecoder, &pNode->offset);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_OFFSET, pNode->offset); code = tlvDecodeValueI64(pDecoder, &pNode->sliding);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_SLIDING, pNode->sliding); code = tlvDecodeValueI8(pDecoder, &pNode->intervalUnit);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_INTERVAL_UNIT, pNode->intervalUnit); code = tlvDecodeValueI8(pDecoder, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_SLIDING_UNIT, pNode->slidingUnit);
} }
return code; return code;
@ -2504,20 +2617,8 @@ static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_INTERVAL_CODE_WINDOW: case PHY_INTERVAL_CODE_WINDOW:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window);
break; break;
case PHY_INTERVAL_CODE_INTERVAL: case PHY_INTERVAL_CODE_INLINE_ATTRS:
code = tlvDecodeI64(pTlv, &pNode->interval); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiIntervalNodeInline, pNode);
break;
case PHY_INTERVAL_CODE_OFFSET:
code = tlvDecodeI64(pTlv, &pNode->offset);
break;
case PHY_INTERVAL_CODE_SLIDING:
code = tlvDecodeI64(pTlv, &pNode->sliding);
break;
case PHY_INTERVAL_CODE_INTERVAL_UNIT:
code = tlvDecodeI8(pTlv, &pNode->intervalUnit);
break;
case PHY_INTERVAL_CODE_SLIDING_UNIT:
code = tlvDecodeI8(pTlv, &pNode->slidingUnit);
break; break;
default: default:
break; break;

View File

@ -21,9 +21,163 @@
#include "taoserror.h" #include "taoserror.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tref.h"
static SNode* makeNode(ENodeType type, size_t size) { typedef struct SNodeMemChunk {
SNode* p = taosMemoryCalloc(1, size); int32_t availableSize;
int32_t usedSize;
char* pBuf;
struct SNodeMemChunk* pNext;
} SNodeMemChunk;
typedef struct SNodeAllocator {
int64_t self;
int32_t chunkSize;
int32_t chunkNum;
SNodeMemChunk* pCurrChunk;
SNodeMemChunk* pChunks;
} SNodeAllocator;
static threadlocal SNodeAllocator* pNodeAllocator;
static int32_t allocatorReqRefPool = -1;
int32_t nodesAllocatorInit() {
if (allocatorReqRefPool >= 0) {
nodesWarn("nodes already initialized");
return TSDB_CODE_SUCCESS;
}
allocatorReqRefPool = taosOpenRef(40960, nodesDestroyNodeAllocator);
if (allocatorReqRefPool < 0) {
nodesError("init nodes failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
if (NULL == pNewChunk) {
return NULL;
}
pNewChunk->pBuf = (char*)(pNewChunk + 1);
pNewChunk->availableSize = pAllocator->chunkSize;
pNewChunk->usedSize = 0;
pNewChunk->pNext = NULL;
if (NULL != pAllocator->pCurrChunk) {
pAllocator->pCurrChunk->pNext = pNewChunk;
}
pAllocator->pCurrChunk = pNewChunk;
if (NULL == pAllocator->pChunks) {
pAllocator->pChunks = pNewChunk;
}
++(pAllocator->chunkNum);
return pNewChunk;
}
static void* nodesCallocImpl(int32_t size) {
if (NULL == pNodeAllocator) {
return taosMemoryCalloc(1, size);
}
if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) {
if (NULL == callocNodeChunk(pNodeAllocator)) {
return NULL;
}
}
void* p = pNodeAllocator->pCurrChunk->pBuf + pNodeAllocator->pCurrChunk->usedSize;
pNodeAllocator->pCurrChunk->usedSize += size;
return p;
}
static void* nodesCalloc(int32_t num, int32_t size) {
void* p = nodesCallocImpl(num * size + 1);
if (NULL == p) {
return NULL;
}
*(char*)p = (NULL != pNodeAllocator) ? 1 : 0;
return (char*)p + 1;
}
static void nodesFree(void* p) {
char* ptr = (char*)p - 1;
if (0 == *ptr) {
taosMemoryFree(ptr);
}
return;
}
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) {
*pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator));
if (NULL == *pAllocator) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pAllocator)->chunkSize = chunkSize;
if (NULL == callocNodeChunk(*pAllocator)) {
taosMemoryFreeClear(*pAllocator);
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
void nodesDestroyNodeAllocator(void* p) {
if (NULL == p) {
return;
}
SNodeAllocator* pAllocator = p;
nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum,
pAllocator->chunkNum * pAllocator->chunkSize);
SNodeMemChunk* pChunk = pAllocator->pChunks;
while (NULL != pChunk) {
SNodeMemChunk* pTemp = pChunk->pNext;
taosMemoryFree(pChunk);
pChunk = pTemp;
}
taosMemoryFree(pAllocator);
}
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; }
int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) {
SNodeAllocator* pAllocator = NULL;
int32_t code = nodesCreateNodeAllocator(chunkSize, &pAllocator);
if (TSDB_CODE_SUCCESS == code) {
pAllocator->self = taosAddRef(allocatorReqRefPool, pAllocator);
*pRefId = pAllocator->self;
}
return code;
}
void nodesDestroyAllocator(int64_t refId) {
if (refId <= 0) {
return;
}
taosReleaseRef(allocatorReqRefPool, refId);
}
void nodesResetAllocator(int64_t refId) {
if (refId <= 0) {
pNodeAllocator = NULL;
} else {
pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId);
taosReleaseRef(allocatorReqRefPool, refId);
}
}
int64_t nodesIncAllocatorRefCount(int64_t refId) {
if (refId <= 0) {
return -1;
}
SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId);
return pAllocator->self;
}
static SNode* makeNode(ENodeType type, int32_t size) {
SNode* p = nodesCalloc(1, size);
if (NULL == p) { if (NULL == p) {
return NULL; return NULL;
} }
@ -824,6 +978,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pWStartTs); nodesDestroyNode(pLogicNode->pWStartTs);
nodesDestroyNode(pLogicNode->pValues); nodesDestroyNode(pLogicNode->pValues);
nodesDestroyList(pLogicNode->pFillExprs); nodesDestroyList(pLogicNode->pFillExprs);
nodesDestroyList(pLogicNode->pNotFillExprs);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_SORT: { case QUERY_NODE_LOGIC_PLAN_SORT: {
@ -1021,12 +1176,12 @@ void nodesDestroyNode(SNode* pNode) {
default: default:
break; break;
} }
taosMemoryFreeClear(pNode); nodesFree(pNode);
return; return;
} }
SNodeList* nodesMakeList() { SNodeList* nodesMakeList() {
SNodeList* p = taosMemoryCalloc(1, sizeof(SNodeList)); SNodeList* p = nodesCalloc(1, sizeof(SNodeList));
if (NULL == p) { if (NULL == p) {
return NULL; return NULL;
} }
@ -1037,7 +1192,7 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); SListCell* p = nodesCalloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1104,7 +1259,7 @@ int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
} }
pTarget->pTail = pSrc->pTail; pTarget->pTail = pSrc->pTail;
pTarget->length += pSrc->length; pTarget->length += pSrc->length;
taosMemoryFreeClear(pSrc); nodesFree(pSrc);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1124,7 +1279,7 @@ int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); SListCell* p = nodesCalloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1152,7 +1307,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
} }
SListCell* pNext = pCell->pNext; SListCell* pNext = pCell->pNext;
nodesDestroyNode(pCell->pNode); nodesDestroyNode(pCell->pNode);
taosMemoryFreeClear(pCell); nodesFree(pCell);
--(pList->length); --(pList->length);
return pNext; return pNext;
} }
@ -1172,7 +1327,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
pPos->pPrev = pSrc->pTail; pPos->pPrev = pSrc->pTail;
pTarget->length += pSrc->length; pTarget->length += pSrc->length;
taosMemoryFreeClear(pSrc); nodesFree(pSrc);
} }
SNode* nodesListGetNode(SNodeList* pList, int32_t index) { SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
@ -1204,7 +1359,7 @@ void nodesDestroyList(SNodeList* pList) {
while (NULL != pNext) { while (NULL != pNext) {
pNext = nodesListErase(pList, pNext); pNext = nodesListErase(pList, pNext);
} }
taosMemoryFreeClear(pList); nodesFree(pList);
} }
void nodesClearList(SNodeList* pList) { void nodesClearList(SNodeList* pList) {
@ -1216,9 +1371,9 @@ void nodesClearList(SNodeList* pList) {
while (NULL != pNext) { while (NULL != pNext) {
SListCell* tmp = pNext; SListCell* tmp = pNext;
pNext = pNext->pNext; pNext = pNext->pNext;
taosMemoryFreeClear(tmp); nodesFree(tmp);
} }
taosMemoryFreeClear(pList); nodesFree(pList);
} }
void* nodesGetValueFromNode(SValueNode* pNode) { void* nodesGetValueFromNode(SValueNode* pNode) {

View File

@ -247,7 +247,8 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
pExpr->userAlias[len] = '\0'; pExpr->userAlias[len] = '\0';
} }
} }
taosMemoryFreeClear(pNode); pRawExpr->pNode = NULL;
nodesDestroyNode(pNode);
return pRealizedExpr; return pRealizedExpr;
} }

View File

@ -119,12 +119,18 @@ class ParserTestBaseImpl {
TEST_INTERFACE_ASYNC_API TEST_INTERFACE_ASYNC_API
}; };
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { static void destoryParseContext(SParseContext* pCxt) {
taosArrayDestroy(pCxt->pTableMetaPos);
taosArrayDestroy(pCxt->pTableVgroupPos);
delete pCxt;
}
static void destoryParseMetaCacheWarpper(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache, request); destoryParseMetaCache(pMetaCache, request);
delete pMetaCache; delete pMetaCache;
} }
static void _destroyQuery(SQuery** pQuery) { static void destroyQuery(SQuery** pQuery) {
if (nullptr == pQuery) { if (nullptr == pQuery) {
return; return;
} }
@ -303,10 +309,10 @@ class ParserTestBaseImpl {
setParseContext(sql, &cxt); setParseContext(sql, &cxt);
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) { if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseInsertSql(&cxt, query.get(), nullptr); doParseInsertSql(&cxt, query.get(), nullptr);
} else { } else {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParse(&cxt, query.get()); doParse(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
@ -335,7 +341,7 @@ class ParserTestBaseImpl {
SParseContext cxt = {0}; SParseContext cxt = {0};
setParseContext(sql, &cxt); setParseContext(sql, &cxt);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseSql(&cxt, query.get()); doParseSql(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
@ -354,26 +360,26 @@ class ParserTestBaseImpl {
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) { void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL); reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL);
try { try {
SParseContext cxt = {0}; unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, &cxt, true); setParseContext(sql, cxt.get(), true);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
bool request = true; bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache( unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request))); new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request)));
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen); bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen);
if (isInsertValues) { if (isInsertValues) {
doParseInsertSyntax(&cxt, query.get(), metaCache.get()); doParseInsertSyntax(cxt.get(), query.get(), metaCache.get());
} else { } else {
doParse(&cxt, query.get()); doParse(cxt.get(), query.get());
doCollectMetaKey(&cxt, *(query.get()), metaCache.get()); doCollectMetaKey(cxt.get(), *(query.get()), metaCache.get());
} }
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq); MockCatalogService::destoryCatalogReq);
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get()); doBuildCatalogReq(cxt.get(), metaCache.get(), catalogReq.get());
string err; string err;
thread t1([&]() { thread t1([&]() {
@ -386,13 +392,13 @@ class ParserTestBaseImpl {
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues); doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
if (isInsertValues) { if (isInsertValues) {
doParseInsertSql(&cxt, query.get(), metaCache.get()); doParseInsertSql(cxt.get(), query.get(), metaCache.get());
} else { } else {
doAuthenticate(&cxt, pQuery, metaCache.get()); doAuthenticate(cxt.get(), pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get()); doTranslate(cxt.get(), pQuery, metaCache.get());
doCalculateConstant(&cxt, pQuery); doCalculateConstant(cxt.get(), pQuery);
} }
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
// success and terminate // success and terminate
@ -423,13 +429,13 @@ class ParserTestBaseImpl {
void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) { void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_API); reset(expect, checkStage, TEST_INTERFACE_ASYNC_API);
try { try {
SParseContext cxt = {0}; unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, &cxt); setParseContext(sql, cxt.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq); MockCatalogService::destoryCatalogReq);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseSqlSyntax(&cxt, query.get(), catalogReq.get()); doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
string err; string err;
@ -438,7 +444,7 @@ class ParserTestBaseImpl {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData); unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get()); doGetAllMeta(catalogReq.get(), metaData.get());
doAnalyseSqlSemantic(&cxt, catalogReq.get(), metaData.get(), pQuery); doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery);
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
// success and terminate // success and terminate
} catch (const runtime_error& e) { } catch (const runtime_error& e) {

View File

@ -997,6 +997,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pScan);
code = nodesListMakeStrictAppend(&pSubplan->pChildren, code = nodesListMakeStrictAppend(&pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
} }

View File

@ -22,6 +22,7 @@
#include "mockCatalog.h" #include "mockCatalog.h"
#include "parser.h" #include "parser.h"
#include "planTestUtil.h" #include "planTestUtil.h"
#include "tglobal.h"
class PlannerEnv : public testing::Environment { class PlannerEnv : public testing::Environment {
public: public:
@ -30,6 +31,7 @@ class PlannerEnv : public testing::Environment {
initMetaDataEnv(); initMetaDataEnv();
generateMetaData(); generateMetaData();
initLog(TD_TMP_DIR_PATH "td"); initLog(TD_TMP_DIR_PATH "td");
initCfg();
} }
virtual void TearDown() { virtual void TearDown() {
@ -67,6 +69,8 @@ class PlannerEnv : public testing::Environment {
std::cout << "failed to init log file" << std::endl; std::cout << "failed to init log file" << std::endl;
} }
} }
void initCfg() { tsQueryPlannerTrace = true; }
}; };
static void parseArg(int argc, char* argv[]) { static void parseArg(int argc, char* argv[]) {
@ -79,6 +83,7 @@ static void parseArg(int argc, char* argv[]) {
{"limitSql", required_argument, NULL, 'i'}, {"limitSql", required_argument, NULL, 'i'},
{"log", required_argument, NULL, 'l'}, {"log", required_argument, NULL, 'l'},
{"queryPolicy", required_argument, NULL, 'q'}, {"queryPolicy", required_argument, NULL, 'q'},
{"useNodeAllocator", required_argument, NULL, 'a'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
// clang-format on // clang-format on
@ -99,6 +104,9 @@ static void parseArg(int argc, char* argv[]) {
case 'q': case 'q':
setQueryPolicy(optarg); setQueryPolicy(optarg);
break; break;
case 'a':
setUseNodeAllocator(optarg);
break;
default: default:
break; break;
} }

View File

@ -41,6 +41,7 @@ using namespace testing;
enum DumpModule { enum DumpModule {
DUMP_MODULE_NOTHING = 1, DUMP_MODULE_NOTHING = 1,
DUMP_MODULE_SQL,
DUMP_MODULE_PARSER, DUMP_MODULE_PARSER,
DUMP_MODULE_LOGIC, DUMP_MODULE_LOGIC,
DUMP_MODULE_OPTIMIZED, DUMP_MODULE_OPTIMIZED,
@ -56,10 +57,13 @@ int32_t g_skipSql = 0;
int32_t g_limitSql = 0; int32_t g_limitSql = 0;
int32_t g_logLevel = 131; int32_t g_logLevel = 131;
int32_t g_queryPolicy = QUERY_POLICY_VNODE; int32_t g_queryPolicy = QUERY_POLICY_VNODE;
bool g_useNodeAllocator = false;
void setDumpModule(const char* pModule) { void setDumpModule(const char* pModule) {
if (NULL == pModule) { if (NULL == pModule) {
g_dumpModule = DUMP_MODULE_ALL; g_dumpModule = DUMP_MODULE_ALL;
} else if (0 == strncasecmp(pModule, "sql", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_SQL;
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) { } else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_PARSER; g_dumpModule = DUMP_MODULE_PARSER;
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) { } else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
@ -79,10 +83,11 @@ void setDumpModule(const char* pModule) {
} }
} }
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); } void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); }
void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); } void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); }
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); } void setLogLevel(const char* pArg) { g_logLevel = stoi(pArg); }
void setQueryPolicy(const char* pQueryPolicy) { g_queryPolicy = stoi(pQueryPolicy); } void setQueryPolicy(const char* pArg) { g_queryPolicy = stoi(pArg); }
void setUseNodeAllocator(const char* pArg) { g_useNodeAllocator = stoi(pArg); }
int32_t getLogLevel() { return g_logLevel; } int32_t getLogLevel() { return g_logLevel; }
@ -124,6 +129,12 @@ class PlannerTestBaseImpl {
} }
void runImpl(const string& sql, int32_t queryPolicy) { void runImpl(const string& sql, int32_t queryPolicy) {
SNodeAllocator* pAllocator = NULL;
if (g_useNodeAllocator) {
nodesCreateNodeAllocator(32 * 1024, &pAllocator);
nodesResetThreadLevelAllocator(pAllocator);
}
reset(); reset();
tsQueryPolicy = queryPolicy; tsQueryPolicy = queryPolicy;
try { try {
@ -155,8 +166,13 @@ class PlannerTestBaseImpl {
dump(g_dumpModule); dump(g_dumpModule);
} catch (...) { } catch (...) {
dump(DUMP_MODULE_ALL); dump(DUMP_MODULE_ALL);
nodesDestroyNodeAllocator(pAllocator);
nodesResetThreadLevelAllocator(NULL);
throw; throw;
} }
nodesDestroyNodeAllocator(pAllocator);
nodesResetThreadLevelAllocator(NULL);
} }
void prepare(const string& sql) { void prepare(const string& sql) {
@ -216,6 +232,8 @@ class PlannerTestBaseImpl {
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan); doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode); unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode);
checkPlanMsg((SNode*)pPlan);
dump(g_dumpModule); dump(g_dumpModule);
} catch (...) { } catch (...) {
dump(DUMP_MODULE_ALL); dump(DUMP_MODULE_ALL);
@ -252,7 +270,6 @@ class PlannerTestBaseImpl {
string splitLogicPlan_; string splitLogicPlan_;
string scaledLogicPlan_; string scaledLogicPlan_;
string physiPlan_; string physiPlan_;
string physiPlanMsg_;
vector<string> physiSubplans_; vector<string> physiSubplans_;
}; };
@ -276,17 +293,16 @@ class PlannerTestBaseImpl {
res_.splitLogicPlan_.clear(); res_.splitLogicPlan_.clear();
res_.scaledLogicPlan_.clear(); res_.scaledLogicPlan_.clear();
res_.physiPlan_.clear(); res_.physiPlan_.clear();
res_.physiPlanMsg_.clear();
res_.physiSubplans_.clear(); res_.physiSubplans_.clear();
} }
void dump(DumpModule module) { void dump(DumpModule module) {
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_NOTHING == module) { if (DUMP_MODULE_NOTHING == module) {
return; return;
} }
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
if (res_.prepareAst_.empty()) { if (res_.prepareAst_.empty()) {
cout << "+++++++++++++++++++++syntax tree : " << endl; cout << "+++++++++++++++++++++syntax tree : " << endl;
@ -411,8 +427,6 @@ class PlannerTestBaseImpl {
SNode* pSubplan; SNode* pSubplan;
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); }
} }
res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan));
cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl;
} }
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
@ -451,32 +465,22 @@ class PlannerTestBaseImpl {
string toString(const SNode* pRoot) { string toString(const SNode* pRoot) {
char* pStr = NULL; char* pStr = NULL;
int32_t len = 0; int32_t len = 0;
auto start = chrono::steady_clock::now();
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pRoot)) {
cout << "nodesNodeToString: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
}
string str(pStr); string str(pStr);
taosMemoryFreeClear(pStr); taosMemoryFreeClear(pStr);
return str; return str;
} }
string toMsg(const SNode* pRoot) { void checkPlanMsg(const SNode* pRoot) {
char* pStr = NULL; char* pStr = NULL;
int32_t len = 0; int32_t len = 0;
auto start = chrono::steady_clock::now();
DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len)
cout << "nodesNodeToMsg: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
string copyStr(pStr, len);
SNode* pNode = NULL; SNode* pNode = NULL;
char* pNewStr = NULL; char* pNewStr = NULL;
int32_t newlen = 0; int32_t newlen = 0;
DO_WITH_THROW(nodesMsgToNode, pStr, len, &pNode) DO_WITH_THROW(nodesMsgToNode, copyStr.c_str(), len, &pNode)
DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen) DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen)
if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) { if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) {
cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl; cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl;
@ -490,9 +494,7 @@ class PlannerTestBaseImpl {
nodesDestroyNode(pNode); nodesDestroyNode(pNode);
taosMemoryFreeClear(pNewStr); taosMemoryFreeClear(pNewStr);
string str(pStr, len);
taosMemoryFreeClear(pStr); taosMemoryFreeClear(pStr);
return str;
} }
caseEnv caseEnv_; caseEnv caseEnv_;

View File

@ -41,11 +41,12 @@ class PlannerTestBase : public testing::Test {
std::unique_ptr<PlannerTestBaseImpl> impl_; std::unique_ptr<PlannerTestBaseImpl> impl_;
}; };
extern void setDumpModule(const char* pModule); extern void setDumpModule(const char* pArg);
extern void setSkipSqlNum(const char* pNum); extern void setSkipSqlNum(const char* pArg);
extern void setLimitSqlNum(const char* pNum); extern void setLimitSqlNum(const char* pArg);
extern void setLogLevel(const char* pLogLevel); extern void setLogLevel(const char* pArg);
extern void setQueryPolicy(const char* pQueryPolicy); extern void setQueryPolicy(const char* pArg);
extern void setUseNodeAllocator(const char* pArg);
extern int32_t getLogLevel(); extern int32_t getLogLevel();
#endif // PLAN_TEST_UTIL_H #endif // PLAN_TEST_UTIL_H

View File

@ -847,7 +847,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
memcpy(res->datum.p, output.columnData->pData, len); memcpy(res->datum.p, output.columnData->pData, len);
} else if (IS_VAR_DATA_TYPE(type)) { } else if (IS_VAR_DATA_TYPE(type)) {
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1); res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1);
res->node.resType.bytes = varDataTLen(output.columnData->pData); res->node.resType.bytes = varDataTLen(output.columnData->pData);
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else { } else {

View File

@ -255,6 +255,7 @@ typedef struct SSchJob {
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad> SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel> SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId;
SArray *dataSrcTasks; // SArray<SQueryTask*> SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;

View File

@ -673,6 +673,7 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes); destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag); qDestroyQueryPlan(pJob->pDag);
nodesDestroyAllocator(pJob->allocatorRefId);
taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->fetchRes);
@ -724,6 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->sql = strdup(pReq->sql); pJob->sql = strdup(pReq->sql);
} }
pJob->pDag = pReq->pDag; pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesIncAllocatorRefCount(pReq->allocatorRefId);
pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam; pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp; pJob->userRes.execFp = pReq->execFp;

View File

@ -841,6 +841,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
// copy content to the parent page // copy content to the parent page
tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0); tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0);
tdbPageCopy(pNews[0], pParent, 1); tdbPageCopy(pNews[0], pParent, 1);
if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) {
((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno;
}
} }
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {

View File

@ -260,7 +260,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
// tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
@ -353,7 +353,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
// tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }

View File

@ -31,45 +31,57 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}')
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}')
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table {self.ntbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db') tdSql.execute('drop database db')
def ttl_check_ctb(self): def ttl_check_ctb(self):
tdSql.prepare() tdSql.prepare()
tdSql.execute(f'create table {self.stbname} (ts timestamp,c0 int) tags(t0 int)') tdSql.execute(f'create table db.{self.stbname} (ts timestamp,c0 int) tags(t0 int)')
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}')
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.default_ttl}') tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.default_ttl}')
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table {self.stbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db') tdSql.execute('drop database db')
def ttl_check_insert(self):
tdSql.prepare()
tdSql.execute(f'create table db.{self.stbname} (ts timestamp,c0 int) tags(t0 int)')
for i in range(self.tbnum):
tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)')
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show db.tables')
tdSql.checkRows(0)
tdSql.execute('drop database db')
def run(self): def run(self):
self.ttl_check_ntb() self.ttl_check_ntb()
self.ttl_check_ctb() self.ttl_check_ctb()
self.ttl_check_insert()
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -32,7 +32,7 @@ python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/mutil_stage.py python3 ./test.py -f 1-insert/mutil_stage.py
python3 ./test.py -f 1-insert/table_param_ttl.py -R
python3 ./test.py -f 1-insert/update_data_muti_rows.py python3 ./test.py -f 1-insert/update_data_muti_rows.py
python3 ./test.py -f 1-insert/db_tb_name_check.py python3 ./test.py -f 1-insert/db_tb_name_check.py

View File

@ -294,8 +294,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "conflict", pObj->conflict); tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
tjsonAddIntegerToObject(item, "exec", pObj->exec); tjsonAddIntegerToObject(item, "exec", pObj->exec);
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbname1", pObj->dbname1); tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddStringToObject(item, "dbname2", pObj->dbname2); tjsonAddStringToObject(item, "stbname", pObj->stbname);
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions)); tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));