fix(stream): fix error in finishing event handling.
commit 0b18192b23
parent 762cfef498

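The functional fix is in streamTaskOnHandleEventSuccess(): when a finished transition has autoInvokeEndFn set, the chained completion is now invoked with the attached transition's own event (pNextTrans->event) instead of the event that triggered the current call, so the attached event is the one marked as handled. The remaining changes are renames and cleanup: STaskStartInfo.startedAfterNodeUpdate becomes startAllTasksFlag, tqCheckAndRunStreamTaskAsync() becomes tqLaunchStreamTaskAsync(), the static updateTaskReadyInMeta() becomes the public streamMetaUpdateTaskReadyInfo(), doCheckDownstreamStatus() is renamed to streamTaskCheckDownstream() and the old wrapper is dropped, doProcessDownstreamReadyRsp() is rewritten to derive the init event from the task's fill-history relationship, the unused streamMetaGetNumOfStreamTasks() is removed, and two stream test scripts gain extra sleeps so tasks have time to become ready before being paused.

The snippet below is a minimal, self-contained model of the event-chaining bug, not the TDengine sources; Event, Trans, and onHandleEventSuccess are invented stand-ins for the EStreamTaskEvent/STaskStateTrans machinery:

#include <stdio.h>

typedef enum { EVT_INIT, EVT_SCANHIST_DONE, EVT_PAUSE } Event;

static const char* evtName(Event e) {
  static const char* names[] = {"INIT", "SCANHIST_DONE", "PAUSE"};
  return names[e];
}

typedef struct Trans {
  Event         event;           // event this transition was created for
  int           autoInvokeEndFn; // complete this transition automatically
  struct Trans* attached;        // transition queued while another one ran
} Trans;

// stand-in for streamTaskOnHandleEventSuccess(): completes `t`, then chains
// into the attached transition when its autoInvokeEndFn flag is set
static void onHandleEventSuccess(Trans* t, Event evt) {
  printf("event %s handled\n", evtName(evt));

  Trans* next = t->attached;
  if (next != NULL && next->autoInvokeEndFn) {
    // the buggy version recursed with `evt`, so the attached transition was
    // completed under the already-finished event; pass its own event instead
    onHandleEventSuccess(next, next->event);
  }
}

int main(void) {
  Trans pause = {EVT_PAUSE, 1, NULL};           // attached while scan ran
  Trans done  = {EVT_SCANHIST_DONE, 0, &pause};
  onHandleEventSuccess(&done, done.event);
  return 0;
}
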
@@ -432,7 +432,7 @@ struct SStreamTask {
 typedef struct STaskStartInfo {
   int64_t   startTs;
   int64_t   readyTs;
-  int32_t   startedAfterNodeUpdate;
+  int32_t   startAllTasksFlag;
   SHashObj* pReadyTaskSet;  // tasks that are all ready for running stream processing
   int32_t   elapsedTime;
 } STaskStartInfo;
@@ -735,7 +735,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask);
 int32_t onNormalTaskReady(SStreamTask* pTask);
 int32_t onScanhistoryTaskReady(SStreamTask* pTask);
 
-//int32_t streamTaskStartScanHistory(SStreamTask* pTask);
 int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
 void    streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
@@ -798,7 +797,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
 int32_t      streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
 int32_t      streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
 int32_t      streamMetaGetNumOfTasks(SStreamMeta* pMeta);
-int32_t      streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta);
 SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
 void         streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
 int32_t      streamMetaReopen(SStreamMeta* pMeta);
@@ -808,12 +806,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
 void streamMetaStartHb(SStreamMeta* pMeta);
 void streamMetaInitForSnode(SStreamMeta* pMeta);
-bool streamMetaTaskInTimer(SStreamMeta* pMeta);
+int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
 
 // checkpoint
 int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
 int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
 void    streamTaskClearCheckInfo(SStreamTask* pTask);
 
 int32_t streamAlignTransferState(SStreamTask* pTask);
 int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
 int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,

@@ -231,7 +231,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
 int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
+int32_t tqLaunchStreamTaskAsync(STQ* pTq);
 
 int     tqCommit(STQ*);
 int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

@@ -1267,12 +1267,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
             pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
 
     /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
-#if 0
-    // the fill-history task starts to process data in wal, let's set it status to be normal now
-    if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
-      streamSetStatusNormal(pTask);
-    }
-#endif
 
     // now the fill-history task starts to scan data from wal files.
     streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
@@ -1530,14 +1524,17 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
   }
 
   streamTaskResume(pTask);
+  ETaskStatus status = streamTaskGetStatus(pTask, NULL);
 
   int32_t level = pTask->info.taskLevel;
   if (level == TASK_LEVEL__SINK) {
+    if (status == TASK_STATUS__UNINIT) {
+
+    }
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);
     return 0;
   }
 
-  ETaskStatus status = streamTaskGetStatus(pTask, NULL);
   if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
     // no lock needs to secure the access of the version
     if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
@@ -1558,6 +1555,11 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
     } else {
       streamSchedExec(pTask);
     }
+  } else if (status == TASK_STATUS__UNINIT) {
+    if (pTask->info.fillHistory == 0) {
+      EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
+      streamTaskHandleEvent(pTask->status.pSM, event);
+    }
   }
 
   streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@@ -1951,7 +1953,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
   int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
 
-  pMeta->startInfo.startedAfterNodeUpdate = 1;
+  pMeta->startInfo.startAllTasksFlag = 1;
 
   if (updateTasks < numOfTasks) {
     tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@@ -1960,7 +1962,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   } else {
     if (!pTq->pVnode->restored) {
       tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
-      pMeta->startInfo.startedAfterNodeUpdate = 0;
+      pMeta->startInfo.startAllTasksFlag = 0;
       taosWUnLockLatch(&pMeta->lock);
     } else {
       tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
@@ -1991,7 +1993,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
     if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
       vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
       tqResetStreamTaskStatus(pTq);
-      tqCheckAndRunStreamTaskAsync(pTq);
+      tqLaunchStreamTaskAsync(pTq);
     } else {
       vInfo("vgId:%d, follower node not start stream tasks", vgId);
     }

@@ -98,6 +98,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
         streamLaunchFillHistoryTask(pTask);
       }
 
+      streamMetaUpdateTaskReadyInfo(pTask);
       streamMetaReleaseTask(pMeta, pTask);
       continue;
     }
@@ -111,7 +112,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
   return 0;
 }
 
-int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
+int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
   SStreamMeta* pMeta = pTq->pStreamMeta;
   int32_t      vgId = pMeta->vgId;
 

@@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
   pVnode->restored = true;
 
   taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
-  if (pVnode->pTq->pStreamMeta->startInfo.startedAfterNodeUpdate) {
+  if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) {
     vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
     taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
     return;
@@ -565,7 +565,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
     } else {
       vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
       tqResetStreamTaskStatus(pVnode->pTq);
-      tqCheckAndRunStreamTaskAsync(pVnode->pTq);
+      tqLaunchStreamTaskAsync(pVnode->pTq);
     }
   } else {
     vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);

@@ -229,10 +229,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
 
 int32_t streamMetaReopen(SStreamMeta* pMeta) {
   // backup the restart flag
-  int32_t restartFlag = pMeta->startInfo.startedAfterNodeUpdate;
+  int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
   streamMetaClear(pMeta);
 
-  pMeta->startInfo.startedAfterNodeUpdate = restartFlag;
+  pMeta->startInfo.startAllTasksFlag = restartFlag;
 
   // NOTE: role should not be changed during reopen meta
   pMeta->streamBackendRid = -1;
@@ -446,24 +446,6 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
   return (int32_t)size;
 }
 
-int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
-  int32_t num = 0;
-  size_t  size = taosArrayGetSize(pMeta->pTaskList);
-  for (int32_t i = 0; i < size; ++i) {
-    STaskId*      pId = taosArrayGet(pMeta->pTaskList, i);
-    SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
-    if (p == NULL) {
-      continue;
-    }
-
-    if ((*p)->info.fillHistory == 0) {
-      num += 1;
-    }
-  }
-
-  return num;
-}
-
 SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
   taosRLockLatch(&pMeta->lock);
 
@@ -1106,6 +1088,6 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {
 
 void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
   taosHashClear(pStartInfo->pReadyTaskSet);
-  pStartInfo->startedAfterNodeUpdate = 0;
+  pStartInfo->startAllTasksFlag = 0;
   pStartInfo->readyTs = 0;
 }

@@ -35,7 +35,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
 static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
 static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
 static void tryLaunchHistoryTask(void* param, void* tmrId);
-static int32_t updateTaskReadyInMeta(SStreamTask* pTask);
+static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
 
 int32_t streamTaskSetReady(SStreamTask* pTask) {
   char* p = NULL;
@@ -57,7 +57,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
   stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
           pTask->id.idStr, numOfDowns, el, p);
 
-  updateTaskReadyInMeta(pTask);
+  streamMetaUpdateTaskReadyInfo(pTask);
   return TSDB_CODE_SUCCESS;
 }
 
@@ -114,7 +114,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
 }
 
 // check status
-static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
+void streamTaskCheckDownstream(SStreamTask* pTask) {
   SDataRange*  pRange = &pTask->dataRange;
   STimeWindow* pWindow = &pRange->window;
 
@@ -163,10 +163,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
     }
   } else {
     stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
-    streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT);
+    doProcessDownstreamReadyRsp(pTask);
   }
-
-  return 0;
 }
 
 static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
@@ -272,8 +270,13 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
   ASSERT(status == TASK_STATUS__READY);
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
+    if (startVer == -1) {
+      startVer = pTask->chkInfo.nextProcessVer;
+    }
+
     stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
-            id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));
+            id, p, pTask->status.schedStatus, startVer);
   } else {
     stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
   }
@@ -304,36 +307,15 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }
 
-// todo: refactor this function.
-static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
-  streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT);
-
-#if 0
-  const char* id = pTask->id.idStr;
-
-  int8_t status = pTask->status.taskStatus;
-  const char* str = streamGetTaskStatusStr(status);
-
-  ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__READY);
-  streamTaskSetRangeStreamCalc(pTask);
-
-  if (status == TASK_STATUS__SCAN_HISTORY) {
-    stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
-    streamTaskStartScanHistory(pTask);
-    // start the related fill-history task, when current task is ready
-    streamLaunchFillHistoryTask(pTask);
-  } else {
-    // fill-history tasks are not allowed to reach here.
-    if (pTask->info.fillHistory == 1) {
-      stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
-      pTask->status.taskStatus = TASK_STATUS__DROPPING;
-      ASSERT(pTask->hTaskInfo.id.taskId == 0);
-    } else {
-      stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
-      streamTaskEnablePause(pTask);
-    }
-  }
-#endif
+void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
+  EStreamTaskEvent event;
+  if (pTask->info.fillHistory == 0) {
+    event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
+  } else {
+    event = TASK_EVENT_INIT_SCANHIST;
+  }
+
+  streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
 }
 
 int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
@@ -951,17 +933,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
   }
 }
 
-// only the downstream tasks are ready, set the task to be ready to work.
-void streamTaskCheckDownstream(SStreamTask* pTask) {
-  //  if (pTask->info.fillHistory) {
-  //    ASSERT(0);
-  //    stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
-  //    return;
-  //  }
-
-  doCheckDownstreamStatus(pTask);
-}
-
 void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
 #if 0
   int8_t status = pTask->status.taskStatus;
@@ -1076,7 +1047,7 @@ void streamTaskEnablePause(SStreamTask* pTask) {
   pTask->status.pauseAllowed = 1;
 }
 
-int32_t updateTaskReadyInMeta(SStreamTask* pTask) {
+int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
   SStreamMeta* pMeta = pTask->pMeta;
 
   taosWLockLatch(&pMeta->lock);
@@ -1088,13 +1059,9 @@ int32_t updateTaskReadyInMeta(SStreamTask* pTask) {
 
   if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
     STaskStartInfo* pStartInfo = &pMeta->startInfo;
-    pStartInfo->readyTs = pTask->execInfo.start;
-
-    if (pStartInfo->startTs != 0) {
-      pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
-    } else {
-      pStartInfo->elapsedTime = 0;
-    }
+    pStartInfo->readyTs = pTask->execInfo.start;
+    pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
 
     streamMetaResetStartInfo(pStartInfo);
 

@@ -140,9 +140,9 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
   pSM->prev.evt = 0;
 
   pSM->startTs = taosGetTimestampMs();
-  stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
 
   taosThreadMutexUnlock(&pTask->lock);
+  stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
 }
 
 SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
@@ -186,6 +186,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
 
 static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) {
   SStreamTask* pTask = pSM->pTask;
+  const char*  id = pTask->id.idStr;
 
   if (pTrans->attachEvent.event != 0) {
     attachEvent(pTask, &pTrans->attachEvent);
@@ -198,11 +199,10 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
       taosThreadMutexUnlock(&pTask->lock);
 
       if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
-        stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name);
+        stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name);
        return TSDB_CODE_SUCCESS;
      } else {  // this event has been handled already
-        stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr,
-                StreamTaskEventList[event].name);
+        stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name);
        taosMsleep(100);
      }
    }
@@ -244,7 +244,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
 
   if (pSM->pActiveTrans != NULL) {
     // currently in some state transfer procedure, not auto invoke transfer, abort it
-    stDebug("s-task:%s handle event procedure %s quit, status %s -> %s failed, handle event %s now",
+    stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
             pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name,
             pSM->pActiveTrans->next.name, StreamTaskEventList[event].name);
   }
@@ -275,16 +275,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
     ETaskStatus s = pSM->current.state;
     ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
-    // the pSM->prev.evt may be 0, so print string is not appropriate.
-    stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt,
-            pTask->id.idStr);
+    stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
+            StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name);
 
     taosThreadMutexUnlock(&pTask->lock);
     return TSDB_CODE_INVALID_PARA;
   }
 
   if (pTrans->event != event) {
-    stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name,
-           pSM->current.name);
+    stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
+           StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name);
     taosThreadMutexUnlock(&pTask->lock);
     return TSDB_CODE_INVALID_PARA;
   }
@@ -318,7 +318,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
 
   int32_t code = pNextTrans->pAction(pSM->pTask);
   if (pNextTrans->autoInvokeEndFn) {
-    return streamTaskOnHandleEventSuccess(pSM, event);
+    return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
   } else {
     return code;
   }

@@ -18,6 +18,7 @@ sql create table ts4 using st tags(4,2,2);
 sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
 sleep 1000
 
+sleep 1000
 sql pause stream streams1;
 
 sql insert into ts1 values(1648791213001,1,12,3,1.0);
@@ -247,6 +248,7 @@ sql create table ts4 using st tags(4,2,2);
 sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt3 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
 sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt4 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
 sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt5 as select _wstart, count(*) c1, sum(a) c3 from ts1 interval(10s);
+sleep 1000
 
 sql pause stream streams3;
 

@@ -35,6 +35,8 @@ class TDTestCase:
         self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
         self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
         init_num = 0
+        time.sleep(1)
+
         for i in range(self.tdCom.range_count):
             if i == 0:
                 window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])