Merge pull request #24763 from taosdata/fix/crash_stream
refactor: do some internal refactor.
This commit is contained in:
commit
f95fb4f27e
|
@ -215,7 +215,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
|
||||||
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
||||||
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
||||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||||
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
|
|
||||||
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
|
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
|
||||||
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
|
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
|
||||||
void resetTaskInfo(qTaskInfo_t tinfo);
|
void resetTaskInfo(qTaskInfo_t tinfo);
|
||||||
|
|
|
@ -725,19 +725,6 @@ typedef struct SStreamTaskState {
|
||||||
char* name;
|
char* name;
|
||||||
} SStreamTaskState;
|
} SStreamTaskState;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t downstreamTaskId;
|
|
||||||
int32_t taskId;
|
|
||||||
} SStreamRecoverDownstreamReq;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t downstreamTaskId;
|
|
||||||
int32_t taskId;
|
|
||||||
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
|
|
||||||
} SStreamRecoverDownstreamRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
|
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
|
||||||
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
|
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
|
||||||
|
|
||||||
|
@ -746,10 +733,12 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
|
||||||
|
|
||||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
|
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
|
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
|
||||||
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
||||||
|
|
||||||
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||||
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
|
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
typedef struct SStreamTaskCheckpointReq {
|
typedef struct SStreamTaskCheckpointReq {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
@ -788,8 +777,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
||||||
// recover and fill history
|
// recover and fill history
|
||||||
void streamTaskCheckDownstream(SStreamTask* pTask);
|
void streamTaskCheckDownstream(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
|
||||||
int64_t* oldStage);
|
|
||||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||||
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
||||||
|
@ -802,7 +790,6 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||||
void streamTaskRestoreStatus(SStreamTask* pTask);
|
void streamTaskRestoreStatus(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskStop(SStreamTask* pTask);
|
|
||||||
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
||||||
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
||||||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
||||||
|
@ -814,9 +801,9 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
|
||||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
|
||||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
|
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
void streamTaskResume(SStreamTask* pTask);
|
void streamTaskResume(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskStop(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
|
@ -892,8 +879,9 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
||||||
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
||||||
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
||||||
|
|
||||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req);
|
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
|
||||||
void sendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
|
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -85,8 +85,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
||||||
|
|
||||||
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
||||||
// checkpoint ver is the kept version, handled data should be the next version.
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
if (pTask->chkInfo.checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
|
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||||
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
||||||
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -835,6 +835,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
// checkpoint ver is the kept version, handled data should be the next version.
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
if (pChkInfo->checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
|
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -317,14 +317,14 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||||
req.dstTaskId);
|
req.dstTaskId);
|
||||||
taosMemoryFree(req.pRetrieve);
|
tDeleteStreamRetrieveReq(&req);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if(pTask->info.taskLevel == TASK_LEVEL__SOURCE){
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
code = streamProcessRetrieveReq(pTask, &req);
|
code = streamProcessRetrieveReq(pTask, &req);
|
||||||
}else{
|
} else {
|
||||||
req.srcNodeId = pTask->info.nodeId;
|
req.srcNodeId = pTask->info.nodeId;
|
||||||
req.srcTaskId = pTask->id.taskId;
|
req.srcTaskId = pTask->id.taskId;
|
||||||
code = broadcastRetrieveMsg(pTask, &req);
|
code = broadcastRetrieveMsg(pTask, &req);
|
||||||
|
@ -334,7 +334,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
sendRetrieveRsp(&req, &rsp);
|
sendRetrieveRsp(&req, &rsp);
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
taosMemoryFree(req.pRetrieve);
|
tDeleteStreamRetrieveReq(&req);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,8 +64,6 @@ typedef struct {
|
||||||
SSchemaWrapper* schema;
|
SSchemaWrapper* schema;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
||||||
int8_t recoverStep;
|
int8_t recoverStep;
|
||||||
// bool recoverStep1Finished;
|
|
||||||
// bool recoverStep2Finished;
|
|
||||||
int8_t recoverScanFinished;
|
int8_t recoverScanFinished;
|
||||||
SQueryTableDataCond tableCond;
|
SQueryTableDataCond tableCond;
|
||||||
SVersionRange fillHistoryVer;
|
SVersionRange fillHistoryVer;
|
||||||
|
@ -84,7 +82,6 @@ struct SExecTaskInfo {
|
||||||
int64_t version; // used for stream to record wal version, why not move to sschemainfo
|
int64_t version; // used for stream to record wal version, why not move to sschemainfo
|
||||||
SStreamTaskInfo streamInfo;
|
SStreamTaskInfo streamInfo;
|
||||||
SArray* schemaInfos;
|
SArray* schemaInfos;
|
||||||
SSchemaInfo schemaInfo;
|
|
||||||
const char* sql; // query sql string
|
const char* sql; // query sql string
|
||||||
jmp_buf env; // jump to this position when error happens.
|
jmp_buf env; // jump to this position when error happens.
|
||||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||||
|
|
|
@ -1027,52 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
const char* id = GET_TASKID(pTaskInfo);
|
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
uint16_t type = pOperator->operatorType;
|
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
|
||||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
|
||||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
|
||||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
|
||||||
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
|
|
||||||
qInfo("%s restore stream agg executors param for interval: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
|
|
||||||
pInfo->twAggSup.deleteMark);
|
|
||||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
|
||||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
|
||||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
|
||||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
|
||||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
|
||||||
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
|
|
||||||
qInfo("%s restore stream agg executor param for session: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
|
|
||||||
pInfo->twAggSup.deleteMark);
|
|
||||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
|
||||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
|
||||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
|
||||||
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
|
|
||||||
qInfo("%s restore stream agg executor param for state: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
|
|
||||||
pInfo->twAggSup.deleteMark);
|
|
||||||
}
|
|
||||||
|
|
||||||
// iterate operator tree
|
|
||||||
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
|
|
||||||
if (pOperator->numOfDownstream > 1) {
|
|
||||||
qError("unexpected stream, multiple downstream");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
pOperator = pOperator->pDownstream[0];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
|
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
return pTaskInfo->streamInfo.recoverScanFinished;
|
return pTaskInfo->streamInfo.recoverScanFinished;
|
||||||
|
|
|
@ -109,8 +109,6 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock);
|
||||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||||
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
|
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
|
||||||
|
|
||||||
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
|
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
|
||||||
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
|
||||||
|
|
|
@ -162,6 +162,8 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
|
||||||
|
|
||||||
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
||||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
||||||
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
||||||
|
@ -174,7 +176,7 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req){
|
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
|
|
|
@ -539,11 +539,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
|
||||||
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask) {
|
|
||||||
stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
|
|
||||||
return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
|
|
||||||
}
|
|
||||||
|
|
||||||
// source
|
// source
|
||||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
|
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
|
||||||
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
|
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
|
||||||
|
|
Loading…
Reference in New Issue