optimize stream operator checkpoint

This commit is contained in:
54liuyao 2024-05-16 16:24:58 +08:00
parent 41e8a6996f
commit 7b50663240
8 changed files with 158 additions and 55 deletions

View File

@ -446,7 +446,13 @@ typedef struct STimeWindowAggSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp; } STimeWindowAggSupp;
typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
bool updateOperatorInfo;
} SSteamOpBasicInfo;
typedef struct SStreamScanInfo { typedef struct SStreamScanInfo {
SSteamOpBasicInfo basic;
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup; SExprSupp tbnameCalSup;
@ -568,10 +574,6 @@ typedef struct SOpCheckPointInfo {
SHashObj* children; // key:child id SHashObj* children; // key:child id
} SOpCheckPointInfo; } SOpCheckPointInfo;
typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
} SSteamOpBasicInfo;
typedef struct SStreamIntervalOperatorInfo { typedef struct SStreamIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SSteamOpBasicInfo basic; SSteamOpBasicInfo basic;

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef STREAM_EXECUTORINT_H
#define STREAM_EXECUTORINT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "executorInt.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
#ifdef __cplusplus
}
#endif
#endif // STREAM_EXECUTORINT_H

View File

@ -20,6 +20,7 @@
#include "os.h" #include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "systable.h" #include "systable.h"
#include "streamexecutorInt.h"
#include "tname.h" #include "tname.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -2426,10 +2427,13 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
if (!pInfo->pState) { if (!pInfo->pState) {
return; return;
} }
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
void* pBuf = NULL; void* pBuf = NULL;
int32_t len = streamScanOperatorEncode(pInfo, &pBuf); int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
} }
// other properties are recovered from the execution plan // other properties are recovered from the execution plan
@ -2582,6 +2586,7 @@ FETCH_NEXT_BLOCK:
case STREAM_NORMAL: case STREAM_NORMAL:
case STREAM_GET_ALL: case STREAM_GET_ALL:
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
return pBlock; return pBlock;
case STREAM_RETRIEVE: { case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
@ -2622,6 +2627,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->pDeleteDataRes->info.rows > 0) { if (pInfo->pDeleteDataRes->info.rows > 0) {
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
return pInfo->pDeleteDataRes; return pInfo->pDeleteDataRes;
} else { } else {
goto FETCH_NEXT_BLOCK; goto FETCH_NEXT_BLOCK;
@ -2639,6 +2645,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->pDeleteDataRes->info.rows > 0) { if (pInfo->pDeleteDataRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
return pInfo->pDeleteDataRes; return pInfo->pDeleteDataRes;
} else { } else {
goto FETCH_NEXT_BLOCK; goto FETCH_NEXT_BLOCK;
@ -2652,6 +2659,7 @@ FETCH_NEXT_BLOCK:
break; break;
} }
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
@ -2659,6 +2667,7 @@ FETCH_NEXT_BLOCK:
case STREAM_SCAN_FROM_RES: { case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes); doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1; pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
@ -2762,6 +2771,7 @@ FETCH_NEXT_BLOCK:
} }
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

View File

@ -17,6 +17,7 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "operator.h" #include "operator.h"
#include "querytask.h" #include "querytask.h"
#include "streamexecutorInt.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcommon.h" #include "tcommon.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -415,6 +416,7 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamCountAggOperatorInfo* pInfo = pOperator->info; SStreamCountAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true); int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
@ -422,6 +424,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf); taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
} }
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) { void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
@ -550,6 +554,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
break; break;
} }
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator); bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);

View File

@ -18,6 +18,7 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "operator.h" #include "operator.h"
#include "querytask.h" #include "querytask.h"
#include "streamexecutorInt.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
@ -458,6 +459,7 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamEventAggOperatorInfo* pInfo = pOperator->info; SStreamEventAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
@ -465,6 +467,8 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf); taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
} }
static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) { static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
@ -531,6 +535,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
break; break;
} }
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
pBasicInfo->updateOperatorInfo = true;
}
}
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo) {
return pBasicInfo->updateOperatorInfo;
}
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}

View File

@ -18,6 +18,7 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "operator.h" #include "operator.h"
#include "querytask.h" #include "querytask.h"
#include "streamexecutorInt.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
@ -1211,6 +1212,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
@ -1218,6 +1220,8 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf); taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
} }
static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) { static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
@ -1347,6 +1351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
pInfo->numOfDatapack++; pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
@ -2690,6 +2695,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
@ -2697,6 +2703,8 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf); taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
} }
void resetUnCloseSessionWinInfo(SSHashObj* winMap) { void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
@ -2766,6 +2774,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
break; break;
} }
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
@ -3176,6 +3185,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
break; break;
} }
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
@ -3673,6 +3683,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
@ -3680,6 +3691,8 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf); taosMemoryFree(buf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
} }
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) { static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
@ -3746,6 +3759,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
break; break;
} }
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
@ -4069,6 +4083,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->numOfDatapack++; pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) { pBlock->info.type == STREAM_CLEAR) {
@ -4465,6 +4480,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
} }
pInfo->numOfDatapack++; pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;

View File

@ -557,7 +557,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
const int32_t BATCH_LIMIT = 256; const int32_t BATCH_LIMIT = 256;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t numOfElems = listNEles(pSnapshot);
SListNode* pNode = NULL; SListNode* pNode = NULL;
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
@ -589,8 +588,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
taosMemoryFree(buf); taosMemoryFree(buf);
if (streamStateGetBatchSize(batch) > 0) { int32_t numOfElems = streamStateGetBatchSize(batch);
if (numOfElems > 0) {
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} else {
goto _end;
} }
streamStateClearBatch(batch); streamStateClearBatch(batch);
@ -609,6 +611,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
_end:
streamStateDestroyBatch(batch); streamStateDestroyBatch(batch);
return code; return code;
} }