Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TD-33020
This commit is contained in:
commit
d1f8f383fb
|
@ -1,4 +1,10 @@
|
|||
# Pull Request Checklist
|
||||
# Description
|
||||
|
||||
Please briefly describe the code changes in this pull request.
|
||||
|
||||
# Checklist
|
||||
|
||||
Please check the items in the checklist if applicable.
|
||||
|
||||
- [ ] Is the user manual updated?
|
||||
- [ ] Are the test cases passed and automated?
|
||||
|
|
|
@ -94,7 +94,7 @@ If you are using Maven to manage the project, simply add the following dependenc
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.3.3</version>
|
||||
<version>3.4.0</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
|
|
@ -35,6 +35,8 @@ REST connections are supported on all platforms that can run Java.
|
|||
|
||||
| taos-jdbcdriver Version | Major Changes | TDengine Version |
|
||||
| :----------------------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------: | :-------------------: |
|
||||
| 3.4.0 | 1. Replace the fastjson library with the Jackson library; 2. WebSocket connection protocal uses independent identification; 3. Optimize the use of backend pull threads to avoid user misuse leading to timeouts.| - |
|
||||
| 3.3.4 | 1. Fixed getInt error when data type is float| - |
|
||||
| 3.3.3 | 1. Fixed memory leak caused by closing WebSocket statement | - |
|
||||
| 3.3.2 | 1. Optimized parameter binding performance under WebSocket connections; 2. Improved support for MyBatis | - |
|
||||
| 3.3.0 | 1. Optimized data transmission performance under WebSocket connections; 2. Supported skipping SSL verification, turned off by default | 3.3.2.0 and above |
|
||||
|
|
|
@ -47,7 +47,7 @@
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<version>3.4.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package com.taosdata.example.mybatisplusdemo.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
|
||||
@Data
|
||||
public class Meters {
|
||||
private String tbname;
|
||||
private Timestamp ts;
|
||||
private float current;
|
||||
private int voltage;
|
||||
private float phase;
|
||||
private int groupid;
|
||||
private byte[] location;
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.taosdata.example.mybatisplusdemo.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.taosdata.example.mybatisplusdemo.domain.Meters;
|
||||
import org.apache.ibatis.annotations.Insert;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Update;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface MetersMapper extends BaseMapper<Meters> {
|
||||
|
||||
@Update("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||
int createTable();
|
||||
|
||||
@Insert("insert into meters (tbname, ts, groupid, location, current, voltage, phase) values(#{tbname}, #{ts}, #{groupid}, #{location}, #{current}, #{voltage}, #{phase})")
|
||||
int insertOne(Meters one);
|
||||
|
||||
@Insert({
|
||||
"<script>",
|
||||
"insert into meters (tbname, ts, groupid, location, current, voltage, phase) values ",
|
||||
"<foreach collection='list' item='item' index='index' separator=','>",
|
||||
"(#{item.tbname}, #{item.ts}, #{item.groupid}, #{item.location}, #{item.current}, #{item.voltage}, #{item.phase})",
|
||||
"</foreach>",
|
||||
"</script>"
|
||||
})
|
||||
int insertBatch(@Param("list") List<Meters> metersList);
|
||||
|
||||
@Update("drop stable if exists meters")
|
||||
void dropTable();
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
spring:
|
||||
datasource:
|
||||
driver-class-name: com.taosdata.jdbc.TSDBDriver
|
||||
url: jdbc:TAOS://localhost:6030/mp_test?charset=UTF-8&locale=en_US.UTF-8&timezone=UTC-8
|
||||
driver-class-name: com.taosdata.jdbc.ws.WebSocketDriver
|
||||
url: jdbc:TAOS-WS://localhost:6041/mp_test?charset=UTF-8&locale=en_US.UTF-8&timezone=UTC-8
|
||||
username: root
|
||||
password: taosdata
|
||||
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
package com.taosdata.example.mybatisplusdemo.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.taosdata.example.mybatisplusdemo.domain.Meters;
|
||||
import com.taosdata.example.mybatisplusdemo.domain.Weather;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest
|
||||
public class MetersMapperTest {
|
||||
|
||||
private static Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
@Autowired
|
||||
private MetersMapper mapper;
|
||||
|
||||
@Before
|
||||
public void createTable(){
|
||||
mapper.dropTable();
|
||||
mapper.createTable();
|
||||
Meters one = new Meters();
|
||||
one.setTbname("test_10001");
|
||||
one.setGroupid(10001);
|
||||
one.setCurrent(random.nextFloat());
|
||||
one.setPhase(random.nextFloat());
|
||||
one.setCurrent(12345);
|
||||
one.setTs(new Timestamp(1605024000000l));
|
||||
one.setLocation("望京".getBytes());
|
||||
mapper.insertOne(one);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectList() {
|
||||
List<Meters> meters = mapper.selectList(null);
|
||||
meters.forEach(System.out::println);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertBatch() {
|
||||
List<Meters> metersList = new LinkedList<>();
|
||||
for (int i = 0; i < 100; i++){
|
||||
Meters one = new Meters();
|
||||
one.setTbname("tb_" + i);
|
||||
one.setGroupid(i);
|
||||
one.setCurrent(random.nextFloat());
|
||||
one.setPhase(random.nextFloat());
|
||||
one.setCurrent(random.nextInt());
|
||||
one.setTs(new Timestamp(1605024000000l + i));
|
||||
one.setLocation(("望京" + i).getBytes());
|
||||
metersList.add(one);
|
||||
|
||||
}
|
||||
int affectRows = mapper.insertBatch(metersList);
|
||||
Assert.assertEquals(100, affectRows);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectOne() {
|
||||
QueryWrapper<Meters> wrapper = new QueryWrapper<>();
|
||||
wrapper.eq("location", "望京".getBytes());
|
||||
Meters one = mapper.selectOne(wrapper);
|
||||
System.out.println(one);
|
||||
Assert.assertEquals(12345, one.getCurrent(), 0.00f);
|
||||
Assert.assertArrayEquals("望京".getBytes(), one.getLocation());
|
||||
}
|
||||
|
||||
// @Test
|
||||
// public void testSelectByMap() {
|
||||
// Map<String, Object> map = new HashMap<>();
|
||||
// map.put("location", "beijing");
|
||||
// List<Weather> weathers = mapper.selectByMap(map);
|
||||
// Assert.assertEquals(1, weathers.size());
|
||||
// }
|
||||
|
||||
@Test
|
||||
public void testSelectObjs() {
|
||||
List<Object> ts = mapper.selectObjs(null);
|
||||
System.out.println(ts);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectCount() {
|
||||
int count = mapper.selectCount(null);
|
||||
// Assert.assertEquals(5, count);
|
||||
System.out.println(count);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectPage() {
|
||||
IPage page = new Page(1, 2);
|
||||
IPage<Meters> metersIPage = mapper.selectPage(page, null);
|
||||
System.out.println("total : " + metersIPage.getTotal());
|
||||
System.out.println("pages : " + metersIPage.getPages());
|
||||
for (Meters meters : metersIPage.getRecords()) {
|
||||
System.out.println(meters);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -68,7 +68,7 @@
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<version>3.4.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
|
@ -89,7 +89,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.3.3</version>
|
||||
<version>3.4.0</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ REST 连接支持所有能运行 Java 的平台。
|
|||
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
|
||||
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
|
||||
| 3.4.0 | 1. 使用 jackson 库替换 fastjson 库;2. WebSocket 采用独立协议标识;3. 优化后台拉取线程使用,避免用户误用导致超时。 | - |
|
||||
| 3.3.4 | 1. 解决了 getInt 在数据类型为 float 报错 | - |
|
||||
| 3.3.3 | 1. 解决了 WebSocket statement 关闭导致的内存泄漏 | - |
|
||||
| 3.3.2 | 1. 优化 WebSocket 连接下的参数绑定性能;2. 优化了对 mybatis 的支持 | - |
|
||||
| 3.3.0 | 1. 优化 WebSocket 连接下的数据传输性能;2. 支持跳过 SSL 验证,默认关闭 | 3.3.2.0 及更高版本 |
|
||||
|
|
|
@ -714,7 +714,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
|
|||
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
|
||||
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
|
||||
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId);
|
||||
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
|
||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||
SRpcHandleInfo* pInfo, int32_t code);
|
||||
|
|
|
@ -379,9 +379,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
|
||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
|
||||
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
|
|
@ -652,7 +652,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
if (code < 0) {
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks,
|
||||
tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1208,34 +1208,37 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_
|
|||
|
||||
// record the dispatch checkpoint trigger info in the list
|
||||
// memory insufficient may cause the stream computing stopped
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
|
||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
int32_t code = 0;
|
||||
|
||||
streamMutexLock(&pInfo->lock);
|
||||
|
||||
pInfo->dispatchTrigger = true;
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||
if (sendingChkptId > pInfo->failedId) {
|
||||
pInfo->dispatchTrigger = true;
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
if (px == NULL) { // pause the stream task, if memory not enough
|
||||
code = terrno;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
STaskTriggerSendInfo p = {
|
||||
.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
if (px == NULL) { // pause the stream task, if memory not enough
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
if (px == NULL) { // pause the stream task, if memory not enough
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -845,7 +845,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
streamMutexUnlock(&pTask->msgInfo.lock);
|
||||
|
||||
code = doBuildDispatchMsg(pTask, pBlock);
|
||||
|
||||
int64_t chkptId = 0;
|
||||
if (code == 0) {
|
||||
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||
SSDataBlock* p = taosArrayGet(pBlock->blocks, 0);
|
||||
if (pBlock != NULL) {
|
||||
chkptId = p->info.version;
|
||||
}
|
||||
}
|
||||
destroyStreamDataBlock(pBlock);
|
||||
} else { // todo handle build dispatch msg failed
|
||||
}
|
||||
|
@ -862,7 +870,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
continue;
|
||||
}
|
||||
|
||||
code = streamTaskInitTriggerDispatchInfo(pTask);
|
||||
code = streamTaskInitTriggerDispatchInfo(pTask, chkptId);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||
}
|
||||
}
|
||||
|
|
|
@ -723,8 +723,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
|
||||
pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
|
||||
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
|
||||
if (code) { // todo remove it from task list
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
|
||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||
|
||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret != 0) {
|
||||
|
@ -734,6 +735,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
}
|
||||
|
||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||
|
||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret) {
|
||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||
|
@ -742,6 +746,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
}
|
||||
|
||||
if ((code = streamMetaCommit(pMeta)) != 0) {
|
||||
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||
|
||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret) {
|
||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||
|
|
Loading…
Reference in New Issue