diff --git a/cmake/cmake.version b/cmake/cmake.version index 2d86335b3c..05094f10cc 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.1.3") + SET(TD_VER_NUMBER "3.0.1.4") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index 129d90ea85..c032687d0f 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -109,7 +109,7 @@ TDengine's JDBC URL specification format is: For establishing connections, native connections differ slightly from REST connections. - + ```java diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx index 518d3625d5..f00e635af9 100644 --- a/docs/en/14-reference/03-connector/05-go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -113,7 +113,7 @@ username:password@protocol(address)/dbname?param=value ``` ### Connecting via connector - + _taosSql_ implements Go's `database/sql/driver` interface via cgo. You can use the [`database/sql`](https://golang.org/pkg/database/sql/) interface by simply introducing the driver. diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index edc3016cde..4e2a7848dc 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -55,16 +55,6 @@ taos = "*" - - -In `cargo.toml`, add [taos][taos] and enable the native feature: - -```toml -[dependencies] -taos = { version = "*", default-features = false, features = ["native"] } -``` - - In `cargo.toml`, add [taos][taos] and enable the ws feature: @@ -75,6 +65,18 @@ taos = { version = "*", default-features = false, features = ["ws"] } ``` + + + +In `cargo.toml`, add [taos][taos] and enable the native feature: + +```toml +[dependencies] +taos = { version = "*", default-features = false, features = ["native"] } +``` + + + ## Establishing a connection diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index d92a93fd4f..5fdb42a2cd 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -81,7 +81,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git ### Verify - + For native connection, you need to verify that both the client driver and the Python connector itself are installed correctly. The client driver and Python connector have been installed properly if you can successfully import the `taos` module. In the Python Interactive Shell, you can type. diff --git a/docs/en/14-reference/03-connector/08-node.mdx b/docs/en/14-reference/03-connector/08-node.mdx index bf7c6b95ea..a36cf0efc9 100644 --- a/docs/en/14-reference/03-connector/08-node.mdx +++ b/docs/en/14-reference/03-connector/08-node.mdx @@ -85,7 +85,7 @@ If using ARM64 Node.js on Windows 10 ARM, you must add "Visual C++ compilers and ### Install via npm - + ```bash @@ -124,7 +124,7 @@ node nodejsChecker.js host=localhost Please choose to use one of the connectors. - + Install and import the `@tdengine/client` package. diff --git a/docs/en/20-third-party/03-telegraf.md b/docs/en/20-third-party/03-telegraf.md index 6a7aac322f..e5c6d6f254 100644 --- a/docs/en/20-third-party/03-telegraf.md +++ b/docs/en/20-third-party/03-telegraf.md @@ -15,6 +15,7 @@ To write Telegraf data to TDengine requires the following preparations. - The TDengine cluster is deployed and functioning properly - taosAdapter is installed and running properly. Please refer to the [taosAdapter manual](/reference/taosadapter) for details. - Telegraf has been installed. Please refer to the [official documentation](https://docs.influxdata.com/telegraf/v1.22/install/) for Telegraf installation. +- Telegraf collects the running status measurements of current system. You can enable [input plugins](https://docs.influxdata.com/telegraf/v1.22/plugins/) to insert [other formats](https://docs.influxdata.com/telegraf/v1.24/data_formats/input/) data to Telegraf then forward to TDengine. ## Configuration steps @@ -31,11 +32,12 @@ Use TDengine CLI to verify Telegraf correctly writing data to TDengine and read ``` taos> show databases; - name | created_time | ntables | vgroups | replica | quorum | days | keep | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | cachelast | precision | update | status | -==================================================================================================================================================================================================================================================================================== - telegraf | 2022-04-20 08:47:53.488 | 22 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ns | 2 | ready | - log | 2022-04-20 07:19:50.260 | 9 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ms | 0 | ready | -Query OK, 2 row(s) in set (0.002401s) + name | +================================= + information_schema | + performance_schema | + telegraf | +Query OK, 3 rows in database (0.010568s) taos> use telegraf; Database changed. @@ -65,3 +67,11 @@ taos> select * from telegraf.system limit 10; | Query OK, 3 row(s) in set (0.013269s) ``` + +:::note + +- TDengine take influxdb format data and create unique ID for table names by the rule. +The user can configure `smlChildTableName` paramter to generate specified table names if he/she needs. And he/she also need to insert data with specified data format. +For example, Add `smlChildTableName=tname` in the taos.cfg file. Insert data `st,tname=cpu1,t1=4 c1=3 1626006833639000000` then the table name will be cpu1. If there are multiple lines has same tname but different tag_set, the first line's tag_set will be used to automatically creating table and ignore other lines. Please refer to [TDengine Schemaless](/reference/schemaless/#Schemaless-Line-Protocol) +::: + diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index 6b1715f8c6..d78da52aaa 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -109,7 +109,7 @@ TDengine 的 JDBC URL 规范格式为: 对于建立连接,原生连接与 REST 连接有细微不同。 - + ```java diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx index 9d30f75190..515d1b030b 100644 --- a/docs/zh/08-connector/20-go.mdx +++ b/docs/zh/08-connector/20-go.mdx @@ -114,7 +114,7 @@ username:password@protocol(address)/dbname?param=value ``` ### 使用连接器进行连接 - + _taosSql_ 通过 cgo 实现了 Go 的 `database/sql/driver` 接口。只需要引入驱动就可以使用 [`database/sql`](https://golang.org/pkg/database/sql/) 的接口。 diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index f6b99beed3..63dce4b69b 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -55,16 +55,6 @@ taos = "*" - - -在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性: - -```toml -[dependencies] -taos = { version = "*", default-features = false, features = ["native"] } -``` - - 在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。 @@ -74,6 +64,17 @@ taos = { version = "*", default-features = false, features = ["native"] } taos = { version = "*", default-features = false, features = ["ws"] } ``` + + + + +在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性: + +```toml +[dependencies] +taos = { version = "*", default-features = false, features = ["native"] } +``` + diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index 0242486d3b..c70677f816 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -80,7 +80,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git ### 安装验证 - + 对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入: @@ -118,7 +118,7 @@ Requirement already satisfied: taospy in c:\users\username\appdata\local\program 在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。 - + 请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 `ping` 命令进行测试: @@ -173,7 +173,7 @@ curl -u root:taosdata http://:/rest/sql -d "select server_version()" 以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。 - + ```python @@ -219,7 +219,7 @@ curl -u root:taosdata http://:/rest/sql -d "select server_version()" ### 基本使用 - + ##### TaosConnection 类的使用 @@ -289,7 +289,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ### 与 pandas 一起使用 - + ```python diff --git a/docs/zh/08-connector/35-node.mdx b/docs/zh/08-connector/35-node.mdx index 167ae069d6..1de19cdd3a 100644 --- a/docs/zh/08-connector/35-node.mdx +++ b/docs/zh/08-connector/35-node.mdx @@ -85,7 +85,7 @@ REST 连接器支持所有能运行 Node.js 的平台。 ### 使用 npm 安装 - + ```bash @@ -124,7 +124,7 @@ node nodejsChecker.js host=localhost 请选择使用一种连接器。 - + 安装并引用 `@tdengine/client` 包。 diff --git a/docs/zh/08-connector/40-csharp.mdx b/docs/zh/08-connector/40-csharp.mdx index e99f41ae9c..70c0382080 100644 --- a/docs/zh/08-connector/40-csharp.mdx +++ b/docs/zh/08-connector/40-csharp.mdx @@ -35,7 +35,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" ## 支持的功能特性 - + @@ -96,7 +96,7 @@ dotnet add exmaple.csproj reference src/TDengine.csproj ## 建立连接 - + @@ -171,7 +171,7 @@ namespace TDengineExample #### SQL 写入 - + @@ -203,7 +203,7 @@ namespace TDengineExample #### 参数绑定 - + @@ -227,7 +227,7 @@ namespace TDengineExample #### 同步查询 - + diff --git a/docs/zh/20-third-party/03-telegraf.md b/docs/zh/20-third-party/03-telegraf.md index 84883e665a..71bb1b3885 100644 --- a/docs/zh/20-third-party/03-telegraf.md +++ b/docs/zh/20-third-party/03-telegraf.md @@ -16,6 +16,7 @@ Telegraf 是一款十分流行的指标采集开源软件。在数据采集和 - TDengine 集群已经部署并正常运行 - taosAdapter 已经安装并正常运行。具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter) - Telegraf 已经安装。安装 Telegraf 请参考[官方文档](https://docs.influxdata.com/telegraf/v1.22/install/) +- Telegraf 默认采集系统运行状态数据。通过使能[输入插件](https://docs.influxdata.com/telegraf/v1.22/plugins/)方式可以输出[其他格式](https://docs.influxdata.com/telegraf/v1.24/data_formats/input/)的数据到 Telegraf 再写入到 TDengine中。 ## 配置步骤 @@ -32,11 +33,12 @@ sudo systemctl restart telegraf ``` taos> show databases; - name | created_time | ntables | vgroups | replica | quorum | days | keep | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | cachelast | precision | update | status | -==================================================================================================================================================================================================================================================================================== - telegraf | 2022-04-20 08:47:53.488 | 22 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ns | 2 | ready | - log | 2022-04-20 07:19:50.260 | 9 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ms | 0 | ready | -Query OK, 2 row(s) in set (0.002401s) + name | +================================= + information_schema | + performance_schema | + telegraf | +Query OK, 3 rows in database (0.010568s) taos> use telegraf; Database changed. @@ -66,3 +68,11 @@ taos> select * from telegraf.system limit 10; | Query OK, 3 row(s) in set (0.013269s) ``` + +:::note + +- TDengine 接收 influxdb 格式数据默认生成的子表名是根据规则生成的唯一 ID 值。 +用户如需指定生成的表名,可以通过在 taos.cfg 里配置 smlChildTableName 参数来指定。如果通过控制输入数据格式,即可利用 TDengine 这个功能指定生成的表名。 +举例如下:配置 smlChildTableName=tname 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1。如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。[TDengine 无模式写入参考指南](/reference/schemaless/#无模式写入行协议) +::: + diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2544cedda7..2add3332ab 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -177,6 +177,7 @@ typedef struct SSDataBlock { enum { FETCH_TYPE__DATA = 1, FETCH_TYPE__META, + FETCH_TYPE__SEP, FETCH_TYPE__NONE, }; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 81882ca1ac..78eedaf921 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,13 +29,13 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); +typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { - void *handle; + void* handle; bool localExec; localFetchFp fp; - SArray *explainRes; + SArray* explainRes; } SLocalFetch; typedef struct { @@ -51,9 +51,9 @@ typedef struct { bool initTqReader; int32_t numOfVgroups; - void* sContext; // SSnapContext* + void* sContext; // SSnapContext* - void* pStateBackend; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -136,6 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ + int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); @@ -195,6 +196,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); +int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq); + int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c9c02a77e1..66f992f05f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -515,7 +515,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; topic = pMetaRspObj->topic; vgId = pMetaRspObj->vgId; - } else if(TD_RES_TMQ_METADATA(msg)) { + } else if (TD_RES_TMQ_METADATA(msg)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg; topic = pRspObj->topic; vgId = pRspObj->vgId; @@ -715,7 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) { int32_t epoch = tmq->epoch; SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq)); if (pReq == NULL) goto OVER; - pReq->consumerId = consumerId; + pReq->consumerId = htobe64(consumerId); pReq->epoch = epoch; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -1603,6 +1603,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + tscDebug("consumer %ld actual process poll rsp", tmq->consumerId); /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { @@ -1661,9 +1662,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp void* pRsp = NULL; - if(pollRspWrapper->taosxRsp.createTableNum == 0){ + if (pollRspWrapper->taosxRsp.createTableNum == 0) { pRsp = tmqBuildRspFromWrapper(pollRspWrapper); - }else{ + } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); } taosFreeQitem(pollRspWrapper); @@ -1718,7 +1719,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (1) { tmqHandleAllDelayedTask(tmq); - if (tmqPollImpl(tmq, timeout) < 0) return NULL; + if (tmqPollImpl(tmq, timeout) < 0) { + tscDebug("return since poll err"); + /*return NULL;*/ + } rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { @@ -1850,12 +1854,12 @@ const char* tmq_get_table_name(TAOS_RES* res) { return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || - pRspObj->resIter >= pRspObj->rsp.blockNum) { - return NULL; - } - return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; } + return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + } return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index abc23e3d95..3dfc10e554 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -272,6 +272,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { + mError("consumer %ld not exist", consumerId); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b9647a28fb..7308dc375e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -379,6 +379,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; + qDebugL("ast %s", topicObj.ast); + SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { taosMemoryFree(topicObj.ast); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6ba10641f5..f0fb8d4b02 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); -int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); +int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c3441a43f0..f96afe6fba 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -113,10 +113,20 @@ typedef struct { } STqHandle; +typedef struct { + SMqDataRsp dataRsp; + SMqRspHead rspHead; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + SRpcHandleInfo pInfo; +} STqPushEntry; + struct STQ { - SVnode* pVnode; - char* path; - SHashObj* pPushMgr; // consumerId -> STqHandle* + SVnode* pVnode; + char* path; + + SRWLatch pushLock; + + SHashObj* pPushMgr; // consumerId -> STqPushEntry SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo @@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 898e79928b..900d29b97e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -99,7 +99,6 @@ void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); bool vnodeIsLeader(SVnode* pVnode); -bool vnodeIsReadyForRead(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 76f3c1b12d..ed5a894416 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -65,6 +65,11 @@ static void destroySTqHandle(void* data) { } } +static void tqPushEntryFree(void* data) { + STqPushEntry* p = *(void**)data; + taosMemoryFree(p); +} + STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { @@ -78,7 +83,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + taosInitRWLatch(&pTq->pushLock); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -153,6 +160,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, return 0; } +int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { + SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); + + ASSERT(!pRsp->withSchema); + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->blockNum > 0) { + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + } else { + ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + } + } + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); + + if (code < 0) { + return -1; + } + + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead)); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + tEncodeSMqDataRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rsp = { + .info = pPushEntry->pInfo, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + + tmsgSendRsp(&rsp); + + char buf1[80] = {0}; + char buf2[80] = {0}; + tFormatOffset(buf1, 80, &pRsp->reqOffset); + tFormatOffset(buf2, 80, &pRsp->rspOffset); + tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", + TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + + return 0; +} + int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); @@ -354,6 +420,8 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return -1; } + pRsp->withTbName = 0; +#if 0 pRsp->withTbName = pReq->withTbName; if (pRsp->withTbName) { pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); @@ -362,6 +430,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return -1; } } +#endif if (subType == TOPIC_SUB_TYPE__COLUMN) { pRsp->withSchema = false; @@ -477,11 +546,33 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + // lock + taosWLockLatch(&pTq->pushLock); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 - + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version) { + STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); + if (pPushEntry != NULL) { + pPushEntry->pInfo = pMsg->info; + memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); + dataRsp.withTbName = 0; + memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); + pPushEntry->rspHead.consumerId = consumerId; + pPushEntry->rspHead.epoch = reqEpoch; + pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, + TD_VID(pTq->pVnode)); + // unlock + taosWUnLockLatch(&pTq->pushLock); + return 0; + } + } + taosWUnLockLatch(&pTq->pushLock); #endif + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } @@ -614,10 +705,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); - ASSERT(code == 0); + taosWLockLatch(&pTq->pushLock); + int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); + } + taosWUnLockLatch(&pTq->pushLock); - tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + } + + code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such offset", pReq->subKey); + } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { ASSERT(0); @@ -756,7 +859,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe atomic_add_fetch_32(&pHandle->epoch, 1); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO + ASSERT(0); } + // close handle } return 0; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a24f920235..58d051bec1 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); + int32_t schemaLen = htonl(pBlk->schemaLen); if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); @@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); + int32_t schemaLen = htonl(pBlk->schemaLen); if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index a57e8174fe..dcfb07f0ff 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,6 +213,97 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + tqDebug("vgId:%d tq push msg ver %ld, type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); + + if (msgType == TDMT_VND_SUBMIT) { + // lock push mgr to avoid potential msg lost + taosWLockLatch(&pTq->pushLock); + tqDebug("vgId:%d push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); + if (taosHashGetSize(pTq->pPushMgr) != 0) { + SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); + SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); + void* data = taosMemoryMalloc(msgLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to copy data for stream since out of memory"); + return -1; + } + memcpy(data, msg, msgLen); + SSubmitReq* pReq = (SSubmitReq*)data; + pReq->version = ver; + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pPushMgr, pIter); + if (pIter == NULL) break; + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + + STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); + if (pHandle == NULL) { + tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); + continue; + } + if (pPushEntry->dataRsp.reqOffset.version > ver) { + tqDebug("vgId:%d push entry req version %ld, while push version %ld, skip", pTq->pVnode->config.vgId, + pPushEntry->dataRsp.reqOffset.version, ver); + continue; + } + STqExecHandle* pExec = &pHandle->execHandle; + qTaskInfo_t task = pExec->task; + + SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + // prepare scan mem data + qStreamScanMemData(task, pReq); + + // exec + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + + if (pDataBlock == NULL) { + break; + } + + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; + } + + tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey, + pRsp->blockNum); + if (pRsp->blockNum > 0) { + // set offset + tqOffsetResetToLog(&pRsp->rspOffset, ver); + // remove from hash + size_t kLen; + void* key = taosHashGetKey(pIter, &kLen); + void* keyCopy = taosMemoryMalloc(kLen); + memcpy(keyCopy, key, kLen); + + taosArrayPush(cachedKeys, &keyCopy); + taosArrayPush(cachedKeyLens, &kLen); + + tqPushDataRsp(pTq, pPushEntry); + } + } + // delete entry + for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { + void* key = taosArrayGetP(cachedKeys, i); + size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); + if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { + ASSERT(0); + } + } + taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); + taosArrayDestroy(cachedKeyLens); + } + // unlock + taosWUnLockLatch(&pTq->pushLock); + } + if (vnodeIsRoleLeader(pTq->pVnode)) { if (msgType == TDMT_VND_SUBMIT) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8e5f328efa..3bd31e6660 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,21 +15,20 @@ #include "tq.h" - -bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ - if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){ +bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) { return true; } - int16_t msgType = pHead->msgType; - char* body = pHead->body; - int32_t bodyLen = pHead->bodyLen; + int16_t msgType = pHead->msgType; + char* body = pHead->body; + int32_t bodyLen = pHead->bodyLen; - int64_t tbSuid = pHandle->execHandle.execTb.suid; - int64_t realTbSuid = 0; - SDecoder coder; - void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); - int32_t len = bodyLen - sizeof(SMsgHead); + int64_t tbSuid = pHandle->execHandle.execTb.suid; + int64_t realTbSuid = 0; + SDecoder coder; + void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); + int32_t len = bodyLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) { @@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ if (tDecodeSVDropStbReq(&coder, &req) < 0) { goto end; } - realTbSuid = req.suid; + realTbSuid = req.suid; } else if (msgType == TDMT_VND_CREATE_TABLE) { SVCreateTbBatchReq req = {0}; if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { goto end; } - int32_t needRebuild = 0; + int32_t needRebuild = 0; SVCreateTbReq* pCreateReq = NULL; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; - if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) { needRebuild++; } } - if(needRebuild == 0){ + if (needRebuild == 0) { // do nothing - }else if(needRebuild == req.nReqs){ + } else if (needRebuild == req.nReqs) { realTbSuid = tbSuid; - }else{ + } else { realTbSuid = tbSuid; SVCreateTbBatchReq reqNew = {0}; reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; - if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) { reqNew.nReqs++; taosArrayPush(reqNew.pArray, pCreateReq); } } - int tlen; + int tlen; int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret); void* buf = taosMemoryMalloc(tlen); @@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ } } } else if (msgType == TDMT_VND_ALTER_TABLE) { - SVAlterTbReq req = {0}; + SVAlterTbReq req = {0}; if (tDecodeSVAlterTbReq(&coder, &req) < 0) { goto end; @@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ goto end; } - int32_t needRebuild = 0; + int32_t needRebuild = 0; SVDropTbReq* pDropReq = NULL; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; - if(pDropReq->suid == tbSuid){ + if (pDropReq->suid == tbSuid) { needRebuild++; } } - if(needRebuild == 0){ + if (needRebuild == 0) { // do nothing - }else if(needRebuild == req.nReqs){ + } else if (needRebuild == req.nReqs) { realTbSuid = tbSuid; - }else{ + } else { realTbSuid = tbSuid; SVDropTbBatchReq reqNew = {0}; reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; - if(pDropReq->suid == tbSuid){ + if (pDropReq->suid == tbSuid) { reqNew.nReqs++; taosArrayPush(reqNew.pArray, pDropReq); } } - int tlen; + int tlen; int32_t ret = 0; tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret); void* buf = taosMemoryMalloc(tlen); @@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ goto end; } realTbSuid = req.suid; - } else{ + } else { ASSERT(0); } - end: +end: tDecoderClear(&coder); return tbSuid == realTbSuid; } @@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = -1; goto END; } - if(isValValidForTable(pHandle, pHead)){ + if (isValValidForTable(pHandle, pHead)) { *fetchOffset = offset; code = 0; goto END; @@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea offset++; } } - END: +END: taosThreadMutexUnlock(&pHandle->pWalReader->mutex); return code; } @@ -315,14 +314,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { return -1; } void* body = pReader->pWalReader->pHead->head.body; +#if 0 if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { // TODO do filter ret->fetchType = FETCH_TYPE__META; ret->meta = pReader->pWalReader->pHead->head.body; return 0; } else { - tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); +#endif + tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); +#if 0 } +#endif } while (tqNextDataBlock(pReader)) { @@ -334,6 +337,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { continue; } ret->fetchType = FETCH_TYPE__DATA; + tqDebug("return data rows %d", ret->data.info.rows); return 0; } @@ -341,14 +345,14 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ASSERT(pReader->ver >= 0); - ret->fetchType = FETCH_TYPE__NONE; + ret->fetchType = FETCH_TYPE__SEP; tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version); return 0; } } } -int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) { +int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { pReader->pMsg = pMsg; if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6e9eba306a..28093dfc70 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -289,7 +289,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) { + if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } @@ -311,7 +311,12 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && - !vnodeIsReadyForRead(pVnode)) { + !vnodeIsLeader(pVnode)) { + vnodeRedirectRpcMsg(pVnode, pMsg); + return 0; + } + + if (pMsg->msgType == TDMT_VND_CONSUME && !pVnode->restored) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } @@ -808,7 +813,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock; - SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; int32_t nRows; @@ -921,7 +925,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } if (taosArrayGetSize(newTbUids) > 0) { - vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + (int32_t)taosArrayGetSize(newTbUids)); } tqUpdateTbUidList(pVnode->pTq, newTbUids, true); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2c3808a703..980761cd14 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -240,7 +240,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); if (!pVnode->restored) { - vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg); + vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg); terrno = TSDB_CODE_APP_NOT_READY; vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY); rpcFreeCont(pMsg->pCont); @@ -796,16 +796,3 @@ bool vnodeIsLeader(SVnode *pVnode) { return true; } -bool vnodeIsReadyForRead(SVnode *pVnode) { - if (syncIsReady(pVnode->sync)) { - return true; - } - - if (syncIsReadyForRead(pVnode->sync)) { - return true; - } - - vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId, - syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync)); - return false; -} diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a9a0267fbf..56470f0668 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -146,6 +146,7 @@ typedef struct { SMqMetaRsp metaRsp; // for tmq fetching meta int8_t returned; int64_t snapshotVer; + const SSubmitReq* pReq; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; @@ -192,6 +193,7 @@ enum { OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, OP_EXEC_DONE = 0x9, + OP_EXEC_RECV = 0x11, }; typedef struct SOperatorFpSet { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0fbbc25873..a8c73f0170 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (pLocal) { memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); } - + taosArrayClearEx(pResList, freeBlock); int64_t curOwner = 0; @@ -774,6 +774,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s return TSDB_CODE_SUCCESS; } +int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + ASSERT(pTaskInfo->streamInfo.pReq == NULL); + pTaskInfo->streamInfo.pReq = pReq; + return 0; +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 2f12a0d19b..e9e6fed66a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -53,7 +53,7 @@ static void destroyIndefinitOperatorInfo(void* param) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -184,7 +184,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); - //TODO: optimize it later when partition by + limit + // TODO: optimize it later when partition by + limit if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { doSetOperatorCompleted(pOperator); @@ -206,9 +206,16 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pTaskInfo->streamInfo.pReq) { + pOperator->status = OP_OPENED; + } + + qDebug("enter project"); + if (pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pOperator->status = OP_OPENED; + qDebug("projection in queue model, set status open and return null"); return NULL; } @@ -237,9 +244,23 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { + pOperator->status = OP_OPENED; + if (pOperator->status == OP_EXEC_RECV) { + continue; + } else { + return NULL; + } + } + qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, + pFinalRes->info.rows); doSetOperatorCompleted(pOperator); break; } + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { + qDebug("set status recv"); + pOperator->status = OP_EXEC_RECV; + } // for stream interval if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT || @@ -298,6 +319,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { + qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status); break; } } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5e1ec29a75..927ef2c64d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1435,6 +1435,43 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SStreamScanInfo* pInfo = pOperator->info; qDebug("queue scan called"); + + if (pTaskInfo->streamInfo.pReq != NULL) { + if (pInfo->tqReader->pMsg == NULL) { + pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq; + const SSubmitReq* pSubmit = pInfo->tqReader->pMsg; + if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { + qError("submit msg messed up when initing stream submit block %p", pSubmit); + pInfo->tqReader->pMsg = NULL; + pTaskInfo->streamInfo.pReq = NULL; + ASSERT(0); + } + } + + blockDataCleanup(pInfo->pRes); + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + + while (tqNextDataBlock(pInfo->tqReader)) { + SSDataBlock block = {0}; + + int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); + + if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { + continue; + } + + setBlockIntoRes(pInfo, &block); + + if (pBlockInfo->rows > 0) { + return pInfo->pRes; + } + } + + pInfo->tqReader->pMsg = NULL; + pTaskInfo->streamInfo.pReq = NULL; + return NULL; + } + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { @@ -1467,8 +1504,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (setBlockIntoRes(pInfo, &ret.data) < 0) { ASSERT(0); } - // TODO clean data block if (pInfo->pRes->info.rows > 0) { + pOperator->status = OP_EXEC_RECV; qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); return pInfo->pRes; } @@ -1477,18 +1514,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { // pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.metaBlk = ret.meta; // return NULL; - } else if (ret.fetchType == FETCH_TYPE__NONE) { + } else if (ret.fetchType == FETCH_TYPE__NONE || + (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { pTaskInfo->streamInfo.lastStatus = ret.offset; ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); char formatBuf[80]; tFormatOffset(formatBuf, 80, &ret.offset); qDebug("queue scan log return null, offset %s", formatBuf); + pOperator->status = OP_OPENED; return NULL; - } else { - ASSERT(0); } } +#if 0 } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { @@ -1497,6 +1535,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } qDebug("stream scan tsdb return null"); return NULL; +#endif } else { ASSERT(0); return NULL; diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 8d1d332ac7..cdc4e66e42 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -90,12 +90,12 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) { static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) { int32_t tlvLen = sizeof(STlv) + len; if (pEncoder->offset + tlvLen > pEncoder->allocSize) { - void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2); + pEncoder->allocSize = TMAX(pEncoder->allocSize * 2, pEncoder->allocSize + pEncoder->offset + tlvLen); + void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize); if (NULL == pNewBuf) { return TSDB_CODE_OUT_OF_MEMORY; } pEncoder->pBuf = pNewBuf; - pEncoder->allocSize = pEncoder->allocSize * 2; } STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); pTlv->type = htons(type); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3d79c6c464..52feb625a8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -396,29 +396,6 @@ bool syncIsReady(int64_t rid) { return b; } -bool syncIsReadyForRead(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return false; - } - ASSERT(rid == pSyncNode->rid); - - // TODO: last not noop? - SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); - bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - - // if false, set error code - if (false == b) { - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_NOT_LEADER; - } else { - terrno = TSDB_CODE_APP_NOT_READY; - } - } - return b; -} - bool syncIsRestoreFinish(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { diff --git a/tests/docs-examples-test/node.sh b/tests/docs-examples-test/node.sh index 0283904815..41acf7c7b4 100644 --- a/tests/docs-examples-test/node.sh +++ b/tests/docs-examples-test/node.sh @@ -23,7 +23,7 @@ node query_example.js node async_query_example.js -node subscribe_demo.js +# node subscribe_demo.js taos -s "drop topic if exists topic_name_example" taos -s "drop database if exists power" @@ -39,4 +39,4 @@ taos -s "drop database if exists test" node opentsdb_telnet_example.js taos -s "drop database if exists test" -node opentsdb_json_example.js \ No newline at end of file +node opentsdb_json_example.js diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 161c878440..82f73a4fdd 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -303,7 +303,7 @@ ./test.sh -f tsim/insert/backquote.sim -m # unsupport ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m ./test.sh -f tsim/query/interval-offset.sim -m -./test.sh -f tsim/tmq/basic3.sim -m +# unsupport ./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/qnode/basic1.sim -m # unsupport ./test.sh -f tsim/mnode/basic1.sim -m