Merge branch 'fix/TD-30837' of https://github.com/taosdata/TDengine into fix/TD-30837
This commit is contained in:
commit
270e54b404
|
@ -113,7 +113,7 @@ enum {
|
|||
|
||||
enum {
|
||||
TASK_TRIGGER_STATUS__INACTIVE = 1,
|
||||
TASK_TRIGGER_STATUS__ACTIVE,
|
||||
TASK_TRIGGER_STATUS__MAY_ACTIVE,
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
|
@ -294,9 +294,10 @@ typedef struct SStreamStatus {
|
|||
int32_t timerActive; // timer is active
|
||||
int64_t lastExecTs; // last exec time stamp
|
||||
int32_t inScanHistorySentinel;
|
||||
bool appendTranstateBlock; // has append the transfer state data block already
|
||||
bool appendTranstateBlock; // has appended the transfer state data block already
|
||||
bool removeBackendFiles; // remove backend files on disk when free stream tasks
|
||||
SConsenChkptInfo consenChkptInfo;
|
||||
STimeWindow latestForceWindow; // latest generated time window, only valid in
|
||||
} SStreamStatus;
|
||||
|
||||
typedef struct SDataRange {
|
||||
|
|
|
@ -238,7 +238,9 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
|
|||
int64_t checkpointId, SRpcMsg* pMsg);
|
||||
|
||||
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
|
||||
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger);
|
||||
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger);
|
||||
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
|
||||
STimeWindow* pLatestWindow, const char* id);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -749,7 +749,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
|
||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||
|
||||
// not record the failed of the current task if try to close current vnode
|
||||
// not record the failure of the current task if try to close current vnode
|
||||
// otherwise, the put of message operation may incur invalid read of message queue.
|
||||
if (!pMeta->closeFlag) {
|
||||
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||
|
|
|
@ -307,8 +307,9 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger) {
|
||||
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id) {
|
||||
QRY_PARAM_CHECK(pTrigger);
|
||||
int64_t ts = INT64_MIN;
|
||||
SStreamTrigger* p = NULL;
|
||||
|
||||
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
|
||||
|
@ -324,22 +325,49 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp
|
|||
}
|
||||
|
||||
// let's calculate the previous time window
|
||||
// todo get the time precision for ts
|
||||
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||
SInterval interval = {.interval = trigger, .sliding = trigger, .intervalUnit = 'a', .slidingUnit = 'a'};
|
||||
int64_t now = taosGetTimestampMs();
|
||||
SInterval interval = {.interval = trigger,
|
||||
.sliding = trigger,
|
||||
.intervalUnit = pInterval->intervalUnit,
|
||||
.slidingUnit = pInterval->slidingUnit};
|
||||
|
||||
ts = taosGetTimestampMs();
|
||||
|
||||
if (pLatestWindow->skey == INT64_MIN) {
|
||||
STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger);
|
||||
|
||||
STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger);
|
||||
p->pBlock->info.window.skey = window.skey;
|
||||
p->pBlock->info.window.ekey = TMAX(now, window.ekey);
|
||||
p->pBlock->info.type = STREAM_GET_RESULT;
|
||||
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64,
|
||||
p->pBlock->info.window.skey, p->pBlock->info.window.ekey);
|
||||
p->pBlock->info.window.ekey = TMAX(ts, window.ekey);
|
||||
} else {
|
||||
p->pBlock->info.type = STREAM_GET_ALL;
|
||||
int64_t skey = pLatestWindow->skey + trigger;
|
||||
p->pBlock->info.window.skey = skey;
|
||||
p->pBlock->info.window.ekey = TMAX(ts, skey + trigger);
|
||||
}
|
||||
|
||||
*pTrigger = p;
|
||||
p->pBlock->info.type = STREAM_GET_RESULT;
|
||||
stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id,
|
||||
p->pBlock->info.window.skey, p->pBlock->info.window.ekey);
|
||||
|
||||
*pTrigger = p;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger) {
|
||||
QRY_PARAM_CHECK(pTrigger);
|
||||
SStreamTrigger* p = NULL;
|
||||
|
||||
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
p->type = STREAM_INPUT__GET_RES;
|
||||
p->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (p->pBlock == NULL) {
|
||||
taosFreeQitem(p);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
p->pBlock->info.type = STREAM_GET_ALL;
|
||||
*pTrigger = p;
|
||||
return code;
|
||||
}
|
|
@ -352,7 +352,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
|
||||
(pTask->info.delaySchedParam != 0)) {
|
||||
(void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
|
||||
TASK_TRIGGER_STATUS__ACTIVE);
|
||||
TASK_TRIGGER_STATUS__MAY_ACTIVE);
|
||||
stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
|
||||
pTask->schedInfo.status);
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
return;
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = lastTimeWindow;
|
||||
pTask->info.delaySchedParam = interval.sliding;
|
||||
pTask->info.watermark = waterMark;
|
||||
pTask->info.interval = interval;
|
||||
|
@ -156,27 +157,71 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
const char* id = pTask->id.idStr;
|
||||
int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
|
||||
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
||||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||
|
||||
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
stDebug("s-task:%s should stop, jump out of schedTimer", id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamTaskShouldPause(pTask)) {
|
||||
stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000);
|
||||
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
|
||||
"sched-run-tmr");
|
||||
}
|
||||
|
||||
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
|
||||
} else {
|
||||
if ((status == TASK_TRIGGER_STATUS__ACTIVE) ||
|
||||
(pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
|
||||
SStreamTrigger* pTrigger;
|
||||
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
|
||||
int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam);
|
||||
while (1) {
|
||||
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
|
||||
&pTask->status.latestForceWindow, id);
|
||||
if (code != 0) {
|
||||
stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id,
|
||||
tstrerror(code), nextTrigger);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// in the force window close model, status trigger does not matter. So we do not set the trigger model
|
||||
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// check whether the time window gaps exist or not
|
||||
int64_t now = taosGetTimestamp(pTask->info.interval.precision);
|
||||
int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
|
||||
|
||||
// there are gaps, needs to be filled
|
||||
STimeWindow w = pTrigger->pBlock->info.window;
|
||||
w.ekey = w.skey + pTask->info.interval.interval;
|
||||
if (w.skey <= pTask->status.latestForceWindow.skey) {
|
||||
stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64
|
||||
" should be greater than latestForceWindow skey:%" PRId64,
|
||||
pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey);
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = w;
|
||||
if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) {
|
||||
break;
|
||||
} else {
|
||||
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
|
||||
}
|
||||
}
|
||||
|
||||
} else if (status == TASK_TRIGGER_STATUS__MAY_ACTIVE) {
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
code = streamCreateSinkResTrigger(&pTrigger);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code),
|
||||
nextTrigger);
|
||||
terrno = code;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
|
@ -187,11 +232,11 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
code = streamTrySchedExec(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
|
||||
}
|
||||
code = streamTrySchedExec(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 2;
|
||||
sql select * from information_schema.ins_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use test
|
||||
sql create stable st(ts timestamp, a int) tags(t int);
|
||||
sql create table tu1 using st tags(1);
|
||||
|
||||
sql create stream stream1 trigger force_window_close into str_dst as select _wstart, count(*) from st partition by tbname interval(5s);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into tu1 values(now, 1);
|
||||
sleep 5500
|
||||
|
||||
sql pause stream stream1
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
sleep 500
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
goto end_loop
|
||||
endi
|
||||
|
||||
sql insert into tu1 values(now, 1);
|
||||
goto loop0
|
||||
|
||||
end_loop:
|
||||
|
||||
sql resume stream stream1
|
||||
sql select * from str_dst
|
||||
|
||||
if $rows != 3 then
|
||||
print expect 3, actual: $rows
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -111,7 +111,7 @@ class TDTestCase:
|
|||
if partition:
|
||||
tdLog.info("create stream with partition by tag and tbname ")
|
||||
partition_elm_new = f'partition by {partition}, t1'
|
||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, t1 as t_t1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||
partition_elm_new = f'partition by {partition}, c1'
|
||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column1{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c1 as c_c1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||
partition_elm_new = f'partition by {partition}, c2'
|
||||
|
@ -125,6 +125,7 @@ class TDTestCase:
|
|||
tdLog.info("create stream general table")
|
||||
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.tb_name} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||
|
||||
# insert data
|
||||
self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0]
|
||||
start_time = self.tdCom.date_time
|
||||
time.sleep(1)
|
||||
|
@ -224,9 +225,13 @@ class TDTestCase:
|
|||
|
||||
# get query time range using interval count windows
|
||||
tdSql.query(f'select _wstart, _wend ,last(ts) from {self.stb_name} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
|
||||
# getData don't support negative index
|
||||
end_new_ts = tdSql.getData(tdSql.queryRows-1, 1)
|
||||
end_last_but_one_ts = tdSql.getData(tdSql.queryRows-2, 1)
|
||||
#source data include that fill valuse is null and "_isfilled" column of the stream output is false
|
||||
tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_new_ts}\",-102) ")
|
||||
tdSql.execute(f"insert into {self.tb_name} (ts,c1) values(\"{end_new_ts}\",-51) ")
|
||||
tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_last_but_one_ts}\",NULL) ")
|
||||
|
||||
for i in range(self.tdCom.range_count):
|
||||
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
|
||||
|
@ -240,6 +245,7 @@ class TDTestCase:
|
|||
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
|
||||
|
||||
# wait for the stream to process the data
|
||||
# print(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
||||
time.sleep(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
||||
|
||||
# check the data
|
||||
|
@ -247,11 +253,13 @@ class TDTestCase:
|
|||
print(tbname)
|
||||
tdSql.query(f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
|
||||
start_new_ts = tdSql.getData(0, 1)
|
||||
if tbname != self.tb_name:
|
||||
if tbname == self.ctb_name:
|
||||
if "value" in fill_value.lower():
|
||||
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
|
||||
if partition == "tbname":
|
||||
# print(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
||||
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||
elif tbname == self.stb_name:
|
||||
if partition == "tbname":
|
||||
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||
# check tag and tbname filter
|
||||
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||
|
@ -262,6 +270,18 @@ class TDTestCase:
|
|||
if partition == "tbname":
|
||||
self.tdCom.check_query_data(f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||
|
||||
# Recreate a sub-table that meets the filtering "where_tag" and check if the streaming results are automatically included within it."
|
||||
where_tag_ctbname = f"{self.ctb_name}_where_tag"
|
||||
tdSql.execute(f"create table {where_tag_ctbname} using {self.stb_name} (t1) tags({tag_t1_value}) ")
|
||||
where_tag_timestamp = self.tdCom.genTs(precision=self.tdCom.precision)[0]
|
||||
where_tag_ts_start_value = str(where_tag_timestamp)+ '+2s'
|
||||
self.tdCom.sinsert_rows(tbname=where_tag_ctbname, ts_value=where_tag_ts_start_value)
|
||||
time.sleep(self.tdCom.dataDict["interval"]+5)
|
||||
tdSql.query(f'select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name="{where_tag_ctbname}"')
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0], where_tag_ctbname)
|
||||
|
||||
|
||||
|
||||
if self.delete:
|
||||
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
|
||||
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
|
||||
|
|
Loading…
Reference in New Issue