fix(stream): fix bugs caused by refactor sm.

This commit is contained in:
Haojun Liao 2023-10-20 10:23:48 +08:00
parent 4d5d078fbe
commit 270a847809
8 changed files with 42 additions and 23 deletions

View File

@ -49,7 +49,7 @@ extern "C" {
do { \
(_t)->hTaskInfo.id.taskId = 0; \
(_t)->hTaskInfo.id.streamId = 0; \
} while (0);
} while (0)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
@ -739,6 +739,7 @@ bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);

View File

@ -183,8 +183,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
pTask->id.idStr, p, numOfTasks);
ASSERT(0);
// streamTaskCheckDownstream(pTask);
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
streamTaskCheckDownstream(pTask);
return 0;
}

View File

@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
bool restored = pTq->pVnode->restored;
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST;
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);

View File

@ -99,7 +99,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
continue;
}
EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST;
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
streamMetaReleaseTask(pMeta, pTask);
}

View File

@ -360,16 +360,16 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info
CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
// CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
// 6. save to disk
taosWLockLatch(&pMeta->lock);
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// streamMetaSaveTask(pMeta, pStreamTask);
// if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
// }
taosWUnLockLatch(&pMeta->lock);
// 7. pause allowed.
@ -499,10 +499,10 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
}
} else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
}

View File

@ -548,21 +548,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it
if ((*ppTask)->info.fillHistory == 1) {
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
if (ppStreamTask != NULL) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
}
if (pTask->info.fillHistory == 1) {
streamTaskClearHTaskAttr(pTask);
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);

View File

@ -691,6 +691,25 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
return status;
}
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
if (pTask->info.fillHistory == 0) {
return TSDB_CODE_SUCCESS;
}
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
if (ppStreamTask != NULL) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
(int32_t)sTaskId.taskId);
}
return TSDB_CODE_SUCCESS;
}
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
@ -709,7 +728,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code;
}
stDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId);
stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
return code;
}

View File

@ -261,14 +261,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
int32_t code = pNextTrans->pAction(pSM->pTask);
if (pTrans->autoInvokeEndFn) {
if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM);
} else {
return code;