diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6564bcc769..6c2cec6292 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -49,7 +49,7 @@ extern "C" { do { \ (_t)->hTaskInfo.id.taskId = 0; \ (_t)->hTaskInfo.id.streamId = 0; \ - } while (0); + } while (0) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -739,6 +739,7 @@ bool streamTaskAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4015ba9c61..4e84b4cd26 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -183,8 +183,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, p, numOfTasks); - ASSERT(0); -// streamTaskCheckDownstream(pTask); + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + streamTaskHandleEvent(pTask->status.pSM, event); + streamTaskCheckDownstream(pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c4e74f84e5..e99503df33 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST; + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; streamTaskHandleEvent(p->status.pSM, event); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 8dfa1e2670..4bc386fb9a 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -99,7 +99,7 @@ int32_t tqStartStreamTask(STQ* pTq) { continue; } - EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_STREAM_SCANHIST; + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; streamTaskHandleEvent(pTask->status.pSM, event); streamMetaReleaseTask(pMeta, pTask); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8c37e785dd..2b1ea7c911 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -360,16 +360,16 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info - CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); +// CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); // 6. save to disk taosWLockLatch(&pMeta->lock); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - streamMetaSaveTask(pMeta, pStreamTask); - if (streamMetaCommit(pMeta) < 0) { +// streamMetaSaveTask(pMeta, pStreamTask); +// if (streamMetaCommit(pMeta) < 0) { // persist to disk - } +// } taosWUnLockLatch(&pMeta->lock); // 7. pause allowed. @@ -499,10 +499,10 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } } else { // non-dispatch task, do task state transfer directly streamFreeQitem((SStreamQueueItem*)pBlock); - stDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b35e48fa23..0fc37a38e4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -548,21 +548,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t taosWLockLatch(&pMeta->lock); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + pTask = *ppTask; + // it is an fill-history task, remove the related stream task's id that points to it - if ((*ppTask)->info.fillHistory == 1) { - STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; - SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); - if (ppStreamTask != NULL) { - CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); - } + if (pTask->info.fillHistory == 1) { + streamTaskClearHTaskAttr(pTask); } else { atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); ASSERT(pTask->status.timerActive == 0); - doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1d12401d12..231f1ce299 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -691,6 +691,25 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { return status; } +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + if (pTask->info.fillHistory == 0) { + return TSDB_CODE_SUCCESS; + } + + STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; + SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); + + if (ppStreamTask != NULL) { + CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); + streamMetaSaveTask(pMeta, *ppStreamTask); + stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, + (int32_t)sTaskId.taskId); + } + + return TSDB_CODE_SUCCESS; +} + int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) { SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { @@ -709,7 +728,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } - stDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId); + stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId); return code; } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 463b7ae771..7abdd155b9 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -261,14 +261,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event); - ASSERT(pSM->pActiveTrans == NULL); + ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); + pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); - taosThreadMutexUnlock(&pTask->lock); - + int32_t code = pNextTrans->pAction(pSM->pTask); - if (pTrans->autoInvokeEndFn) { + if (pNextTrans->autoInvokeEndFn) { return streamTaskOnHandleEventSuccess(pSM); } else { return code;