Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
This commit is contained in:
commit
17debe6a28
|
@ -274,6 +274,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
tagArray = taosArrayDestroy(tagArray);
|
||||
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
|
||||
crTblArray = NULL;
|
||||
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||
continue;
|
||||
} else {
|
||||
SSubmitTbData tbData = {0};
|
||||
tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);
|
||||
|
|
|
@ -372,6 +372,7 @@ typedef struct SStreamScanInfo {
|
|||
void* pState; // void
|
||||
SStoreTqReader readerFn;
|
||||
SStateStore stateStore;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1769,8 +1769,9 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
|
|||
|
||||
void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
|
||||
void* pBuf = NULL;
|
||||
int32_t len = streamScanOperatorEncode(pInfo, pBuf);
|
||||
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
|
||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
|
||||
taosMemoryFree(pBuf);
|
||||
}
|
||||
|
||||
// other properties are recovered from the execution plan
|
||||
|
@ -2185,10 +2186,9 @@ FETCH_NEXT_BLOCK:
|
|||
if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||
streamScanOperatorSaveCheckpoint(pInfo);
|
||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
}
|
||||
printDataBlock(pBlock, "stream scan ck");
|
||||
return pBlock;
|
||||
return pInfo->pCheckpointRes;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -2356,6 +2356,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
blockDataDestroy(pStreamScan->pUpdateDataRes);
|
||||
blockDataDestroy(pStreamScan->pCreateTbRes);
|
||||
taosArrayDestroy(pStreamScan->pBlockLists);
|
||||
blockDataDestroy(pStreamScan->pCheckpointRes);
|
||||
|
||||
taosMemoryFree(pStreamScan);
|
||||
}
|
||||
|
||||
|
@ -2573,6 +2575,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->pState = NULL;
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
|
||||
// for stream
|
||||
if (pTaskInfo->streamInfo.pState) {
|
||||
|
|
|
@ -2673,6 +2673,7 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
len = doStreamIntervalEncodeOpState(&pBuf, pOperator);
|
||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
|
@ -2828,7 +2829,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
|
||||
IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
|
||||
pInfo->numOfDatapack = 0;
|
||||
break;
|
||||
continue;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
@ -3953,7 +3954,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
|
||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||
break;
|
||||
continue;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
@ -4255,7 +4256,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
break;
|
||||
continue;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
@ -4675,6 +4676,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||
doStreamSessionSaveCheckpoint(pOperator);
|
||||
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
|
||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||
continue;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
@ -5543,7 +5550,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
|
||||
pInfo->numOfDatapack = 0;
|
||||
break;
|
||||
continue;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue