Merge branch '3.0' into feature/3.0_interval_hash_optimize
This commit is contained in:
commit
2f5e197dbe
11
README.md
11
README.md
|
@ -15,11 +15,11 @@
|
|||
[](https://coveralls.io/github/taosdata/TDengine?branch=develop)
|
||||
[](https://bestpractices.coreinfrastructure.org/projects/4201)
|
||||
|
||||
English | [简体中文](README-CN.md) | We are hiring, check [here](https://tdengine.com/careers)
|
||||
English | [简体中文](README-CN.md) | [Lean more about TSDB](https://tdengine.com/tsdb)
|
||||
|
||||
# What is TDengine?
|
||||
|
||||
TDengine is an open source, high-performance, cloud native [time-series database](https://tdengine.com/tsdb/what-is-a-time-series-database/) optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other time-seires databases with the following advantages:
|
||||
TDengine is an open source, high-performance, cloud native [time-series database](https://tdengine.com/tsdb/) optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other time-seires databases with the following advantages:
|
||||
|
||||
- **[High-Performance](https://tdengine.com/tdengine/high-performance-time-series-database/)**: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression.
|
||||
|
||||
|
@ -33,6 +33,8 @@ TDengine is an open source, high-performance, cloud native [time-series database
|
|||
|
||||
- **[Open Source](https://tdengine.com/tdengine/open-source-time-series-database/)**: TDengine’s core modules, including cluster feature, are all available under open source licenses. It has gathered 18.8k stars on GitHub. There is an active developer community, and over 139k running instances worldwide.
|
||||
|
||||
For a full list of TDengine competitive advantages, please [check here](https://tdengine.com/tdengine/)
|
||||
|
||||
# Documentation
|
||||
|
||||
For user manual, system design and architecture, please refer to [TDengine Documentation](https://docs.tdengine.com) ([TDengine 文档](https://docs.taosdata.com))
|
||||
|
@ -319,6 +321,7 @@ TDengine provides abundant developing tools for users to develop on TDengine. Fo
|
|||
|
||||
Please follow the [contribution guidelines](CONTRIBUTING.md) to contribute to the project.
|
||||
|
||||
# Join TDengine WeChat Group
|
||||
# Join TDengine User Community
|
||||
|
||||
Add WeChat “tdengine” to join the group,you can communicate with other users.
|
||||
- Join [TDengine Discord Channel](https://discord.com/invite/VZdSuUg4pS?utm_id=discord)
|
||||
- Join wechat group by adding WeChat “tdengine”
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
IF (DEFINED VERNUMBER)
|
||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||
ELSE ()
|
||||
SET(TD_VER_NUMBER "3.0.1.0")
|
||||
SET(TD_VER_NUMBER "3.0.1.1")
|
||||
ENDIF ()
|
||||
|
||||
IF (DEFINED VERCOMPATIBLE)
|
||||
|
|
|
@ -111,7 +111,7 @@ Note: TDengine only supports Windows Server 2016/2019 and Windows 10/11 on the W
|
|||
</Tabs>
|
||||
|
||||
:::info
|
||||
For information about TDengine releases, see [Release History](../../releases).
|
||||
For information about TDengine releases, see [Release History](../../releases/tdengine).
|
||||
:::
|
||||
|
||||
:::note
|
||||
|
|
|
@ -4,7 +4,7 @@ import PkgListV3 from "/components/PkgListV3";
|
|||
|
||||
<PkgListV3 type={1} sys="Linux" />
|
||||
|
||||
[All Downloads](../../releases)
|
||||
[All Downloads](../../releases/tdengine)
|
||||
|
||||
2. Unzip
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import PkgListV3 from "/components/PkgListV3";
|
|||
|
||||
<PkgListV3 type={4} sys="Windows" />
|
||||
|
||||
[All Downloads](../../releases)
|
||||
[All Downloads](../../releases/tdengine)
|
||||
2. Execute the installer, select the default value as prompted, and complete the installation
|
||||
3. Installation path
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ taosAdapter provides the following features.
|
|||
|
||||
### Install taosAdapter
|
||||
|
||||
If you use the TDengine server, you don't need additional steps to install taosAdapter. You can download taosAdapter from [TDengine 3.0 released versions](../../releases) to download the TDengine server installation package. If you need to deploy taosAdapter separately on another server other than the TDengine server, you should install the full TDengine server package on that server to install taosAdapter. If you need to build taosAdapter from source code, you can refer to the [Building taosAdapter]( https://github.com/taosdata/taosadapter/blob/3.0/BUILD.md) documentation.
|
||||
If you use the TDengine server, you don't need additional steps to install taosAdapter. You can download taosAdapter from [TDengine 3.0 released versions](../../releases/tdengine) to download the TDengine server installation package. If you need to deploy taosAdapter separately on another server other than the TDengine server, you should install the full TDengine server package on that server to install taosAdapter. If you need to build taosAdapter from source code, you can refer to the [Building taosAdapter]( https://github.com/taosdata/taosadapter/blob/3.0/BUILD.md) documentation.
|
||||
|
||||
### Start/Stop taosAdapter
|
||||
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
---
|
||||
sidebar_label: Releases
|
||||
title: Released Versions
|
||||
---
|
||||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
|
||||
<Release versionPrefix="3.0" />
|
|
@ -0,0 +1,17 @@
|
|||
---
|
||||
sidebar_label: TDengine
|
||||
title: TDengine
|
||||
description: TDengine release history, Release Notes and download links.
|
||||
---
|
||||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
|
||||
## 3.0.1.1
|
||||
|
||||
<Release type="tdengine" version="3.0.1.1" />
|
||||
|
||||
## 3.0.1.0
|
||||
|
||||
<Release type="tdengine" version="3.0.1.0" />
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
---
|
||||
sidebar_label: taosTools
|
||||
title: taosTools
|
||||
description: taosTools release history, Release Notes, download links.
|
||||
---
|
||||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
## 2.2.0
|
||||
|
||||
<Release type="tools" version="2.2.0" />
|
||||
|
||||
## 2.1.3
|
||||
|
||||
<Release type="tools" version="2.1.3" />
|
|
@ -0,0 +1 @@
|
|||
label: Releases
|
|
@ -73,6 +73,7 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问
|
|||
</TabItem>
|
||||
|
||||
<TabItem value="apt-get" label="apt-get">
|
||||
|
||||
可以使用 `apt-get` 工具从官方仓库安装。
|
||||
|
||||
**配置包仓库**
|
||||
|
|
|
@ -6,6 +6,11 @@ description: TDengine 发布历史、Release Notes 及下载链接
|
|||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
|
||||
## 3.0.1.1
|
||||
|
||||
<Release type="tdengine" version="3.0.1.1" />
|
||||
|
||||
## 3.0.1.0
|
||||
|
||||
<Release type="tdengine" version="3.0.1.0" />
|
||||
|
|
|
@ -6,6 +6,10 @@ description: taosTools 的发布历史、Release Notes 和下载链接
|
|||
|
||||
import Release from "/components/ReleaseV3";
|
||||
|
||||
## 2.2.0
|
||||
|
||||
<Release type="tools" version="2.2.0" />
|
||||
|
||||
## 2.1.3
|
||||
|
||||
<Release type="tools" version="2.1.3" />
|
||||
|
|
|
@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
|||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
||||
// tsdbRead.c ==============================================================================================
|
||||
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap);
|
||||
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
|
||||
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap, const char* id);
|
||||
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap, const char* id);
|
||||
// tsdbMerge.c ==============================================================================================
|
||||
int32_t tsdbMerge(STsdb *pTsdb);
|
||||
|
||||
|
|
|
@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap);
|
||||
tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
if (pIter->pReadSnap->pMem) {
|
||||
|
@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
|||
taosArrayDestroy(pIter->pSkyline);
|
||||
}
|
||||
|
||||
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap);
|
||||
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL);
|
||||
|
||||
_err:
|
||||
return code;
|
||||
|
|
|
@ -2096,8 +2096,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
|
||||
// it is a clean block, load it directly
|
||||
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
|
||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||
goto _end;
|
||||
if (pReader->order == TSDB_ORDER_ASC || (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
|
||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
} else { // file blocks not exist
|
||||
pBlockScanInfo = pReader->status.pTableIter;
|
||||
|
@ -3363,7 +3365,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
|
||||
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -3387,7 +3389,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
STsdbReader* pPrevReader = pReader->innerReader[0];
|
||||
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
||||
|
||||
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
|
||||
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -3444,7 +3446,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);
|
||||
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);
|
||||
|
||||
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
||||
SIOCostSummary* pCost = &pReader->cost;
|
||||
|
@ -3720,8 +3722,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
}
|
||||
}
|
||||
|
||||
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
|
||||
pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%"PRId64", query range:%" PRId64 " - %" PRId64 " in query %s",
|
||||
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -3866,7 +3868,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
|
||||
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
|
||||
int32_t code = 0;
|
||||
|
||||
// alloc
|
||||
|
@ -3909,12 +3911,12 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
|
||||
tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
|
||||
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
|
||||
if (pSnap) {
|
||||
if (pSnap->pMem) {
|
||||
tsdbUnrefMemTable(pSnap->pMem);
|
||||
|
@ -3927,6 +3929,5 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
|
|||
tsdbFSUnref(pTsdb, &pSnap->fs);
|
||||
taosMemoryFree(pSnap);
|
||||
}
|
||||
|
||||
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
|
||||
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
|
||||
}
|
||||
|
|
|
@ -533,6 +533,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
|
|||
taosArrayPush(rsp.pArray, &cRsp);
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
||||
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
||||
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
||||
goto _exit;
|
||||
|
@ -885,8 +886,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
if (NULL != submitBlkRsp.pMeta) {
|
||||
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
|
||||
}
|
||||
|
||||
taosArrayPush(newTbUids, &createTbReq.uid);
|
||||
}
|
||||
taosArrayPush(newTbUids, &createTbReq.uid);
|
||||
|
||||
submitBlkRsp.uid = createTbReq.uid;
|
||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||
|
@ -917,6 +919,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
submitRsp.affectedRows += submitBlkRsp.affectedRows;
|
||||
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(newTbUids) > 0) {
|
||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids));
|
||||
}
|
||||
|
||||
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -265,6 +265,15 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
|
||||
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
|
||||
|
||||
if (isAdd) {
|
||||
qDebug("add %d tables id into query list, %s", (int32_t) taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
||||
}
|
||||
|
||||
if (pListInfo->map == NULL) {
|
||||
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
// traverse to the stream scanner node to add this table id
|
||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||
|
@ -311,13 +320,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||
if (pTaskInfo->tableqinfoList.map == NULL) {
|
||||
pTaskInfo->tableqinfoList.map =
|
||||
taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
bool exists = false;
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) {
|
||||
STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k);
|
||||
if (pKeyInfo->uid == keyInfo.uid) {
|
||||
qWarn("ignore duplicated query table uid:%" PRIu64 " added, %s", pKeyInfo->uid, pTaskInfo->id.str);
|
||||
exists = true;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
if (!exists) {
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
}
|
||||
}
|
||||
|
||||
if (keyBuf != NULL) {
|
||||
|
@ -480,6 +495,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
|
|||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = ret;
|
||||
cleanUpUdfs();
|
||||
|
||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
|
||||
|
@ -512,8 +528,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
|
|||
}
|
||||
|
||||
cleanUpUdfs();
|
||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||
|
||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||
qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
|
||||
|
||||
|
|
|
@ -617,19 +617,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// if scan table by table
|
||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||
if (pInfo->noTable) return NULL;
|
||||
if (pInfo->noTable) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||
if (result) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// if no data, switch to next table and continue scan
|
||||
pInfo->currentTable++;
|
||||
if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) {
|
||||
if (pInfo->currentTable >= numOfTables) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
||||
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
||||
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str);
|
||||
|
||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||
pInfo->scanTimes = 0;
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
SQWorkerMgmt gQwMgmt = {
|
||||
.lock = 0,
|
||||
|
@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = {
|
|||
.qwNum = 0,
|
||||
};
|
||||
|
||||
static void freeBlock(void* param) {
|
||||
SSDataBlock* pBlock = *(SSDataBlock**)param;
|
||||
blockDataDestroy(pBlock);
|
||||
}
|
||||
|
||||
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t code = 0;
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
|
@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
|||
// if *taskHandle is NULL, it's killed right now
|
||||
if (taskHandle) {
|
||||
qwDbgSimulateSleep();
|
||||
|
||||
code = qExecTaskOpt(taskHandle, pResList, &useconds);
|
||||
if (code) {
|
||||
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
|
||||
|
@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
|||
}
|
||||
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(pResList);
|
||||
taosArrayDestroyEx(pResList, freeBlock);
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
|
@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
||||
if (NULL == sch->hbConnInfo.handle) {
|
||||
SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
|
||||
if (NULL == sch1->hbConnInfo.handle) {
|
||||
uint64_t *sId = taosHashGetKey(pIter, NULL);
|
||||
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
|
||||
|
||||
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
|
||||
taosHashGetSize(sch->tasksHash) <= 0) {
|
||||
if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
|
||||
taosHashGetSize(sch1->tasksHash) <= 0) {
|
||||
taosArrayPush(pExpiredSch, sId);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue