Merge branch '3.0' into feature/3_liaohj

Haojun Liao 2022-10-30 11:18:36 +08:00
commit ab2c7c575b
26 changed files with 619 additions and 190 deletions

View File

@@ -201,7 +201,7 @@ The configuration parameters in the URL are as follows:
- httpConnectTimeout: REST connection timeout in milliseconds, the default value is 5000 ms.
- httpSocketTimeout: socket timeout in milliseconds, the default value is 5000 ms. It only takes effect when batchfetch is false.
- messageWaitTimeout: message transmission timeout in milliseconds, the default value is 3000 ms. It only takes effect when batchfetch is true.
-- useSSL: connecting Securely Using SSL. true: using SSL conneciton, false: not using SSL connection.
+- useSSL: connecting Securely Using SSL. true: using SSL connection, false: not using SSL connection.

**Note**: Some configuration items (e.g., locale, timezone) do not work in the REST connection.
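As an illustration of how these URL parameters combine, here is a minimal sketch; the host, database, and credentials are placeholder assumptions, not values taken from this commit:

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class RestTimeoutExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; the parameter names follow the list above.
        String url = "jdbc:TAOS-RS://localhost:6041/test?user=root&password=taosdata"
                + "&httpConnectTimeout=5000&httpSocketTimeout=5000&useSSL=false";
        try (Connection conn = DriverManager.getConnection(url)) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}
```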
@@ -268,7 +268,7 @@ The configuration parameters in properties are as follows.
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 5000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 3000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
-- TSDBDriver.PROPERTY_KEY_USE_SSL: connecting Securely Using SSL. true: using SSL conneciton, false: not using SSL connection. It only takes effect when using JDBC REST connection.
+- TSDBDriver.PROPERTY_KEY_USE_SSL: connecting Securely Using SSL. true: using SSL connection, false: not using SSL connection. It only takes effect when using JDBC REST connection.
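For reference, a minimal sketch of the Properties route; the constant names come from the list above, while the endpoint and values are assumptions:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

import com.taosdata.jdbc.TSDBDriver;

public class PropertiesExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(TSDBDriver.HTTP_CONNECT_TIMEOUT, "5000");              // REST connect timeout
        props.setProperty(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT, "3000"); // effective when batchfetch=true
        props.setProperty(TSDBDriver.PROPERTY_KEY_USE_SSL, "false");
        String url = "jdbc:TAOS-RS://localhost:6041/test?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url, props)) {
            // query or insert ...
        }
    }
}
```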
For JDBC native connections, you can specify other parameters, such as log level, SQL length, etc., by specifying URL and Properties. For more detailed configuration, please refer to [Client Configuration](/reference/config/#Client-Only).

### Priority of configuration parameters
@@ -824,7 +824,7 @@ Example usage is as follows.
  //query or insert
  // ...

- connection.close(); // put back to conneciton pool
+ connection.close(); // put back to connection pool
}
```
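The example this hunk touches demonstrates the pooled-connection pattern, where close() only returns the connection to the pool. A minimal sketch of that pattern, assuming HikariCP on the classpath; the pool sizing and URL below are placeholder assumptions:

```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;

public class PoolExample {
    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:TAOS://localhost:6030/test?user=root&password=taosdata");
        config.setMaximumPoolSize(10); // illustrative sizing
        try (HikariDataSource ds = new HikariDataSource(config)) {
            Connection connection = ds.getConnection();
            // query or insert
            // ...
            connection.close(); // put back to connection pool
        }
    }
}
```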
@@ -856,7 +856,7 @@ public static void main(String[] args) throws Exception {
  //query or insert
  // ...

- connection.close(); // put back to conneciton pool
+ connection.close(); // put back to connection pool
}
```
@@ -878,7 +878,9 @@ The source code of the sample application is under `TDengine/examples/JDBC`:
| taos-jdbcdriver version | major changes |
| :---------------------: | :--------------------------------------------: |
+| 3.0.1 - 3.0.2 | fix occasional incorrect parsing of resultSet data. 3.0.1 is compiled on JDK 11; in a JDK 8 environment you are advised to use 3.0.2 |
| 3.0.0 | Support for TDengine 3.0 |
+| 2.0.41 | fix the decoding of username and password in REST connections |
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters |
| 2.0.38 | JDBC REST connections add bulk pull function |
| 2.0.37 | Support json tags |
@@ -910,6 +912,12 @@ The source code of the sample application is under `TDengine/examples/JDBC`:
**Solution**: Use taos-jdbcdriver 2.* with your TDengine 2.* deployment.

+5. java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer; ... taos-jdbcdriver-3.0.1.jar
+
+   **Cause**: taos-jdbcdriver 3.0.1 is compiled on JDK 11.
+
+   **Solution**: Use taos-jdbcdriver 3.0.2.
For additional troubleshooting, see [FAQ](../../../train-faq/faq).

## API Reference

View File

@@ -70,7 +70,8 @@ TDengine currently supports timestamp, numeric, character, and Boolean types, which correspond to Java
<Tabs defaultValue="maven">
<TabItem value="maven" label="Install via Maven">

taos-jdbcdriver has been published to the [Sonatype Repository](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver) and has been synced to all major repositories.

- [sonatype](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver)
- [mvnrepository](https://mvnrepository.com/artifact/com.taosdata.jdbc/taos-jdbcdriver)

@@ -118,7 +119,8 @@ String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/test?user=root&password=taosdata
Connection conn = DriverManager.getConnection(jdbcUrl);
```

The example above uses the TSDBDriver of the JDBC native connection to establish a connection to the database test on host taosdemo.com, port 6030 (the TDengine default port). The URL specifies the username (user) as root and the password (password) as taosdata.

**Note**: With the JDBC native connection, taos-jdbcdriver depends on the client driver library (libtaos.so on Linux, taos.dll on Windows, libtaos.dylib on macOS).
@@ -131,7 +133,8 @@ The configuration parameters in the URL are as follows:
- locale: client locale. The default is the current system locale.
- timezone: time zone used on the client side. The default is the current system time zone.
- batchfetch: true: pull result sets in batches when executing queries; false: pull result sets row by row. The default is true. Batch pulling fetches one batch of data at a time and can noticeably improve query performance when the result set is large.
- batchErrorIgnore: true: when one SQL statement in a Statement's executeBatch fails, the remaining SQL statements are still executed; false: no statements after the failed SQL are executed. The default is false.

For JDBC native connection usage, see the [video tutorial](https://www.taosdata.com/blog/2020/11/11/1955.html).
@@ -170,11 +173,13 @@ secondEp cluster_node2:6030
# locale en_US.UTF-8
```

In the example above, JDBC uses the client configuration file to establish a connection to the database test on host cluster_node1, port 6030. If the firstEp node of the cluster fails, JDBC tries to connect to the cluster via secondEp.

In TDengine, the connection to the cluster can be established as long as either firstEp or secondEp is valid.

> **Note**: The configuration file here refers to the one on the machine running the application that calls the JDBC Connector, by default /etc/taos/taos.cfg on Linux and C://TDengine/cfg/taos.cfg on Windows.

</TabItem>
<TabItem value="rest" label="REST connection">
@@ -185,7 +190,8 @@ String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosd
Connection conn = DriverManager.getConnection(jdbcUrl);
```

The example above uses the RestfulDriver of the JDBC REST connection to establish a connection to the database test on host taosdemo.com, port 6041. The URL specifies the username (user) as root and the password (password) as taosdata.

The JDBC REST connection does not depend on the client driver. Compared with the JDBC native connection, you only need to:
@@ -197,9 +203,11 @@ The configuration parameters in the URL are as follows:
- user: TDengine login username. The default is 'root'.
- password: user login password. The default is 'taosdata'.
- batchfetch: true: pull result sets in batches when executing queries; false: pull result sets row by row. The default is false. Row-by-row pulling transfers data over HTTP. The JDBC REST connection also supports batch pulling, in which taos-jdbcdriver and TDengine transfer data over a WebSocket connection; compared with HTTP, WebSocket allows the JDBC REST connection to handle larger query volumes with better query performance.
- charset: the character set used to parse string data when batch pulling is enabled.
- batchErrorIgnore: true: when one SQL statement in a Statement's executeBatch fails, the remaining SQL statements are still executed; false: no statements after the failed SQL are executed. The default is false.
- httpConnectTimeout: connection timeout in ms. The default is 5000.
- httpSocketTimeout: socket timeout in ms. The default is 5000. It only takes effect when batchfetch is false.
- messageWaitTimeout: message timeout in ms. The default is 3000. It only takes effect when batchfetch is true.
@@ -215,7 +223,9 @@ The configuration parameters in the URL are as follows:
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
```

- If dbname is specified in the URL, the JDBC REST connection uses /rest/sql/dbname as the URL of RESTful requests by default, and dbname does not need to be specified in the SQL. For example, if the URL is jdbc:TAOS-RS://127.0.0.1:6041/test, the following SQL can be executed: insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);

:::
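To make the dbname behavior above concrete, a minimal sketch; the endpoint, table, and SQL mirror the example in the note, and error handling is omitted:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RestDbnameExample {
    public static void main(String[] args) throws Exception {
        // dbname "test" is part of the URL, so the SQL below omits it.
        String url = "jdbc:TAOS-RS://127.0.0.1:6041/test?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(
                "insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6)");
        }
    }
}
```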
@@ -826,7 +836,7 @@ public abstract class ConsumerLoop {
  //query or insert
  // ...

- connection.close(); // put back to conneciton pool
+ connection.close(); // put back to connection pool
}
```

@@ -858,7 +868,7 @@ public static void main(String[] args) throws Exception {
  //query or insert
  // ...

- connection.close(); // put back to conneciton pool
+ connection.close(); // put back to connection pool
}
```
@@ -880,7 +890,9 @@ public static void main(String[] args) throws Exception {
| taos-jdbcdriver version | major changes |
| :------------------: | :----------------------------: |
+| 3.0.1 - 3.0.2 | fix result-set parsing errors in some cases. 3.0.1 is compiled on JDK 11; in a JDK 8 environment, 3.0.2 is recommended |
| 3.0.0 | support TDengine 3.0 |
+| 2.0.41 | fix the decoding of username and password in REST connections |
| 2.0.39 - 2.0.40 | add REST connection/request timeout settings |
| 2.0.38 | add batch pulling to JDBC REST connections |
| 2.0.37 | add support for json tags |
@@ -912,6 +924,12 @@ public static void main(String[] args) throws Exception {
**Solution**: Use taos-jdbcdriver 2.* with TDengine 2.*.

+5. java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer; ... taos-jdbcdriver-3.0.1.jar
+
+   **Cause**: taos-jdbcdriver 3.0.1 must run on JDK 11+.
+
+   **Solution**: Switch to taos-jdbcdriver 3.0.2.

For other questions, see the [FAQ](../../../train-faq/faq).

## API Reference

View File

@@ -269,6 +269,7 @@ enum {
  TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)  // no longer used
  TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
  TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
+ TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
  TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)

#if defined(TD_MSG_NUMBER_)

View File

@@ -678,24 +678,61 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);

-// on message ----------------------
-int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
-int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
-int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg);
+// ---------------------------------------------
+typedef enum {
+  SYNC_LOCAL_CMD_STEP_DOWN = 100,
+} ESyncLocalCmd;
+
+typedef struct SyncLocalCmd {
+  uint32_t bytes;
+  int32_t  vgId;
+  uint32_t msgType;
+  SRaftId  srcId;
+  SRaftId  destId;
+
+  int32_t  cmd;
+  SyncTerm sdNewTerm;  // step down new term
+} SyncLocalCmd;
+
+SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
+void          syncLocalCmdDestroy(SyncLocalCmd* pMsg);
+void          syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
+void          syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
+char*         syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
+SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
+void          syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
+void          syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
+SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON*        syncLocalCmd2Json(const SyncLocalCmd* pMsg);
+char*         syncLocalCmd2Str(const SyncLocalCmd* pMsg);
+
+// for debug ----------------------
+void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
+void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
+void syncLocalCmdLog(const SyncLocalCmd* pMsg);
+void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
+
+// on message ----------------------
+int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
+int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
+int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
+int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
+int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
+int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
+int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg);
+int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);

// -----------------------------------------
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);

View File

@@ -538,12 +538,12 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
-   code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
+   code = syncNodeOnPing(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
-   code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
+   code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {

View File

@@ -3496,6 +3496,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
+ //  resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey);

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {

@@ -3618,16 +3619,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
-
-     code = doOpenReaderImpl(pNextReader);
-     if (code != TSDB_CODE_SUCCESS) {
-       return code;
-     }
-
-     code = doOpenReaderImpl(pReader);
-     if (code != TSDB_CODE_SUCCESS) {
-       return code;
-     }
    }
  }

@@ -3758,15 +3749,21 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
+   resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
+   // prepare for the main scan
+   int32_t code = doOpenReaderImpl(pReader);
+   resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
+   if (code != TSDB_CODE_SUCCESS) {
+     return code;
+   }
+
    pReader->step = EXTERNAL_ROWS_MAIN;
  }

@@ -3776,7 +3773,13 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
+   // prepare for the next row scan
+   int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
+   if (code != TSDB_CODE_SUCCESS) {
+     return code;
+   }
+
    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {

View File

@@ -301,13 +301,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
-   code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
+   code = syncNodeOnPing(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
-   code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
+   code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {

View File

@@ -1114,6 +1114,7 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn,
  CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, exists));
}

+#if 0
int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) {
  CTG_API_ENTER();

@@ -1176,6 +1177,7 @@ _return:

  CTG_API_LEAVE(code);
}
+#endif

int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, catalogCallback fp,
                               void* param, int64_t* jobId) {

View File

@@ -63,6 +63,7 @@ enum {
  CTGT_RSP_QNODELIST,
  CTGT_RSP_UDF,
  CTGT_RSP_SVRVER,
+ CTGT_RSP_DNODElIST,
  CTGT_RSP_TBMETA_NOT_EXIST,
};
@@ -702,6 +703,30 @@ void ctgTestRspSvrVer(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRs
  pRsp->pCont = pReq;
}

+void ctgTestRspDndeList(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
+  rpcFreeCont(pMsg->pCont);
+
+  SDnodeListRsp dRsp = {0};
+  dRsp.dnodeList = taosArrayInit(1, sizeof(SEpSet));
+
+  SEpSet epSet = {0};
+  epSet.numOfEps = 1;
+  tstrncpy(epSet.eps[0].fqdn, "localhost", TSDB_FQDN_LEN);
+  epSet.eps[0].port = 6030;
+
+  (void)taosArrayPush(dRsp.dnodeList, &epSet);
+
+  int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &dRsp);
+  void   *pReq = rpcMallocCont(rspLen);
+  tSerializeSDnodeListRsp(pReq, rspLen, &dRsp);
+
+  pRsp->code = 0;
+  pRsp->contLen = rspLen;
+  pRsp->pCont = pReq;
+
+  tFreeSDnodeListRsp(&dRsp);
+}
+
void ctgTestRspAuto(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  switch (pMsg->msgType) {
@@ -727,6 +752,9 @@ void ctgTestRspAuto(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
    case TDMT_MND_SERVER_VERSION:
      ctgTestRspSvrVer(shandle, pEpSet, pMsg, pRsp);
      break;
+   case TDMT_MND_DNODE_LIST:
+     ctgTestRspDndeList(shandle, pEpSet, pMsg, pRsp);
+     break;
    default:
      break;
  }
@@ -779,6 +807,9 @@ void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp
    case CTGT_RSP_SVRVER:
      ctgTestRspSvrVer(shandle, pEpSet, pMsg, pRsp);
      break;
+   case CTGT_RSP_DNODElIST:
+     ctgTestRspDndeList(shandle, pEpSet, pMsg, pRsp);
+     break;
    default:
      ctgTestRspAuto(shandle, pEpSet, pMsg, pRsp);
      break;
@@ -2948,6 +2979,69 @@ TEST(apiTest, catalogGetServerVersion_test) {

  catalogDestroy();
}

+TEST(apiTest, catalogUpdateTableIndex_test) {
+  struct SCatalog  *pCtg = NULL;
+  SRequestConnInfo  connInfo = {0};
+  SRequestConnInfo *mockPointer = (SRequestConnInfo *)&connInfo;
+
+  ctgTestInitLogFile();
+
+  memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
+  ctgTestRspIdx = 0;
+  ctgTestRspFunc[0] = CTGT_RSP_SVRVER;
+
+  ctgTestSetRspByIdx();
+
+  initQueryModuleMsgHandle();
+
+  int32_t code = catalogInit(NULL);
+  ASSERT_EQ(code, 0);
+
+  code = catalogGetHandle(ctgTestClusterId, &pCtg);
+  ASSERT_EQ(code, 0);
+
+  STableIndexRsp rsp = {0};
+  strcpy(rsp.dbFName, ctgTestDbname);
+  strcpy(rsp.tbName, ctgTestSTablename);
+  rsp.suid = ctgTestSuid;
+  rsp.version = 1;
+
+  code = catalogUpdateTableIndex(pCtg, &rsp);
+  ASSERT_EQ(code, 0);
+
+  catalogDestroy();
+}
+
+TEST(apiTest, catalogGetDnodeList_test) {
+  struct SCatalog  *pCtg = NULL;
+  SRequestConnInfo  connInfo = {0};
+  SRequestConnInfo *mockPointer = (SRequestConnInfo *)&connInfo;
+
+  ctgTestInitLogFile();
+
+  memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
+  ctgTestRspIdx = 0;
+  ctgTestRspFunc[0] = CTGT_RSP_DNODElIST;
+
+  ctgTestSetRspByIdx();
+
+  initQueryModuleMsgHandle();
+
+  int32_t code = catalogInit(NULL);
+  ASSERT_EQ(code, 0);
+
+  code = catalogGetHandle(ctgTestClusterId, &pCtg);
+  ASSERT_EQ(code, 0);
+
+  SArray *pList = NULL;
+  code = catalogGetDnodeList(pCtg, mockPointer, &pList);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(taosArrayGetSize(pList), 1);
+  taosArrayDestroy(pList);
+
+  catalogDestroy();
+}

int main(int argc, char **argv) {

View File

@@ -31,10 +31,10 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 5000
-#define ELECT_TIMER_MS_MIN 5000
+#define ELECT_TIMER_MS_MIN 2500
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
-#define HEARTBEAT_TIMER_MS 900
+#define HEARTBEAT_TIMER_MS 1000
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})

View File

@@ -49,8 +49,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);

// process message ----
-int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
-int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
+int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
+int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);

// ---------------------------------
static void syncNodeFreeCb(void* param) {

@@ -1327,8 +1327,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  }

  // init callback
- pSyncNode->FpOnPing = syncNodeOnPingCb;
- pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
+ pSyncNode->FpOnPing = syncNodeOnPing;
+ pSyncNode->FpOnPingReply = syncNodeOnPingReply;
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequest;
  pSyncNode->FpOnTimeout = syncNodeOnTimer;
  pSyncNode->FpOnSnapshot = syncNodeOnSnapshot;

@@ -3003,8 +3003,9 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
}

// on message ----
-int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
-  // log state
+int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) {
+  sTrace("vgId:%d, recv sync-ping", ths->vgId);
+
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);

@@ -3022,9 +3023,9 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  return 0;
}

-int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
+int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) {
  int32_t ret = 0;
- syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
+ sTrace("vgId:%d, recv sync-ping-reply", ths->vgId);
  return ret;
}

View File

@@ -3095,3 +3095,153 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
    taosMemoryFree(serialized);
  }
}
+
+// ---------------------------------------------
+SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
+  uint32_t      bytes = sizeof(SyncLocalCmd);
+  SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
+  memset(pMsg, 0, bytes);
+  pMsg->bytes = bytes;
+  pMsg->vgId = vgId;
+  pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
+  return pMsg;
+}
+
+void syncLocalCmdDestroy(SyncLocalCmd* pMsg) {
+  if (pMsg != NULL) {
+    taosMemoryFree(pMsg);
+  }
+}
+
+void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) {
+  ASSERT(pMsg->bytes <= bufLen);
+  memcpy(buf, pMsg, pMsg->bytes);
+}
+
+void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) {
+  memcpy(pMsg, buf, len);
+  ASSERT(len == pMsg->bytes);
+}
+
+char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) {
+  char* buf = taosMemoryMalloc(pMsg->bytes);
+  ASSERT(buf != NULL);
+  syncLocalCmdSerialize(pMsg, buf, pMsg->bytes);
+  if (len != NULL) {
+    *len = pMsg->bytes;
+  }
+  return buf;
+}
+
+SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) {
+  uint32_t      bytes = *((uint32_t*)buf);
+  SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
+  ASSERT(pMsg != NULL);
+  syncLocalCmdDeserialize(buf, len, pMsg);
+  ASSERT(len == pMsg->bytes);
+  return pMsg;
+}
+
+void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) {
+  memset(pRpcMsg, 0, sizeof(*pRpcMsg));
+  pRpcMsg->msgType = pMsg->msgType;
+  pRpcMsg->contLen = pMsg->bytes;
+  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
+  syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
+}
+
+void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) {
+  syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
+}
+
+SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) {
+  SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
+  ASSERT(pMsg != NULL);
+  return pMsg;
+}
+
+cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
+  char   u64buf[128];
+  cJSON* pRoot = cJSON_CreateObject();
+
+  if (pMsg != NULL) {
+    cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
+    cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
+    cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
+
+    cJSON* pSrcId = cJSON_CreateObject();
+    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
+    cJSON_AddStringToObject(pSrcId, "addr", u64buf);
+    {
+      uint64_t u64 = pMsg->srcId.addr;
+      cJSON*   pTmp = pSrcId;
+      char     host[128];
+      uint16_t port;
+      syncUtilU642Addr(u64, host, sizeof(host), &port);
+      cJSON_AddStringToObject(pTmp, "addr_host", host);
+      cJSON_AddNumberToObject(pTmp, "addr_port", port);
+    }
+    cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
+    cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
+
+    cJSON* pDestId = cJSON_CreateObject();
+    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
+    cJSON_AddStringToObject(pDestId, "addr", u64buf);
+    {
+      uint64_t u64 = pMsg->destId.addr;
+      cJSON*   pTmp = pDestId;
+      char     host[128];
+      uint16_t port;
+      syncUtilU642Addr(u64, host, sizeof(host), &port);
+      cJSON_AddStringToObject(pTmp, "addr_host", host);
+      cJSON_AddNumberToObject(pTmp, "addr_port", port);
+    }
+    cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
+    cJSON_AddItemToObject(pRoot, "destId", pDestId);
+
+    cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
+    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
+    cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
+  }
+
+  cJSON* pJson = cJSON_CreateObject();
+  cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot);
+  return pJson;
+}
+
+char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) {
+  cJSON* pJson = syncLocalCmd2Json(pMsg);
+  char*  serialized = cJSON_Print(pJson);
+  cJSON_Delete(pJson);
+  return serialized;
+}
+
+// for debug ----------------------
+void syncLocalCmdPrint(const SyncLocalCmd* pMsg) {
+  char* serialized = syncLocalCmd2Str(pMsg);
+  printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
+  fflush(NULL);
+  taosMemoryFree(serialized);
+}
+
+void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) {
+  char* serialized = syncLocalCmd2Str(pMsg);
+  printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
+  fflush(NULL);
+  taosMemoryFree(serialized);
+}
+
+void syncLocalCmdLog(const SyncLocalCmd* pMsg) {
+  char* serialized = syncLocalCmd2Str(pMsg);
+  sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
+  taosMemoryFree(serialized);
+}
+
+void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
+  if (gRaftDetailLog) {
+    char* serialized = syncLocalCmd2Str(pMsg);
+    sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
+    taosMemoryFree(serialized);
+  }
+}

View File

@@ -57,7 +57,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
  SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) {
    char logBuf[128];
-   snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64,
+   snprintf(logBuf, sizeof(logBuf), "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64,
             nextIndex, logStartIndex, logEndIndex);
    syncNodeEventLog(pSyncNode, logBuf);

View File

@@ -59,6 +59,7 @@ add_executable(syncRestoreFromSnapshot "")
add_executable(syncRaftCfgIndexTest "")
add_executable(syncHeartbeatTest "")
add_executable(syncHeartbeatReplyTest "")
+add_executable(syncLocalCmdTest "")

target_sources(syncTest

@@ -305,6 +306,10 @@ target_sources(syncHeartbeatReplyTest
    PRIVATE
    "syncHeartbeatReplyTest.cpp"
)
+target_sources(syncLocalCmdTest
+    PRIVATE
+    "syncLocalCmdTest.cpp"
+)

target_include_directories(syncTest

@@ -612,6 +617,11 @@ target_include_directories(syncHeartbeatReplyTest
    "${TD_SOURCE_DIR}/include/libs/sync"
    "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
+target_include_directories(syncLocalCmdTest
+    PUBLIC
+    "${TD_SOURCE_DIR}/include/libs/sync"
+    "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+)

target_link_libraries(syncTest

@@ -858,6 +868,10 @@ target_link_libraries(syncHeartbeatReplyTest
    sync
    gtest_main
)
+target_link_libraries(syncLocalCmdTest
+    sync
+    gtest_main
+)

enable_testing()

View File

@@ -270,7 +270,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
  pMsg->msgType = 9999;
  pMsg->contLen = 256;
  pMsg->pCont = rpcMallocCont(pMsg->contLen);
- snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
+ snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count,
           taosGetTimestampMs());
  return pMsg;
}

View File

@@ -191,7 +191,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
  pMsg->msgType = 9999;
  pMsg->contLen = 256;
  pMsg->pCont = rpcMallocCont(pMsg->contLen);
- snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
+ snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count,
           taosGetTimestampMs());
  return pMsg;
}

View File

@@ -186,7 +186,7 @@ int main(int argc, char **argv) {
  int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, pEntry->index, &pEntry);
  ASSERT(code == 0);

- syncEntryLog2((char *)"==pEntry2==", pEntry2);
+ syncEntryLog2((char *)"==pEntry==", pEntry);

  // step5
  uint32_t len;
View File

@@ -13,7 +13,7 @@ void print(SHashObj *pNextIndex) {
    SRaftId *pRaftId = (SRaftId *)key;

-   printf("key:<" PRIu64 ", %d>, value:%" PRIu64 " \n", pRaftId->addr, pRaftId->vgId, *p);
+   printf("key:<%" PRIu64 ", %d>, value:%" PRIu64 " \n", pRaftId->addr, pRaftId->vgId, *p);
    p = (uint64_t *)taosHashIterate(pNextIndex, p);
  }
}

View File

@@ -0,0 +1,100 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncLocalCmd *createMsg() {
SyncLocalCmd *pMsg = syncLocalCmdBuild(1000);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->sdNewTerm = 123;
pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
return pMsg;
}
void test1() {
SyncLocalCmd *pMsg = createMsg();
syncLocalCmdLog2((char *)"test1:", pMsg);
syncLocalCmdDestroy(pMsg);
}
void test2() {
SyncLocalCmd *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char *serialized = (char *)taosMemoryMalloc(len);
syncLocalCmdSerialize(pMsg, serialized, len);
SyncLocalCmd *pMsg2 = syncLocalCmdBuild(1000);
syncLocalCmdDeserialize(serialized, len, pMsg2);
syncLocalCmdLog2((char *)"test2: syncLocalCmdSerialize -> syncLocalCmdDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncLocalCmdDestroy(pMsg);
syncLocalCmdDestroy(pMsg2);
}
void test3() {
SyncLocalCmd *pMsg = createMsg();
uint32_t len;
char *serialized = syncLocalCmdSerialize2(pMsg, &len);
SyncLocalCmd *pMsg2 = syncLocalCmdDeserialize2(serialized, len);
syncLocalCmdLog2((char *)"test3: syncLocalCmdSerialize3 -> syncLocalCmdDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncLocalCmdDestroy(pMsg);
syncLocalCmdDestroy(pMsg2);
}
void test4() {
SyncLocalCmd *pMsg = createMsg();
SRpcMsg rpcMsg;
syncLocalCmd2RpcMsg(pMsg, &rpcMsg);
SyncLocalCmd *pMsg2 = (SyncLocalCmd *)taosMemoryMalloc(rpcMsg.contLen);
syncLocalCmdFromRpcMsg(&rpcMsg, pMsg2);
syncLocalCmdLog2((char *)"test4: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncLocalCmdDestroy(pMsg);
syncLocalCmdDestroy(pMsg2);
}
void test5() {
SyncLocalCmd *pMsg = createMsg();
SRpcMsg rpcMsg;
syncLocalCmd2RpcMsg(pMsg, &rpcMsg);
SyncLocalCmd *pMsg2 = syncLocalCmdFromRpcMsg2(&rpcMsg);
syncLocalCmdLog2((char *)"test5: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncLocalCmdDestroy(pMsg);
syncLocalCmdDestroy(pMsg2);
}
int main() {
gRaftDetailLog = true;
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}

View File

@@ -15,7 +15,7 @@ int main(int argc, char** argv) {
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(u64, host, sizeof(host), &port);
-   printf("" PRIu64 " -> %s:%d \n", u64, host, port);
+   printf("%" PRIu64 " -> %s:%d \n", u64, host, port);
  } else if (argc == 3) {
    uint64_t u64;

View File

@@ -97,8 +97,8 @@ void test1() {
  sTrace("lastIndex: %" PRId64, lastIndex);
  sTrace("lastTerm: %" PRIu64, lastTerm);
  sTrace("syncStartIndex: %" PRId64, syncStartIndex);
- sTrace("" PRId64 "'s preIndex: %" PRId64, testIndex, preIndex);
- sTrace("" PRId64 "'s preTerm: %" PRIu64, testIndex, preTerm);
+ sTrace("testIndex: %" PRId64 " preIndex: %" PRId64, testIndex, preIndex);
+ sTrace("testIndex: %" PRId64 " preTerm: %" PRIu64, testIndex, preTerm);

  if (gAssert) {
    assert(lastIndex == -1);

@@ -170,8 +170,8 @@ void test2() {
    SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i);
    SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, i);
-   sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex);
-   sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm);
+   sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex);
+   sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm);

    if (gAssert) {
      SyncIndex preIndexArr[12] = {-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

@@ -292,8 +292,8 @@ void test4() {
    SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i);
    SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, i);
-   sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex);
-   sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm);
+   sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex);
+   sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm);
  }

  logStoreDestory(pLogStore);

@@ -354,8 +354,8 @@ void test5() {
    SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i);
    SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, i);
-   sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex);
-   sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm);
+   sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex);
+   sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm);

    if (gAssert) {
      SyncIndex preIndexArr[12] = {9999, 9999, 9999, 9999, 9999, 9999, 5, 6, 7, 8, 9, 10};

View File

@@ -145,7 +145,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
  pMsg->msgType = 9999;
  pMsg->contLen = 256;
  pMsg->pCont = rpcMallocCont(pMsg->contLen);
- snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
+ snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count,
           taosGetTimestampMs());
  return pMsg;
}

View File

@@ -58,18 +58,18 @@ void syncRespMgrGetTest(uint64_t i) {
  if (ret == 1) {
    printStub(&stub);
  } else if (ret == 0) {
-   printf("" PRId64 " notFound \n", i);
+   printf("%" PRId64 " notFound \n", i);
  }
}

void syncRespMgrGetAndDelTest(uint64_t i) {
- printf("------syncRespMgrGetAndDelTest-------" PRIu64 "-- \n", i);
+ printf("------syncRespMgrGetAndDelTest-------%" PRIu64 "-- \n", i);
  SRespStub stub;
  int32_t   ret = syncRespMgrGetAndDel(pMgr, i, &stub);
  if (ret == 1) {
    printStub(&stub);
  } else if (ret == 0) {
-   printf("" PRId64 " notFound \n", i);
+   printf("%" PRId64 " notFound \n", i);
  }
}

View File

@@ -154,7 +154,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); }

void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
- char* s = syncCfg2Str(&(cbMeta.newCfg));
+ char* s = syncCfg2Str(&(cbMeta->newCfg));
  sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64
         ", newCfg:%s",
         cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s);

@@ -308,7 +308,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
  pMsg->msgType = TDMT_VND_SUBMIT;
  pMsg->contLen = 256;
  pMsg->pCont = rpcMallocCont(pMsg->contLen);
- snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count,
+ snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count,
           taosGetTimestampMs());
  return pMsg;
}

View File

@@ -655,7 +655,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
  uv_tcp_init(pObj->loop, cli);

  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
-#ifdef WINDOWS
+#if defined(WINDOWS) || defined(DARWIN)
    if (pObj->numOfWorkerReady < pObj->numOfThreads) {
      tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
             pObj->numOfWorkerReady);

@@ -779,7 +779,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
    return false;
  }

-#ifdef WINDOWS
+#if defined(WINDOWS) || defined(DARWIN)
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
#else
  uv_pipe_init(pThrd->loop, pThrd->pipe, 1);

@@ -799,7 +799,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
  QUEUE_INIT(&pThrd->conn);

  pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
-#ifdef WINDOWS
+#if defined(WINDOWS) || defined(DARWIN)
  uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
#else
  uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);

@@ -976,17 +976,18 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
  uv_loop_init(srv->loop);

  char pipeName[PATH_MAX];
-#ifdef WINDOWS
+#if defined(WINDOWS) || defined(DARWIN)
  int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
  if (ret != 0) {
    tError("failed to init pipe, errmsg: %s", uv_err_name(ret));
    goto End;
  }

+#if defined(WINDOWS)
  snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId());
-  // char pipeName[PATH_MAX] = {0};
-  // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
-  //          taosGetSelfPthreadId());
+#elif defined(DARWIN)
+  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
+           taosGetSelfPthreadId());
+#endif

  ret = uv_pipe_bind(&srv->pipeListen, pipeName);
  if (ret != 0) {

@@ -1036,7 +1037,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
    srv->pThreadObj[i] = thrd;

    uv_os_sock_t fds[2];
-   if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
+   if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
      goto End;
    }

@@ -1059,8 +1060,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
      goto End;
    }
  }
#endif

  if (false == taosValidIpAndPort(srv->ip, srv->port)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());