Merge pull request #22350 from taosdata/fix/3_liaohj
other: merge main and fix bugs in stop stream tasks.
|
@ -83,7 +83,7 @@ If `maven` is used to manage the projects, what needs to be done is only adding
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.taosdata.jdbc</groupId>
|
<groupId>com.taosdata.jdbc</groupId>
|
||||||
<artifactId>taos-jdbcdriver</artifactId>
|
<artifactId>taos-jdbcdriver</artifactId>
|
||||||
<version>3.2.1</version>
|
<version>3.2.4</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
---
|
||||||
|
sidebar_label: qStudio
|
||||||
|
title: qStudio
|
||||||
|
description: Step-by-Step Guide to Accessing TDengine Data with qStudio
|
||||||
|
---
|
||||||
|
|
||||||
|
qStudio is a free cross-platform SQL data analysis tool that allows easy browsing of tables, variables, functions, and configuration settings in a database. The latest version of qStudio includes built-in support for TDengine.
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
|
||||||
|
To connect TDengine using qStudio, you need to complete the following preparations:
|
||||||
|
|
||||||
|
- Install qStudio: qStudio supports major operating systems, including Windows, macOS, and Linux. Please ensure you download the correct installation package for your platform from the [download page](https://www.timestored.com/qstudio/download/).
|
||||||
|
- Set up TDengine instance: Make sure TDengine is installed and running correctly, and the taosAdapter is installed and running. For detailed information, refer to the taosAdapter User Manual.
|
||||||
|
|
||||||
|
## Connecting to TDengine with qStudio
|
||||||
|
|
||||||
|
1. Launch the qStudio application and select "Server" and then "Add Server..." from the menu. Choose TDengine from the Server Type dropdown.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
2. Configure the TDengine connection by entering the host address, port number, username, and password. If TDengine is deployed on the local machine, you can fill in the username and password only. The default username is "root," and the default password is "taosdata." Click "Test" to test the connection's availability. If the TDengine Java connector is not installed on the local machine, qStudio will prompt you to download and install it.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
3. Once connected successfully, the screen will display as shown below. If the connection fails, check that the TDengine service and taosAdapter are running correctly, and ensure that the host address, port number, username, and password are correct.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
4. Use qStudio to select databases and tables to browse data from the TDengine server.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
5. You can also perform operations on TDengine data by executing SQL commands.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
6. qStudio supports charting functions based on the data. For more information, please refer to the [qStudio documentation](https://www.timestored.com/qstudio/help).
|
||||||
|
|
||||||
|

|
After Width: | Height: | Size: 94 KiB |
After Width: | Height: | Size: 148 KiB |
After Width: | Height: | Size: 34 KiB |
After Width: | Height: | Size: 93 KiB |
After Width: | Height: | Size: 39 KiB |
After Width: | Height: | Size: 78 KiB |
|
@ -22,7 +22,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.taosdata.jdbc</groupId>
|
<groupId>com.taosdata.jdbc</groupId>
|
||||||
<artifactId>taos-jdbcdriver</artifactId>
|
<artifactId>taos-jdbcdriver</artifactId>
|
||||||
<version>3.2.1</version>
|
<version>3.2.4</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- ANCHOR_END: dep-->
|
<!-- ANCHOR_END: dep-->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -33,4 +33,4 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -82,7 +82,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.taosdata.jdbc</groupId>
|
<groupId>com.taosdata.jdbc</groupId>
|
||||||
<artifactId>taos-jdbcdriver</artifactId>
|
<artifactId>taos-jdbcdriver</artifactId>
|
||||||
<version>3.2.1</version>
|
<version>3.2.4</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -446,7 +446,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据
|
||||||
**注意**:
|
**注意**:
|
||||||
|
|
||||||
- JDBC REST 连接目前不支持参数绑定
|
- JDBC REST 连接目前不支持参数绑定
|
||||||
- 以下示例代码基于 taos-jdbcdriver-3.2.1
|
- 以下示例代码基于 taos-jdbcdriver-3.2.4
|
||||||
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
|
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
|
||||||
- 预处理语句中指定数据库与子表名称不要使用 `db.?`,应直接使用 `?`,然后在 setTableName 中指定数据库,如:`prepareStatement.setTableName("db.t1")`。
|
- 预处理语句中指定数据库与子表名称不要使用 `db.?`,应直接使用 `?`,然后在 setTableName 中指定数据库,如:`prepareStatement.setTableName("db.t1")`。
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
---
|
||||||
|
sidebar_label: qStudio
|
||||||
|
title: qStudio
|
||||||
|
description: 使用 qStudio 存取 TDengine 数据的详细指南
|
||||||
|
---
|
||||||
|
|
||||||
|
qStudio 是一款免费的多平台 SQL 数据分析工具,可以轻松浏览数据库中的表、变量、函数和配置设置。最新版本 qStudio 内嵌支持 TDengine。
|
||||||
|
|
||||||
|
## 前置条件
|
||||||
|
|
||||||
|
使用 qStudio 连接 TDengine 需要以下几方面的准备工作。
|
||||||
|
|
||||||
|
- 安装 qStudio。qStudio 支持主流操作系统包括 Windows、macOS 和 Linux。请注意[下载](https://www.timestored.com/qstudio/download/)正确平台的安装包。
|
||||||
|
- 安装 TDengine 实例,请确认 TDengine 正常运行,并且 taosAdapter 已经安装并正常运行,具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter)。
|
||||||
|
|
||||||
|
## 使用 qStudio 连接 TDengine
|
||||||
|
|
||||||
|
1. 启动 qStudio 应用,从菜单项选择“Server” 和 “Add Server...”,然后在 Server Type 下拉框中选择 TDengine。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
2. 配置 TDengine 连接,填入主机地址、端口号、用户名和密码。如果 TDengine 部署在本机,可以只填用户名和密码,默认用户名为 root,默认密码为 taosdata。点击“Test”可以对连接是否可用进行测试。如果本机没有安装 TDengine Java
|
||||||
|
连接器,qStudio 会提示下载安装。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
3. 连接成功将显示如下图所示。如果显示连接失败,请检查 TDengine 服务和 taosAdapter 是否正确运行,主机地址、端口号、用户名和密码是否正确。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
4. 使用 qStudio 选择数据库和表可以浏览 TDengine 服务的数据。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
5. 也可以通过执行 SQL 命令的方式对 TDengine 数据进行操作。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
6. qStudio 支持根据数据绘制图表等功能,请参考 [qStudio 的帮助文档](https://www.timestored.com/qstudio/help)
|
||||||
|
|
||||||
|

|
After Width: | Height: | Size: 94 KiB |
After Width: | Height: | Size: 148 KiB |
After Width: | Height: | Size: 34 KiB |
After Width: | Height: | Size: 93 KiB |
After Width: | Height: | Size: 39 KiB |
After Width: | Height: | Size: 78 KiB |
|
@ -183,7 +183,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
if (pTask->status.timerActive == 1) {
|
if (pTask->status.timerActive >= 1) {
|
||||||
inTimer = true;
|
inTimer = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbFSet2.h"
|
#include "tsdbFSet2.h"
|
||||||
|
#include "tsdbMerge.h"
|
||||||
#include "tsdbReadUtil.h"
|
#include "tsdbReadUtil.h"
|
||||||
#include "tsdbSttFileRW.h"
|
#include "tsdbSttFileRW.h"
|
||||||
|
|
||||||
|
@ -352,10 +353,14 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t uidComparFn(const void *p1, const void *p2) {
|
static int32_t suidComparFn(const void *target, const void *p2) {
|
||||||
const uint64_t *uid1 = p1;
|
const uint64_t *targetUid = target;
|
||||||
const uint64_t *uid2 = p2;
|
const uint64_t *uid2 = p2;
|
||||||
return (*uid1) - (*uid2);
|
if (*uid2 == (*targetUid)) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return (*targetUid) < (*uid2) ? -1:1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
||||||
|
@ -372,29 +377,55 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
|
||||||
// SStatisBlk *p = &pStatisBlkArray->data[i];
|
|
||||||
// if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (p->maxTbid.uid < uid) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
while(i < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
STbStatisBlock block = {0};
|
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||||
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
if (p->minTbid.suid > suid) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ);
|
STbStatisBlock block = {0};
|
||||||
tStatisBlockDestroy(&block);
|
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||||
|
|
||||||
return (index != -1);
|
int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||||
|
if (index == -1) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t j = index;
|
||||||
|
if (block.uid->data[j] == uid) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return true;
|
||||||
|
} else if (block.uid->data[j] > uid) {
|
||||||
|
while (j >= 0 && block.suid->data[j] == suid) {
|
||||||
|
if (block.uid->data[j] == uid) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
j -= 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
j = index + 1;
|
||||||
|
while (j < block.suid->size && block.suid->data[j] == suid) {
|
||||||
|
if (block.uid->data[j] == uid) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
j += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||||
|
@ -452,12 +483,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
||||||
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||||
// if (!exists) {
|
if (!exists) {
|
||||||
// pIter->iSttBlk = -1;
|
pIter->iSttBlk = -1;
|
||||||
// pIter->pSttBlk = NULL;
|
pIter->pSttBlk = NULL;
|
||||||
// return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||||
// the skey is updated.
|
// the skey is updated.
|
||||||
|
|
|
@ -628,7 +628,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
|
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_TMQ_SEEK:
|
case TDMT_VND_TMQ_SEEK:
|
||||||
return tqProcessSeekReq(pVnode->pTq, pMsg);
|
return tqProcessSeekReq(pVnode->pTq, pMsg);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
|
|
@ -550,13 +550,27 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
|
|
||||||
static void doRetryDispatchData(void* param, void* tmrId) {
|
static void doRetryDispatchData(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
|
|
||||||
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||||
|
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
|
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
|
||||||
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
if (!streamTaskShouldStop(&pTask->status)) {
|
||||||
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
|
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
||||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
|
||||||
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
|
} else {
|
||||||
|
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||||
|
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,19 +137,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
tFreeStreamTask(*(SStreamTask**)pIter);
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
||||||
if (pTask->schedTimer) {
|
|
||||||
taosTmrStop(pTask->schedTimer);
|
|
||||||
pTask->schedTimer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->launchTaskTimer) {
|
|
||||||
taosTmrStop(pTask->launchTaskTimer);
|
|
||||||
pTask->launchTaskTimer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tFreeStreamTask(pTask);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pMeta->pTasks);
|
taosHashCleanup(pMeta->pTasks);
|
||||||
|
@ -362,11 +350,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||||
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
|
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
|
||||||
|
|
||||||
// remove the ref by timer
|
|
||||||
if (pTask->triggerParam != 0) {
|
|
||||||
taosTmrStop(pTask->schedTimer);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaRemoveTask(pMeta, taskId);
|
streamMetaRemoveTask(pMeta, taskId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -540,14 +540,14 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
ASSERT((*ppTask)->status.timerActive == 1);
|
ASSERT((*ppTask)->status.timerActive >= 1);
|
||||||
|
|
||||||
if (streamTaskShouldStop(&(*ppTask)->status)) {
|
if (streamTaskShouldStop(&(*ppTask)->status)) {
|
||||||
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
|
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
|
||||||
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
|
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
(*ppTask)->status.timerActive = 0;
|
atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -556,7 +556,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
ASSERT(pTask->status.timerActive == 1);
|
ASSERT(pTask->status.timerActive >= 1);
|
||||||
|
|
||||||
// abort the timer if intend to stop task
|
// abort the timer if intend to stop task
|
||||||
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
||||||
|
@ -578,7 +578,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// not in timer anymore
|
// not in timer anymore
|
||||||
pTask->status.timerActive = 0;
|
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
|
qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
|
||||||
|
@ -609,11 +609,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
// todo failed to create timer
|
// todo failed to create timer
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
} else {
|
} else {
|
||||||
pTask->status.timerActive = 1; // timer is active
|
atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
|
||||||
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
|
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
} else { // timer exists
|
} else { // timer exists
|
||||||
pTask->status.timerActive = 1;
|
ASSERT(pTask->status.timerActive > 0);
|
||||||
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
|
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
|
||||||
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
|
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <libs/transport/trpc.h>
|
#include "streamInt.h"
|
||||||
#include <streamInt.h>
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
||||||
int32_t childId = taosArrayGetSize(pArray);
|
int32_t childId = taosArrayGetSize(pArray);
|
||||||
|
@ -213,6 +213,22 @@ static void freeItem(void* p) {
|
||||||
void tFreeStreamTask(SStreamTask* pTask) {
|
void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
|
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
|
||||||
|
|
||||||
|
// remove the ref by timer
|
||||||
|
while(pTask->status.timerActive > 0) {
|
||||||
|
qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr);
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->schedTimer != NULL) {
|
||||||
|
taosTmrStop(pTask->schedTimer);
|
||||||
|
pTask->schedTimer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->launchTaskTimer != NULL) {
|
||||||
|
taosTmrStop(pTask->launchTaskTimer);
|
||||||
|
pTask->launchTaskTimer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
||||||
if (pTask->inputQueue) {
|
if (pTask->inputQueue) {
|
||||||
streamQueueClose(pTask->inputQueue);
|
streamQueueClose(pTask->inputQueue);
|
||||||
|
|