Merge branch 'fix/TD-30837' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
chenhaoran 2024-10-28 18:55:23 +08:00
commit 25350fd338
11 changed files with 118 additions and 29 deletions

View File

@ -113,7 +113,7 @@ enum {
enum {
TASK_TRIGGER_STATUS__INACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE,
TASK_TRIGGER_STATUS__MAY_ACTIVE,
};
typedef enum {
@ -294,9 +294,10 @@ typedef struct SStreamStatus {
int32_t timerActive; // timer is active
int64_t lastExecTs; // last exec time stamp
int32_t inScanHistorySentinel;
bool appendTranstateBlock; // has append the transfer state data block already
bool appendTranstateBlock; // has appended the transfer state data block already
bool removeBackendFiles; // remove backend files on disk when free stream tasks
SConsenChkptInfo consenChkptInfo;
STimeWindow latestForceWindow; // latest generated time window, only valid in
} SStreamStatus;
typedef struct SDataRange {

View File

@ -1099,6 +1099,7 @@ static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterM
SStreamScanInfo* pScanOp = (SStreamScanInfo*) pOperator->info;
*pWaterMark = pScanOp->twAggSup.waterMark;
*pInterval = pScanOp->interval;
*pLastWindow = pScanOp->lastScanRange;
return TSDB_CODE_SUCCESS;
}

View File

@ -4583,6 +4583,8 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
SET_WIN_KEY_INVALID(pInfo->lastScanRange.skey);
SET_WIN_KEY_INVALID(pInfo->lastScanRange.ekey);
// for stream
if (pTaskInfo->streamInfo.pState) {
void* buff = NULL;

View File

@ -1315,8 +1315,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
pkLen = colDataGetRowLength(pPkColDataInfo, startPos);
}
if (pInfo->ignoreExpiredData && checkExpiredData(&pAggSup->stateStore, pAggSup->pUpdateInfo, &pInfo->twAggSup,
pBlock->info.id.uid, tsCols[startPos], pPkVal, pkLen)) {
if (pInfo->twAggSup.calTrigger != STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pInfo->ignoreExpiredData &&
checkExpiredData(&pAggSup->stateStore, pAggSup->pUpdateInfo, &pInfo->twAggSup, pBlock->info.id.uid,
tsCols[startPos], pPkVal, pkLen)) {
qDebug("===stream===ignore expired data, window end ts:%" PRId64 ", maxts - wartermak:%" PRId64, tsCols[startPos],
pInfo->twAggSup.maxTs - pInfo->twAggSup.waterMark);
continue;

View File

@ -11008,6 +11008,13 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
}
}
}
if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta &&
TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasTbnameFunction(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Super table must patitioned by table name");
}
}
if (NULL != pSelect->pGroupByList) {

View File

@ -238,7 +238,9 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
int64_t checkpointId, SRpcMsg* pMsg);
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger);
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger);
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
STimeWindow* pLatestWindow);
#ifdef __cplusplus
}

View File

@ -749,7 +749,7 @@ void rspMonitorFn(void* param, void* tmrId) {
streamTaskCompleteCheckRsp(pInfo, true, id);
// not record the failed of the current task if try to close current vnode
// not record the failure of the current task if try to close current vnode
// otherwise, the put of message operation may incur invalid read of message queue.
if (!pMeta->closeFlag) {
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);

View File

@ -307,8 +307,9 @@ void streamFreeQitem(SStreamQueueItem* data) {
}
}
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger) {
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow) {
QRY_PARAM_CHECK(pTrigger);
int64_t ts = INT64_MIN;
SStreamTrigger* p = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
@ -324,22 +325,49 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp
}
// let's calculate the previous time window
// todo get the time precision for ts
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
SInterval interval = {.interval = trigger, .sliding = trigger, .intervalUnit = 'a', .slidingUnit = 'a'};
int64_t now = taosGetTimestampMs();
SInterval interval = {.interval = trigger,
.sliding = trigger,
.intervalUnit = pInterval->intervalUnit,
.slidingUnit = pInterval->slidingUnit};
ts = taosGetTimestampMs();
if (pLatestWindow->skey == INT64_MIN) {
STimeWindow window = getAlignQueryTimeWindow(&interval, ts);
STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger);
p->pBlock->info.window.skey = window.skey;
p->pBlock->info.window.ekey = TMAX(now, window.ekey);
p->pBlock->info.type = STREAM_GET_RESULT;
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64,
p->pBlock->info.window.skey, p->pBlock->info.window.ekey);
p->pBlock->info.window.ekey = TMAX(ts, window.ekey);
} else {
p->pBlock->info.type = STREAM_GET_ALL;
int64_t skey = pLatestWindow->skey + trigger;
p->pBlock->info.window.skey = skey;
p->pBlock->info.window.ekey = TMAX(ts, skey + trigger);
}
*pTrigger = p;
p->pBlock->info.type = STREAM_GET_RESULT;
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, p->pBlock->info.window.skey,
p->pBlock->info.window.ekey);
*pTrigger = p;
return code;
}
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger) {
QRY_PARAM_CHECK(pTrigger);
SStreamTrigger* p = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
if (code) {
return code;
}
p->type = STREAM_INPUT__GET_RES;
p->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (p->pBlock == NULL) {
taosFreeQitem(p);
return terrno;
}
p->pBlock->info.type = STREAM_GET_ALL;
*pTrigger = p;
return code;
}

View File

@ -352,7 +352,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
(pTask->info.delaySchedParam != 0)) {
(void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
TASK_TRIGGER_STATUS__ACTIVE);
TASK_TRIGGER_STATUS__MAY_ACTIVE);
stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
pTask->schedInfo.status);
}

View File

@ -40,6 +40,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
return;
}
pTask->status.latestForceWindow = lastTimeWindow;
pTask->info.delaySchedParam = interval.sliding;
pTask->info.watermark = waterMark;
pTask->info.interval = interval;
@ -156,27 +157,71 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
const char* id = pTask->id.idStr;
int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam;
int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;
int8_t status = atomic_load_8(&pTask->schedInfo.status);
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, jump out of schedTimer", id);
return;
}
if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000);
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
}
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
} else {
if ((status == TASK_TRIGGER_STATUS__ACTIVE) ||
(pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
SStreamTrigger* pTrigger;
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTrigger* pTrigger = NULL;
int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam);
while (1) {
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
&pTask->status.latestForceWindow);
if (code != 0) {
stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id,
tstrerror(code), nextTrigger);
goto _end;
}
// in the force window close model, status trigger does not matter. So we do not set the trigger model
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
goto _end;
}
// check whether the time window gaps exist or not
int64_t now = taosGetTimestamp(pTask->info.interval.precision);
int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
// there are gaps, needs to be filled
STimeWindow w = pTrigger->pBlock->info.window;
w.ekey = w.skey + pTask->info.interval.interval;
if (w.skey <= pTask->status.latestForceWindow.skey) {
stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64
" should be greater than latestForceWindow skey:%" PRId64,
pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey);
}
pTask->status.latestForceWindow = w;
if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) {
break;
} else {
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
}
}
} else if (status == TASK_TRIGGER_STATUS__MAY_ACTIVE) {
SStreamTrigger* pTrigger = NULL;
code = streamCreateSinkResTrigger(&pTrigger);
if (code) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code),
nextTrigger);
terrno = code;
goto _end;
}
@ -187,11 +232,11 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
goto _end;
}
}
code = streamTrySchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
}
code = streamTrySchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
}
}

View File

@ -29,6 +29,8 @@ sql_error create stream streams9 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE
sql_error create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st partition by tbname,ta interval(2s) SLIDING(1s);
sql create stream streams11 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 as select _wstart, avg(a) from st partition by tbname,ta interval(2s) SLIDING(2s);
sql_error create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st interval(2s);
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT