Merge branch 'refact/streamsm' of https://github.com/taosdata/TDengine into refact/streamsm

This commit is contained in:
yihaoDeng 2023-10-20 20:49:17 +08:00
commit aaf7e070ee
3 changed files with 23 additions and 7 deletions

View File

@ -276,6 +276,8 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
}
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
int32_t vgId = pMeta->vgId;
taosWLockLatch(&pMeta->lock);
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
@ -297,10 +299,15 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
char* str = NULL;
streamTaskGetStatus(p, &str);
streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
// save the task
streamMetaSaveTask(pMeta, p);
int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
taosWUnLockLatch(&pMeta->lock);
return -1;
} else { // save the task
streamMetaSaveTask(pMeta, p);
}
stDebug(
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "

View File

@ -318,7 +318,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
// remove the ref by timer
while (pTask->status.timerActive > 0) {
stDebug("s-task:%s wait for task stop timer activities, ref:%d", pTask->id.idStr, pTask->status.timerActive);
taosMsleep(10);
taosMsleep(100);
}
if (pTask->schedInfo.pTimer != NULL) {

View File

@ -115,7 +115,11 @@ static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, con
}
}
ASSERT(0);
if (event == TASK_EVENT_CHECKPOINT_DONE && pState->current.state == TASK_STATUS__STOP) {
} else {
ASSERT(0);
}
return NULL;
}
@ -183,8 +187,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name);
if (pTrans == NULL) {
stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name);
return -1;
} else {
stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name);
}
if (pTrans->attachEvent.event != 0) {
attachEvent(pTask, &pTrans->attachEvent);