diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 7d877987ac..ea70f31ff1 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,4 +1,10 @@ -# Pull Request Checklist +# Description + +Please briefly describe the code changes in this pull request. + +# Checklist + +Please check the items in the checklist if applicable. - [ ] Is the user manual updated? - [ ] Are the test cases passed and automated? diff --git a/docs/en/07-develop/01-connect.md b/docs/en/07-develop/01-connect.md index a1fd6136fe..4086f15193 100644 --- a/docs/en/07-develop/01-connect.md +++ b/docs/en/07-develop/01-connect.md @@ -94,7 +94,7 @@ If you are using Maven to manage the project, simply add the following dependenc com.taosdata.jdbc taos-jdbcdriver - 3.3.3 + 3.4.0 ``` diff --git a/docs/en/14-reference/05-connector/14-java.mdx b/docs/en/14-reference/05-connector/14-java.mdx index 7302348f2e..29f919b5cf 100644 --- a/docs/en/14-reference/05-connector/14-java.mdx +++ b/docs/en/14-reference/05-connector/14-java.mdx @@ -35,6 +35,8 @@ REST connections are supported on all platforms that can run Java. | taos-jdbcdriver Version | Major Changes | TDengine Version | | :----------------------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------: | :-------------------: | +| 3.4.0 | 1. Replace the fastjson library with the Jackson library; 2. WebSocket connection protocal uses independent identification; 3. Optimize the use of backend pull threads to avoid user misuse leading to timeouts.| - | +| 3.3.4 | 1. Fixed getInt error when data type is float| - | | 3.3.3 | 1. Fixed memory leak caused by closing WebSocket statement | - | | 3.3.2 | 1. Optimized parameter binding performance under WebSocket connections; 2. Improved support for MyBatis | - | | 3.3.0 | 1. Optimized data transmission performance under WebSocket connections; 2. Supported skipping SSL verification, turned off by default | 3.3.2.0 and above | diff --git a/docs/examples/JDBC/mybatisplus-demo/pom.xml b/docs/examples/JDBC/mybatisplus-demo/pom.xml index f792946c96..2077e31d8d 100644 --- a/docs/examples/JDBC/mybatisplus-demo/pom.xml +++ b/docs/examples/JDBC/mybatisplus-demo/pom.xml @@ -47,7 +47,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.2.4 + 3.4.0 diff --git a/docs/examples/JDBC/mybatisplus-demo/src/main/java/com/taosdata/example/mybatisplusdemo/domain/Meters.java b/docs/examples/JDBC/mybatisplus-demo/src/main/java/com/taosdata/example/mybatisplusdemo/domain/Meters.java new file mode 100644 index 0000000000..e886e56269 --- /dev/null +++ b/docs/examples/JDBC/mybatisplus-demo/src/main/java/com/taosdata/example/mybatisplusdemo/domain/Meters.java @@ -0,0 +1,16 @@ +package com.taosdata.example.mybatisplusdemo.domain; + +import lombok.Data; + +import java.sql.Timestamp; + +@Data +public class Meters { + private String tbname; + private Timestamp ts; + private float current; + private int voltage; + private float phase; + private int groupid; + private byte[] location; +} diff --git a/docs/examples/JDBC/mybatisplus-demo/src/main/java/com/taosdata/example/mybatisplusdemo/mapper/MetersMapper.java b/docs/examples/JDBC/mybatisplus-demo/src/main/java/com/taosdata/example/mybatisplusdemo/mapper/MetersMapper.java new file mode 100644 index 0000000000..441c340886 --- /dev/null +++ b/docs/examples/JDBC/mybatisplus-demo/src/main/java/com/taosdata/example/mybatisplusdemo/mapper/MetersMapper.java @@ -0,0 +1,31 @@ +package com.taosdata.example.mybatisplusdemo.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.taosdata.example.mybatisplusdemo.domain.Meters; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; + +import java.util.List; + +public interface MetersMapper extends BaseMapper { + + @Update("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") + int createTable(); + + @Insert("insert into meters (tbname, ts, groupid, location, current, voltage, phase) values(#{tbname}, #{ts}, #{groupid}, #{location}, #{current}, #{voltage}, #{phase})") + int insertOne(Meters one); + + @Insert({ + "" + }) + int insertBatch(@Param("list") List metersList); + + @Update("drop stable if exists meters") + void dropTable(); +} diff --git a/docs/examples/JDBC/mybatisplus-demo/src/main/resources/application.yml b/docs/examples/JDBC/mybatisplus-demo/src/main/resources/application.yml index 985ed1675e..e9855bf011 100644 --- a/docs/examples/JDBC/mybatisplus-demo/src/main/resources/application.yml +++ b/docs/examples/JDBC/mybatisplus-demo/src/main/resources/application.yml @@ -1,7 +1,7 @@ spring: datasource: - driver-class-name: com.taosdata.jdbc.TSDBDriver - url: jdbc:TAOS://localhost:6030/mp_test?charset=UTF-8&locale=en_US.UTF-8&timezone=UTC-8 + driver-class-name: com.taosdata.jdbc.ws.WebSocketDriver + url: jdbc:TAOS-WS://localhost:6041/mp_test?charset=UTF-8&locale=en_US.UTF-8&timezone=UTC-8 username: root password: taosdata diff --git a/docs/examples/JDBC/mybatisplus-demo/src/test/java/com/taosdata/example/mybatisplusdemo/mapper/MetersMapperTest.java b/docs/examples/JDBC/mybatisplus-demo/src/test/java/com/taosdata/example/mybatisplusdemo/mapper/MetersMapperTest.java new file mode 100644 index 0000000000..2d8458e9d9 --- /dev/null +++ b/docs/examples/JDBC/mybatisplus-demo/src/test/java/com/taosdata/example/mybatisplusdemo/mapper/MetersMapperTest.java @@ -0,0 +1,112 @@ +package com.taosdata.example.mybatisplusdemo.mapper; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.taosdata.example.mybatisplusdemo.domain.Meters; +import com.taosdata.example.mybatisplusdemo.domain.Weather; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.sql.Timestamp; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest +public class MetersMapperTest { + + private static Random random = new Random(System.currentTimeMillis()); + + @Autowired + private MetersMapper mapper; + + @Before + public void createTable(){ + mapper.dropTable(); + mapper.createTable(); + Meters one = new Meters(); + one.setTbname("test_10001"); + one.setGroupid(10001); + one.setCurrent(random.nextFloat()); + one.setPhase(random.nextFloat()); + one.setCurrent(12345); + one.setTs(new Timestamp(1605024000000l)); + one.setLocation("望京".getBytes()); + mapper.insertOne(one); + } + + @Test + public void testSelectList() { + List meters = mapper.selectList(null); + meters.forEach(System.out::println); + } + + @Test + public void testInsertBatch() { + List metersList = new LinkedList<>(); + for (int i = 0; i < 100; i++){ + Meters one = new Meters(); + one.setTbname("tb_" + i); + one.setGroupid(i); + one.setCurrent(random.nextFloat()); + one.setPhase(random.nextFloat()); + one.setCurrent(random.nextInt()); + one.setTs(new Timestamp(1605024000000l + i)); + one.setLocation(("望京" + i).getBytes()); + metersList.add(one); + + } + int affectRows = mapper.insertBatch(metersList); + Assert.assertEquals(100, affectRows); + } + + @Test + public void testSelectOne() { + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.eq("location", "望京".getBytes()); + Meters one = mapper.selectOne(wrapper); + System.out.println(one); + Assert.assertEquals(12345, one.getCurrent(), 0.00f); + Assert.assertArrayEquals("望京".getBytes(), one.getLocation()); + } + + // @Test + // public void testSelectByMap() { + // Map map = new HashMap<>(); + // map.put("location", "beijing"); + // List weathers = mapper.selectByMap(map); + // Assert.assertEquals(1, weathers.size()); + // } + + @Test + public void testSelectObjs() { + List ts = mapper.selectObjs(null); + System.out.println(ts); + } + + @Test + public void testSelectCount() { + int count = mapper.selectCount(null); +// Assert.assertEquals(5, count); + System.out.println(count); + } + + @Test + public void testSelectPage() { + IPage page = new Page(1, 2); + IPage metersIPage = mapper.selectPage(page, null); + System.out.println("total : " + metersIPage.getTotal()); + System.out.println("pages : " + metersIPage.getPages()); + for (Meters meters : metersIPage.getRecords()) { + System.out.println(meters); + } + } + +} \ No newline at end of file diff --git a/docs/examples/JDBC/springbootdemo/pom.xml b/docs/examples/JDBC/springbootdemo/pom.xml index ee15f6013e..b3ead6cd11 100644 --- a/docs/examples/JDBC/springbootdemo/pom.xml +++ b/docs/examples/JDBC/springbootdemo/pom.xml @@ -68,7 +68,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.0.0 + 3.4.0 diff --git a/docs/zh/07-develop/01-connect/index.md b/docs/zh/07-develop/01-connect/index.md index bd26bea46d..94f55967ec 100644 --- a/docs/zh/07-develop/01-connect/index.md +++ b/docs/zh/07-develop/01-connect/index.md @@ -89,7 +89,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 com.taosdata.jdbc taos-jdbcdriver - 3.3.3 + 3.4.0 ``` diff --git a/docs/zh/14-reference/05-connector/14-java.mdx b/docs/zh/14-reference/05-connector/14-java.mdx index 0a167dd5ee..e8554ae668 100644 --- a/docs/zh/14-reference/05-connector/14-java.mdx +++ b/docs/zh/14-reference/05-connector/14-java.mdx @@ -34,6 +34,7 @@ REST 连接支持所有能运行 Java 的平台。 | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | | 3.4.0 | 1. 使用 jackson 库替换 fastjson 库;2. WebSocket 采用独立协议标识;3. 优化后台拉取线程使用,避免用户误用导致超时。 | - | +| 3.3.4 | 1. 解决了 getInt 在数据类型为 float 报错 | - | | 3.3.3 | 1. 解决了 WebSocket statement 关闭导致的内存泄漏 | - | | 3.3.2 | 1. 优化 WebSocket 连接下的参数绑定性能;2. 优化了对 mybatis 的支持 | - | | 3.3.0 | 1. 优化 WebSocket 连接下的数据传输性能;2. 支持跳过 SSL 验证,默认关闭 | 3.3.2.0 及更高版本 | diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index de10d6844e..53b8e0e0b9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -714,7 +714,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); -int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); +int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pInfo, int32_t code); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 941956ae2b..8e4b307276 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -379,9 +379,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { - mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); + mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); - terrno = TSDB_CODE_INVALID_MSG; + terrno = TSDB_CODE_SUCCESS; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 326e8d4ada..1ea524dc78 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -652,7 +652,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve streamMetaWUnLock(pMeta); if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks, + tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2280b7f06f..9eab33cee8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1208,34 +1208,37 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_ // record the dispatch checkpoint trigger info in the list // memory insufficient may cause the stream computing stopped -int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { +int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int64_t now = taosGetTimestampMs(); int32_t code = 0; streamMutexLock(&pInfo->lock); - pInfo->dispatchTrigger = true; - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; + if (sendingChkptId > pInfo->failedId) { + pInfo->dispatchTrigger = true; + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; - STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; - void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); - if (px == NULL) { // pause the stream task, if memory not enough - code = terrno; - } - } else { - for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { - SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i); - if (pVgInfo == NULL) { - continue; - } - - STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; - void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); + STaskTriggerSendInfo p = { + .sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; + void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); if (px == NULL) { // pause the stream task, if memory not enough code = terrno; - break; + } + } else { + for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { + SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i); + if (pVgInfo == NULL) { + continue; + } + + STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; + void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); + if (px == NULL) { // pause the stream task, if memory not enough + code = terrno; + break; + } } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5807240f5e..b4fcf1edc9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -845,7 +845,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { streamMutexUnlock(&pTask->msgInfo.lock); code = doBuildDispatchMsg(pTask, pBlock); + + int64_t chkptId = 0; if (code == 0) { + if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + SSDataBlock* p = taosArrayGet(pBlock->blocks, 0); + if (pBlock != NULL) { + chkptId = p->info.version; + } + } destroyStreamDataBlock(pBlock); } else { // todo handle build dispatch msg failed } @@ -862,7 +870,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { continue; } - code = streamTaskInitTriggerDispatchInfo(pTask); + code = streamTaskInitTriggerDispatchInfo(pTask, chkptId); if (code != TSDB_CODE_SUCCESS) { // todo handle error } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 86f305df60..a6f87711bf 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -723,8 +723,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)); - if (code) { // todo remove it from task list + if (code) { stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); + void* pUnused = taosArrayPop(pMeta->pTaskList); int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret != 0) { @@ -734,6 +735,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + void* pUnused = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); @@ -742,6 +746,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaCommit(pMeta)) != 0) { + int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + void* pUnused = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);