stream operator checkpoint

This commit is contained in:
liuyao 2023-07-10 11:07:08 +08:00
parent 5940bbfb33
commit 84f0dfd6d4
4 changed files with 24 additions and 18 deletions

View File

@ -274,6 +274,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
crTblArray = NULL; crTblArray = NULL;
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue;
} else { } else {
SSubmitTbData tbData = {0}; SSubmitTbData tbData = {0};
tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);

View File

@ -327,24 +327,24 @@ typedef struct SStreamScanInfo {
SExprSupp tagCalSup; SExprSupp tagCalSup;
int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle; SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex; int32_t updateResIndex;
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
STqReader* tqReader; STqReader* tqReader;
uint64_t groupId; uint64_t groupId;
struct SUpdateInfo* pUpdateInfo; struct SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode; EStreamScanMode scanMode;
struct SOperatorInfo* pStreamScanOp; struct SOperatorInfo* pStreamScanOp;
struct SOperatorInfo* pTableScanOp; struct SOperatorInfo* pTableScanOp;
SArray* childIds; SArray* childIds;
SWindowSupporter windowSup; SWindowSupporter windowSup;
SPartitionBySupporter partitionSup; SPartitionBySupporter partitionSup;
@ -366,12 +366,13 @@ typedef struct SStreamScanInfo {
int32_t blockRecoverTotCnt; int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes; SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes; SSDataBlock* pCreateTbRes;
int8_t igCheckUpdate; int8_t igCheckUpdate;
int8_t igExpired; int8_t igExpired;
void* pState; //void void* pState; // void
SStoreTqReader readerFn; SStoreTqReader readerFn;
SStateStore stateStore; SStateStore stateStore;
SSDataBlock* pCheckpointRes;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {

View File

@ -1769,7 +1769,7 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
void* pBuf = NULL; void* pBuf = NULL;
int32_t len = streamScanOperatorEncode(pInfo, pBuf); int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
} }
@ -2188,7 +2188,7 @@ FETCH_NEXT_BLOCK:
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
} }
printDataBlock(pBlock, "stream scan ck"); printDataBlock(pBlock, "stream scan ck");
return pBlock; return pInfo->pCheckpointRes;
} }
return NULL; return NULL;
@ -2356,6 +2356,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
blockDataDestroy(pStreamScan->pUpdateDataRes); blockDataDestroy(pStreamScan->pUpdateDataRes);
blockDataDestroy(pStreamScan->pCreateTbRes); blockDataDestroy(pStreamScan->pCreateTbRes);
taosArrayDestroy(pStreamScan->pBlockLists); taosArrayDestroy(pStreamScan->pBlockLists);
blockDataDestroy(pStreamScan->pCheckpointRes);
taosMemoryFree(pStreamScan); taosMemoryFree(pStreamScan);
} }
@ -2573,6 +2575,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pState = NULL; pInfo->pState = NULL;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
// for stream // for stream
if (pTaskInfo->streamInfo.pState) { if (pTaskInfo->streamInfo.pState) {

View File

@ -2828,7 +2828,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
break; continue;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
} }
@ -3953,7 +3953,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
break; continue;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
} }
@ -4255,7 +4255,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
break; continue;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
} }
@ -5543,7 +5543,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
break; continue;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
} }