delete invalid code

This commit is contained in:
liuyao 2023-07-11 19:21:27 +08:00
parent 503d7540f7
commit 6298f17c45
5 changed files with 0 additions and 27 deletions

View File

@ -99,9 +99,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
* @param tinfo * @param tinfo

View File

@ -69,8 +69,6 @@ typedef struct {
SVersionRange fillHistoryVer; SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow; STimeWindow fillHistoryWindow;
SStreamState* pState; SStreamState* pState;
int64_t dataVersion;
int64_t checkPointId;
} SStreamTaskInfo; } SStreamTaskInfo;
struct SExecTaskInfo { struct SExecTaskInfo {

View File

@ -223,12 +223,6 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code; return code;
} }
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
SExecTaskInfo* pTaskInfo = tinfo;
*dataVer = pTaskInfo->streamInfo.dataVersion;
*ckId = pTaskInfo->streamInfo.checkPointId;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;

View File

@ -2318,11 +2318,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos; return startPos;
} }
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) {
pTaskInfo->streamInfo.dataVersion = version;
pTaskInfo->streamInfo.checkPointId = ckId;
}
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap) { SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
@ -2823,7 +2818,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamIntervalSaveCheckpoint(pOperator); doStreamIntervalSaveCheckpoint(pOperator);
pAPI->stateStore.streamStateCommit(pInfo->pState); pAPI->stateStore.streamStateCommit(pInfo->pState);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
@ -3086,7 +3080,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, pOperator); doStreamIntervalDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
} }
return pOperator; return pOperator;
@ -3953,7 +3946,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator); doStreamSessionSaveCheckpoint(pOperator);
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
} else { } else {
@ -4154,7 +4146,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, pOperator); doStreamSessionDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
} }
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
@ -4256,7 +4247,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator); doStreamSessionSaveCheckpoint(pOperator);
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
continue; continue;
} else { } else {
@ -4681,7 +4671,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator); doStreamSessionSaveCheckpoint(pOperator);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
} else { } else {
@ -4878,7 +4867,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamStateDecodeOpState(buff, pOperator); doStreamStateDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
} }
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
@ -5548,7 +5536,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamIntervalSaveCheckpoint(pOperator); doStreamIntervalSaveCheckpoint(pOperator);
pAPI->stateStore.streamStateCommit(pInfo->pState); pAPI->stateStore.streamStateCommit(pInfo->pState);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
pInfo->reCkBlock = true; pInfo->reCkBlock = true;
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
@ -5735,7 +5722,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, pOperator); doStreamIntervalDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
} }
initIntervalDownStream(downstream, pPhyNode->type, pInfo); initIntervalDownStream(downstream, pPhyNode->type, pInfo);

View File

@ -18,8 +18,6 @@
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId);
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);