diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 27a9168a99..fe6a7de734 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -59,9 +59,10 @@ extern "C" { #define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) -typedef struct SStreamTask SStreamTask; -typedef struct SStreamQueue SStreamQueue; -typedef struct SStreamTaskSM SStreamTaskSM; +typedef struct SStreamTask SStreamTask; +typedef struct SStreamQueue SStreamQueue; +typedef struct SStreamTaskSM SStreamTaskSM; +typedef struct SStreamQueueItem SStreamQueueItem; #define SSTREAM_TASK_VER 4 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 @@ -153,10 +154,6 @@ typedef enum EStreamTaskEvent { typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); -typedef struct { - int8_t type; -} SStreamQueueItem; - typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); @@ -190,13 +187,6 @@ typedef struct { SSDataBlock* pBlock; } SStreamRefDataBlock; -typedef struct SStreamQueueNode SStreamQueueNode; - -struct SStreamQueueNode { - SStreamQueueItem* item; - SStreamQueueNode* next; -}; - SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); @@ -437,7 +427,7 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; - SVersionRange step2Range; + SVersionRange step2Range; // version range used to scan wal, information in dataRange should not modified. SHistoryTaskInfo hTaskInfo; STaskId streamTaskId; STaskExecStatisInfo execInfo; @@ -445,14 +435,11 @@ struct SStreamTask { TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend - SArray* pRspMsgList; SUpstreamInfo upstreamInfo; STaskCheckInfo taskCheckInfo; // the followings attributes don't be serialized SScanhistorySchedInfo schedHistoryInfo; - - int32_t numOfWaitingUpstream; int32_t refCnt; int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 949bc2e60b..3ccb25a62a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -88,6 +88,10 @@ struct SStreamQueue { int8_t status; }; +struct SStreamQueueItem { + int8_t type; +}; + typedef enum { EXEC_CONTINUE = 0x0, EXEC_AFTER_IDLE = 0x1, diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 98c0d95781..fb5b5e57d4 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -48,9 +48,9 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) { SStreamTaskState* p = streamTaskGetStatus(pTask); if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { - pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); + int32_t numOfUps = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", - pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p->name); + pTask->id.idStr, pTask->info.taskLevel, numOfUps, p->name); } ASSERT(pTask->status.downstreamReady == 0); @@ -117,18 +117,16 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; ETaskStatus status = streamTaskGetStatus(pTask)->state; - ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY)); + ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY) && + (pTask->info.fillHistory == 1)); if (level == TASK_LEVEL__SOURCE) { return doStartScanHistoryTask(pTask); } else if (level == TASK_LEVEL__AGG) { - if (pTask->info.fillHistory) { - streamSetParamForScanHistory(pTask); - } + return streamSetParamForScanHistory(pTask); } else if (level == TASK_LEVEL__SINK) { stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } - return 0; } @@ -208,27 +206,21 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { } int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - // set the state to be ready streamTaskSetReady(pTask); streamTaskSetRangeStreamCalc(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask); - ASSERT(p->state == TASK_STATUS__SCAN_HISTORY); + ASSERT((p->state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1)); - if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p->name); - streamTaskStartScanHistory(pTask); - } else { - stDebug("s-task:%s scan wal data, status:%s", id, p->name); - } + stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p->name); + streamTaskStartScanHistory(pTask); // NOTE: there will be an deadlock if launch fill history here. -// // start the related fill-history task, when current task is ready -// if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { -// streamLaunchFillHistoryTask(pTask); -// } + // start the related fill-history task, when current task is ready + // if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + // streamLaunchFillHistoryTask(pTask); + // } return TSDB_CODE_SUCCESS; } @@ -515,6 +507,8 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe SVersionRange* pRange = &pTask->dataRange.range; ASSERT(nextProcessVer >= pRange->maxVer); + // maxVer for fill-history task is the version, where the last timestamp is acquired. + // it's also the maximum version to scan data in tsdb. int64_t walScanStartVer = pRange->maxVer + 1; if (walScanStartVer > nextProcessVer - 1) { stDebug( diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c7e34987ab..7f5ea52f58 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -456,11 +456,6 @@ void tFreeStreamTask(SStreamTask* pTask) { tSimpleHashCleanup(pTask->pNameMap); } - if (pTask->pRspMsgList != NULL) { - taosArrayDestroyEx(pTask->pRspMsgList, freeItem); - pTask->pRspMsgList = NULL; - } - pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);