refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-02-19 10:31:25 +08:00
parent cf571e4f1f
commit e62dd2ff5c
5 changed files with 3 additions and 72 deletions

View File

@ -215,7 +215,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);

View File

@ -724,19 +724,6 @@ typedef struct SStreamTaskState {
char* name;
} SStreamTaskState;
typedef struct {
int64_t streamId;
int32_t downstreamTaskId;
int32_t taskId;
} SStreamRecoverDownstreamReq;
typedef struct {
int64_t streamId;
int32_t downstreamTaskId;
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
@ -789,8 +776,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
@ -803,7 +789,6 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
@ -815,9 +800,9 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
@ -894,6 +879,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM);
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
#ifdef __cplusplus
}
#endif

View File

@ -64,8 +64,6 @@ typedef struct {
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;
// bool recoverStep1Finished;
// bool recoverStep2Finished;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
SVersionRange fillHistoryVer;
@ -84,7 +82,6 @@ struct SExecTaskInfo {
int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo;
SArray* schemaInfos;
SSchemaInfo schemaInfo;
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]

View File

@ -1027,52 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
return 0;
}
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
const char* id = GET_TASKID(pTaskInfo);
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
uint16_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("%s restore stream agg executors param for interval: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("%s restore stream agg executor param for session: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("%s restore stream agg executor param for state: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
}
// iterate operator tree
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
if (pOperator->numOfDownstream > 1) {
qError("unexpected stream, multiple downstream");
return -1;
}
return 0;
} else {
pOperator = pOperator->pDownstream[0];
}
}
}
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished;

View File

@ -539,11 +539,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
}
int32_t streamRestoreParam(SStreamTask* pTask) {
stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
}
// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);