fix(stream): fix bugs caused by refactor sm.

This commit is contained in:
Haojun Liao 2023-10-20 10:23:48 +08:00
parent e359da7b19
commit e1f6043eed
8 changed files with 42 additions and 23 deletions

View File

@ -49,7 +49,7 @@ extern "C" {
do { \ do { \
(_t)->hTaskInfo.id.taskId = 0; \ (_t)->hTaskInfo.id.taskId = 0; \
(_t)->hTaskInfo.id.streamId = 0; \ (_t)->hTaskInfo.id.streamId = 0; \
} while (0); } while (0)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
@ -739,6 +739,7 @@ bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);

View File

@ -183,8 +183,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
pTask->id.idStr, p, numOfTasks); pTask->id.idStr, p, numOfTasks);
ASSERT(0); EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
// streamTaskCheckDownstream(pTask); streamTaskHandleEvent(pTask->status.pSM, event);
streamTaskCheckDownstream(pTask);
return 0; return 0;
} }

View File

@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
bool restored = pTq->pVnode->restored; bool restored = pTq->pVnode->restored;
if (p != NULL && restored && p->info.fillHistory == 0) { if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST; EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event); streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) { } else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);

View File

@ -99,7 +99,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
continue; continue;
} }
EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST; EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event); streamTaskHandleEvent(pTask->status.pSM, event);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }

View File

@ -360,16 +360,16 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info // 5. clear the link between fill-history task and stream task info
CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); // CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
// 6. save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
streamMetaSaveTask(pMeta, pStreamTask); // streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { // if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} // }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// 7. pause allowed. // 7. pause allowed.
@ -499,10 +499,10 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
} }
} else { // non-dispatch task, do task state transfer directly } else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s non-dispatch task, start to transfer state directly", id); stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel);
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
} }

View File

@ -548,21 +548,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) { if (ppTask) {
pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it // it is an fill-history task, remove the related stream task's id that points to it
if ((*ppTask)->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; streamTaskClearHTaskAttr(pTask);
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
if (ppStreamTask != NULL) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
}
} else { } else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);

View File

@ -691,6 +691,25 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
return status; return status;
} }
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
if (pTask->info.fillHistory == 0) {
return TSDB_CODE_SUCCESS;
}
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
if (ppStreamTask != NULL) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
(int32_t)sTaskId.taskId);
}
return TSDB_CODE_SUCCESS;
}
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) { int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
@ -709,7 +728,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code; return code;
} }
stDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId); stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
return code; return code;
} }

View File

@ -261,14 +261,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event); STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
pSM->pActiveTrans = pNextTrans; pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
int32_t code = pNextTrans->pAction(pSM->pTask); int32_t code = pNextTrans->pAction(pSM->pTask);
if (pTrans->autoInvokeEndFn) { if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM); return streamTaskOnHandleEventSuccess(pSM);
} else { } else {
return code; return code;