Merge branch 'mark/tmq' of https://github.com/taosdata/TDengine into mark/tmq

This commit is contained in:
wangmm0220 2023-08-09 15:47:42 +08:00
commit 1bf2d58629
23 changed files with 250 additions and 315 deletions

View File

@ -98,7 +98,7 @@ Provides information about user-created databases. Similar to SHOW DATABASES.
| 21 | cachesize | INT | Memory per vnode used for caching the newest data. It should be noted that `cachesize` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 21 | cachesize | INT | Memory per vnode used for caching the newest data. It should be noted that `cachesize` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 22 | wal_level | INT | WAL level. It should be noted that `wal_level` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 22 | wal_level | INT | WAL level. It should be noted that `wal_level` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 23 | wal_fsync_period | INT | Interval at which WAL is written to disk. It should be noted that `wal_fsync_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 23 | wal_fsync_period | INT | Interval at which WAL is written to disk. It should be noted that `wal_fsync_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 24 | wal_retention_period | INT | WAL retention period. It should be noted that `wal_retention_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 24 | wal_retention_period | INT | WAL retention period, in second. It should be noted that `wal_retention_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 25 | wal_retention_size | INT | Maximum WAL size. It should be noted that `wal_retention_size` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 25 | wal_retention_size | INT | Maximum WAL size. It should be noted that `wal_retention_size` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 26 | stt_trigger | SMALLINT | The threshold for number of files to trigger file merging. It should be noted that `stt_trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 26 | stt_trigger | SMALLINT | The threshold for number of files to trigger file merging. It should be noted that `stt_trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 27 | table_prefix | SMALLINT | The prefix length in the table name that is ignored when distributing table to vnode based on table name. It should be noted that `table_prefix` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 27 | table_prefix | SMALLINT | The prefix length in the table name that is ignored when distributing table to vnode based on table name. It should be noted that `table_prefix` is a TDengine keyword and needs to be escaped with ` when used as a column name. |

View File

@ -79,6 +79,12 @@ Parameter Description:
- tz: Optional parameter that specifies the timezone of the returned time, following the IANA Time Zone rules, e.g. `America/New_York`. - tz: Optional parameter that specifies the timezone of the returned time, following the IANA Time Zone rules, e.g. `America/New_York`.
- req_id: Optional parameter that specifies the request id for tracing. - req_id: Optional parameter that specifies the request id for tracing.
:::note
URL Encoding. Make sure that parameters are properly encoded. For example, when specifying a timezone you must properly encode special characters. ?tz=Etc/GMT+10 will not work because the <+> plus symbol is recognized as a space in the url. It's best practice to encode all special characters in a parameter. Instead use ?tz=Etc%2FGMT%2B10 for the parameter.
:::
For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`. For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`.
TDengine supports both Basic authentication and custom authentication mechanisms, and subsequent versions will provide a standard secure digital signature mechanism for authentication. TDengine supports both Basic authentication and custom authentication mechanisms, and subsequent versions will provide a standard secure digital signature mechanism for authentication.

View File

@ -31,7 +31,8 @@ Websocket connections are supported on all platforms that can run Go.
| connector-rust version | TDengine version | major features | | connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: | | :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. | | v0.9.2 | 3.0.7.0 or later | STMT: Get tag_fields and col_fields under ws. |
| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. | | v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. | | v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. | | v0.6.0 | 3.0.0.0 | Base features. |

View File

@ -338,7 +338,7 @@ Remark:
Equivalent function: sum Equivalent function: sum
```sql ```sql
Select max(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) Select sum(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s)
``` ```
Note: This function has no interpolation requirements, so it can be directly calculated. Note: This function has no interpolation requirements, so it can be directly calculated.

View File

@ -30,7 +30,8 @@ Websocket 连接支持所有能运行 Rust 的平台。
| Rust 连接器版本 | TDengine 版本 | 主要功能 | | Rust 连接器版本 | TDengine 版本 | 主要功能 |
| :----------------: | :--------------: | :--------------------------------------------------: | | :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | | v0.9.2 | 3.0.7.0 or later | STMTws 下获取 tag_fields、col_fields。 |
| v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.0 | 3.0.4.0 | 支持无模式写入。 | | v0.8.0 | 3.0.4.0 | 支持无模式写入。 |
| v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 | | v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 |
| v0.6.0 | 3.0.0.0 | 基础功能。 | | v0.6.0 | 3.0.0.0 | 基础功能。 |

View File

@ -98,7 +98,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 21 | cachesize | INT | 表示每个 vnode 中用于缓存子表最近数据的内存大小。需要注意,`cachesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 21 | cachesize | INT | 表示每个 vnode 中用于缓存子表最近数据的内存大小。需要注意,`cachesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 22 | wal_level | INT | WAL 级别。需要注意,`wal_level` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 22 | wal_level | INT | WAL 级别。需要注意,`wal_level` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 23 | wal_fsync_period | INT | 数据落盘周期。需要注意,`wal_fsync_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 23 | wal_fsync_period | INT | 数据落盘周期。需要注意,`wal_fsync_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 24 | wal_retention_period | INT | WAL 的保存时长。需要注意,`wal_retention_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 24 | wal_retention_period | INT | WAL 的保存时长,单位为秒。需要注意,`wal_retention_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 25 | wal_retention_size | INT | WAL 的保存上限。需要注意,`wal_retention_size` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 25 | wal_retention_size | INT | WAL 的保存上限。需要注意,`wal_retention_size` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 26 | stt_trigger | SMALLINT | 触发文件合并的落盘文件的个数。需要注意,`stt_trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 26 | stt_trigger | SMALLINT | 触发文件合并的落盘文件的个数。需要注意,`stt_trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 27 | table_prefix | SMALLINT | 内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。需要注意,`table_prefix` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 27 | table_prefix | SMALLINT | 内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。需要注意,`table_prefix` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |

View File

@ -371,7 +371,7 @@ Select min(val) from table_name
等效函数sum 等效函数sum
```sql ```sql
Select max(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) Select sum(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s)
``` ```
备注:该函数无插值需求,因此可用直接计算。 备注:该函数无插值需求,因此可用直接计算。

View File

@ -45,7 +45,6 @@ enum {
TASK_STATUS__FAIL, TASK_STATUS__FAIL,
TASK_STATUS__STOP, TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
}; };

View File

@ -1251,7 +1251,8 @@ TEST(clientCase, td_25129) {
} }
for(int i = 0; i < numOfAssign; i++){ for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("assign i:%d, vgId:%d, committed:%lld, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, committed, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
} }
while (1) { while (1) {

View File

@ -42,7 +42,7 @@
static SDnode globalDnode = {0}; static SDnode globalDnode = {0};
static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS", static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS",
"FreeBSD", "openSUSE", "SLES", "Fedora", "MacOS"}; "FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"};
SDnode *dmInstance() { return &globalDnode; } SDnode *dmInstance() { return &globalDnode; }

View File

@ -692,6 +692,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosArrayDestroy(rebOutput.modifyConsumers); taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
taosHashCancelIterate(pReq->rebSubHash, pIter);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mInfo("mq re-balance failed, due to out of memory"); mInfo("mq re-balance failed, due to out of memory");
taosHashCleanup(pReq->rebSubHash); taosHashCleanup(pReq->rebSubHash);

View File

@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64

View File

@ -336,10 +336,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
STqOffset* pOffset = &vgOffset.offset; STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) { if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts); pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
} else if (pOffset->val.type == TMQ_OFFSET__LOG) { } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
pOffset->val.version); pOffset->val.version);
} else { } else {
tqError("invalid commit offset type:%d", pOffset->val.type); tqError("invalid commit offset type:%d", pOffset->val.type);
@ -367,12 +367,12 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
SRpcMsg rsp = {.info = pMsg->info}; SRpcMsg rsp = {.info = pMsg->info};
int code = 0; int code = 0;
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
@ -1049,9 +1049,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
@ -1153,7 +1153,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 3. It's an fill history task, do nothing. wait for the main task to start it // 3. It's an fill history task, do nothing. wait for the main task to start it
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) { // reset the downstreamReady flag. if (p != NULL) { // reset the downstreamReady flag.
p->status.downstreamReady = 0;
streamTaskCheckDownstreamTasks(p); streamTaskCheckDownstreamTasks(p);
} }
@ -1162,12 +1161,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
char* msg = pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
@ -1175,12 +1172,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
// do recovery step 1 // do recovery step1
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
int64_t st = taosGetTimestampMs(); if (pTask->tsInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
pTask->tsInfo.step1Start = taosGetTimestampMs();
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
}
// we have to continue retrying to successfully execute the scan history task. // we have to continue retrying to successfully execute the scan history task.
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
@ -1193,31 +1198,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
ASSERT(pTask->status.pauseAllowed == false);
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask); ASSERT(pTask->status.pauseAllowed == true);
} }
if (!streamTaskRecoverScanStep1Finished(pTask)) { streamSourceScanHistoryData(pTask);
streamSourceScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
} double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
// disable the pause when handling the step2 scan of tsdb data. TASK_SCHED_STATUS__INACTIVE);
// the whole next procedure cann't be stopped.
// todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
if (pTask->info.fillHistory == 1) {
streamTaskDisablePause(pTask);
}
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
double el = (taosGetTimestampMs() - st) / 1000.0; // the following procedure should be executed, no matter status is stop/pause or not
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -1225,77 +1220,71 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pStreamTask = NULL; SStreamTask* pStreamTask = NULL;
bool done = false; bool done = false;
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. get the related stream task
// 1. stop the related stream task, get the current scan wal version of stream task, ver. pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) {
if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr); pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING; tqDebug("s-task:%s fill-history task set status to be dropping", id);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask); streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
// wait for the stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100);
}
// now we can stop the stream task execution
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
// stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100);
}
// now we can stop the stream task execution
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
if (done) { if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
if (!streamTaskRecoverScanStep1Finished(pTask)) { STimeWindow* pWindow = &pTask->dataRange.window;
STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 ", do secondary scan-history from WAL after halt the related stream task:%s",
", do secondary scan-history from WAL after halt the related stream task:%s", id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
pTask->tsInfo.step2Start = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
}
if (!streamTaskRecoverScanStep2Finished(pTask)) { int64_t dstVer = pTask->dataRange.range.minVer - 1;
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
int64_t dstVer = pTask->dataRange.range.minVer - 1; pTask->chkInfo.currentVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
pTask->chkInfo.currentVer = dstVer; tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer,
TASK_SCHED_STATUS__INACTIVE);
}
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// set the fill-history task to be normal
if (pTask->info.fillHistory == 1) {
streamSetStatusNormal(pTask);
}
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task. // 5. resume the related stream task.
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
@ -1312,7 +1301,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug( tqDebug(
"s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64, "window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey); id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor); qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
@ -1508,7 +1497,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) { if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
@ -1645,7 +1634,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} }
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) { if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartRecoverTask(pTask, igUntreated); streamStartRecoverTask(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);

View File

@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, set the transfer state flag", ", not scan wal anymore, set the transfer state flag",
pTask->id.idStr, ver, pTask->dataRange.range.maxVer); pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
pTask->status.transferState = true; pTask->status.transferState = true;
@ -251,19 +251,19 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t status = pTask->status.taskStatus; int32_t status = pTask->status.taskStatus;
// non-source or fill-history tasks don't need to response the WAL scan action. // non-source or fill-history tasks don't need to response the WAL scan action.
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { if (status != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has reached already, the step2 is done // the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer); pTask->dataRange.range.maxVer);

View File

@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
return code; return code;
} }
/*
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayPush(pLastArray, &lastCol);
}
return code;
}
*/
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0; int32_t code = 0;
// fetch schema // fetch schema
@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->lruMutex);
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksMayWrite(pTsdb, true, false, false); rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]); rocksdb_free(values_list[i + num_keys]);
taosThreadMutexLock(&pTsdb->lruMutex); // taosThreadMutexLock(&pTsdb->lruMutex);
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) { if (h) {
@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
} }
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
taosThreadMutexUnlock(&pTsdb->lruMutex); // taosThreadMutexUnlock(&pTsdb->lruMutex);
} }
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]); taosMemoryFree(keys_list[i]);
@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite(pTsdb, true, false, true); rocksMayWrite(pTsdb, true, false, true);
taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit: _exit:
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return code; return code;
} }
/*
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
*/
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) { int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
int32_t code = 0; int32_t code = 0;
STSRow *cacheRow = NULL; STSRow *cacheRow = NULL;
@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
} }
if (record.version <= pReader->info.verRange.maxVer) { if (record.version <= pReader->info.verRange.maxVer) {
/*
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
*/
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData); taosArrayPush(pInfo->pTombData, &delData);
} }
@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err; goto _err;
} }
loadDataTomb(state->pr, state->pr->pFileReader);
state->pr->pCurFileSet = state->pFileSet; state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader);
} }
if (!state->pIndexList) { if (!state->pIndexList) {
@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->iBrinIndex = indexSize; state->iBrinIndex = indexSize;
} }
if (state->pFileSet != state->pr->pCurFileSet) {
state->pr->pCurFileSet = state->pFileSet;
}
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid, code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols); state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -624,6 +624,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg); // return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO: case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_COMMITTEDINFO:
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;

View File

@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return NULL; return NULL;
} }
qResetStreamInfoTimeWindow(pTaskInfo);
return pTaskInfo; return pTaskInfo;
} }

View File

@ -1550,10 +1550,86 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
} }
} }
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) { static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
ASSERT(pCol->pData != NULL);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
} else if (pWindow->ekey != INT64_MAX) {
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts <= pWindow->ekey);
if (!p[i]) {
hasUnqualified = true;
}
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp; SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
const char* id = GET_TASKID(pTaskInfo);
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
@ -1593,7 +1669,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); pBlockInfo->rows, id, &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure. // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
@ -1608,8 +1684,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
} }
// filter the block extracted from WAL files, according to the time window apply additional time window filter
doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
pInfo->pRes->info.dataLoad = 1; pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pInfo->pRes->info.rows == 0) {
return 0;
}
calBlockTbName(pInfo, pInfo->pRes); calBlockTbName(pInfo, pInfo->pRes);
return 0; return 0;
@ -1666,7 +1748,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, pRes, true); STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} }
@ -1775,80 +1858,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
} }
} }
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
} else if (pWindow->ekey != INT64_MAX) {
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts <= pWindow->ekey);
if (!p[i]) {
hasUnqualified = true;
}
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2121,8 +2130,7 @@ FETCH_NEXT_BLOCK:
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
SSDataBlock* pBlock = pInfo->pRes; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SDataBlockInfo* pBlockInfo = &pBlock->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
NEXT_SUBMIT_BLK: NEXT_SUBMIT_BLK:
@ -2146,21 +2154,23 @@ FETCH_NEXT_BLOCK:
} }
} }
blockDataCleanup(pBlock); blockDataCleanup(pInfo->pRes);
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);
id);
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
qDebug("retrieve data failed, try next block in submit block, %s", id); qDebug("retrieve data failed, try next block in submit block, %s", id);
continue; continue;
} }
setBlockIntoRes(pInfo, pRes, false); setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
if (pInfo->pRes->info.rows == 0) {
continue;
}
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
@ -2168,13 +2178,8 @@ FETCH_NEXT_BLOCK:
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
// apply additional time window filter doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
@ -2196,7 +2201,7 @@ FETCH_NEXT_BLOCK:
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id); qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pBlock; return pInfo->pRes;
} }
if (pInfo->pUpdateDataRes->info.rows > 0) { if (pInfo->pUpdateDataRes->info.rows > 0) {
@ -2587,7 +2592,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
pInfo->igExpired = pTableScanNode->igExpired; pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN; pInfo->twAggSup.maxTs = INT64_MIN;
pInfo->pState = NULL; pInfo->pState = pTaskInfo->streamInfo.pState;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;

View File

@ -3736,7 +3736,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) { if (winNum > 0) {
saveSessionOutputBuf(pAggSup, &winInfo);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(winInfo, pInfo->pStUpdated); saveResult(winInfo, pInfo->pStUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -3747,9 +3746,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
getSessionHashKey(&winInfo.sessionWin, &key); getSessionHashKey(&winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
} }
} else {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore);
} }
saveSessionOutputBuf(pAggSup, &winInfo);
} }
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
@ -4398,7 +4396,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated); compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(curInfo.winInfo, pInfo->pSeUpdated); saveResult(curInfo.winInfo, pInfo->pSeUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -4412,7 +4409,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
} }
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
} }
if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {

View File

@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1; return -1;
} }
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*) pItem); destroyStreamDataBlock((SStreamDataBlock*) pItem);

View File

@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
bool finished = false; bool finished = false;
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// save to disk // save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) { if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed // fill-history WAL scan has completed
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
streamTaskRecoverSetAllStepFinished(pTask); streamTaskRecoverSetAllStepFinished(pTask);
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
} else { } else {

View File

@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
} }
streamTaskEnablePause(pTask);
streamTaskScanHistoryPrepare(pTask); streamTaskScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
@ -839,7 +840,7 @@ void streamTaskPause(SStreamTask* pTask) {
return; return;
} }
while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus; status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) { if (status == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100); taosMsleep(100);
} }
// todo: use the lock of the task.
taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
taosWUnLockLatch(&pMeta->lock);
qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,

View File

@ -450,7 +450,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 ,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 #,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5