diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e06a08acba..fe20639c77 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -215,7 +215,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); -int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b11db04e82..1ae14cf5c4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -724,19 +724,6 @@ typedef struct SStreamTaskState { char* name; } SStreamTaskState; -typedef struct { - int64_t streamId; - int32_t downstreamTaskId; - int32_t taskId; -} SStreamRecoverDownstreamReq; - -typedef struct { - int64_t streamId; - int32_t downstreamTaskId; - int32_t taskId; - SArray* checkpointVer; // SArray -} SStreamRecoverDownstreamRsp; - int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); @@ -789,8 +776,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, - int64_t* oldStage); +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); @@ -803,7 +789,6 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); -int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); @@ -815,9 +800,9 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common -int32_t streamRestoreParam(SStreamTask* pTask); void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); +int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); @@ -894,6 +879,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM); int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req); void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 0c9a5e3197..886ce9705d 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -64,8 +64,6 @@ typedef struct { SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; -// bool recoverStep1Finished; -// bool recoverStep2Finished; int8_t recoverScanFinished; SQueryTableDataCond tableCond; SVersionRange fillHistoryVer; @@ -84,7 +82,6 @@ struct SExecTaskInfo { int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SArray* schemaInfos; - SSchemaInfo schemaInfo; const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6d80b79d9d..2534c5e9f0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1027,52 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { return 0; } -int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - const char* id = GET_TASKID(pTaskInfo); - SOperatorInfo* pOperator = pTaskInfo->pRoot; - - while (1) { - uint16_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; - pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; - qInfo("%s restore stream agg executors param for interval: %d, %" PRId64, id, pInfo->twAggSup.calTrigger, - pInfo->twAggSup.deleteMark); - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; - pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; - qInfo("%s restore stream agg executor param for session: %d, %" PRId64, id, pInfo->twAggSup.calTrigger, - pInfo->twAggSup.deleteMark); - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; - pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; - qInfo("%s restore stream agg executor param for state: %d, %" PRId64, id, pInfo->twAggSup.calTrigger, - pInfo->twAggSup.deleteMark); - } - - // iterate operator tree - if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { - if (pOperator->numOfDownstream > 1) { - qError("unexpected stream, multiple downstream"); - return -1; - } - return 0; - } else { - pOperator = pOperator->pDownstream[0]; - } - } -} - bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return pTaskInfo->streamInfo.recoverScanFinished; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index ee98bc801b..fc921a4acc 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -539,11 +539,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } -int32_t streamRestoreParam(SStreamTask* pTask) { - stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr); - return qRestoreStreamOperatorOption(pTask->exec.pExecutor); -} - // source int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);