delete expired check point
This commit is contained in:
parent
4625dd98f0
commit
9be89ce00a
|
@ -65,6 +65,7 @@ void streamStateClose(SStreamState* pState);
|
|||
int32_t streamStateBegin(SStreamState* pState);
|
||||
int32_t streamStateCommit(SStreamState* pState);
|
||||
void streamStateDestroy(SStreamState* pState);
|
||||
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
|
||||
|
||||
typedef struct {
|
||||
rocksdb_iterator_t* iter;
|
||||
|
|
|
@ -54,7 +54,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pS
|
|||
int32_t recoverSnapshot(SStreamFileState* pFileState);
|
||||
|
||||
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
|
||||
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId);
|
||||
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -2151,7 +2151,7 @@ FETCH_NEXT_BLOCK:
|
|||
// record the scan action.
|
||||
pInfo->numOfExec++;
|
||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||
// printDataBlock(pInfo->pRes, "stream scan");
|
||||
printDataBlock(pInfo->pRes, "stream scan");
|
||||
|
||||
qDebug("scan rows: %" PRId64, pBlockInfo->rows);
|
||||
if (pBlockInfo->rows > 0) {
|
||||
|
|
|
@ -1454,10 +1454,6 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
|
|||
return w;
|
||||
}
|
||||
|
||||
static void deleteIntervalDiscBuf(SStreamState* pState, TSKEY mark) {
|
||||
//todo(liuyao) delete expired check point
|
||||
}
|
||||
|
||||
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
|
||||
int32_t size = taosArrayGetSize(pChildren);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
|
@ -2404,6 +2400,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
STimeWindow nextWin = {0};
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
nextWin = getFinalTimeWindow(ts, &pInfo->interval);
|
||||
qDebug("===stream===final ts:%" PRId64 ", getFinalTimeWindow:%" PRId64,ts, nextWin.skey);
|
||||
} else {
|
||||
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
@ -2448,7 +2445,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
}
|
||||
|
||||
if (ignore) {
|
||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||
// startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||
int32_t prevEndPos = startPos;
|
||||
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -2456,6 +2455,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
}
|
||||
}
|
||||
|
||||
qDebug("===stream===final setIntervalOutputBuf:%" PRId64, nextWin.skey);
|
||||
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
|
||||
pSup->rowEntryInfoOffset, &pInfo->aggSup);
|
||||
pResult = (SResultRow*)pResPos->pRowBuff;
|
||||
|
@ -2474,6 +2474,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
.groupId = groupId,
|
||||
};
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
|
||||
qDebug("===stream===is final%d saveWinResult:%" PRId64, IS_FINAL_OP(pInfo), key.ts);
|
||||
saveWinResult(&key, pResPos, pUpdatedMap);
|
||||
}
|
||||
|
||||
|
@ -2575,9 +2576,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
||||
qDebug("===stream===clear semi operator");
|
||||
} else {
|
||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
|
||||
streamStateCommit(pInfo->pState);
|
||||
streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
||||
}
|
||||
qDebug("===stream===interval final close");
|
||||
|
@ -4813,10 +4814,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
setOperatorCompleted(pOperator);
|
||||
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
|
||||
streamStateCommit(pInfo->pState);
|
||||
streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
|
||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
||||
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
||||
}
|
||||
|
|
|
@ -1072,6 +1072,14 @@ void streamStateDestroy(SStreamState* pState) {
|
|||
taosMemoryFreeClear(pState);
|
||||
}
|
||||
|
||||
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
|
||||
#ifdef USE_ROCKSDB
|
||||
return deleteExpiredCheckPoint(pState->pFileState, mark);
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
#if 0
|
||||
char* streamStateSessionDump(SStreamState* pState) {
|
||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||
|
|
|
@ -372,17 +372,20 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
|
||||
const char* taskKey = "streamFileState";
|
||||
char keyBuf[128] = {0};
|
||||
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId);
|
||||
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
|
||||
}
|
||||
|
||||
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
|
||||
const char* taskKey = "streamFileState";
|
||||
return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
|
||||
}
|
||||
int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||
|
||||
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
const char* taskKey = "streamFileState";
|
||||
int64_t maxCheckPointId = 0;
|
||||
|
@ -410,13 +413,18 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
|||
TSKEY ts;
|
||||
sscanf(val, "%" PRId64 "", &ts);
|
||||
taosMemoryFree(val);
|
||||
if (ts < pFileState->deleteMark) {
|
||||
if (ts < mark) {
|
||||
forceRemoveCheckpoint(pFileState, i);
|
||||
break;
|
||||
} else {
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
|
||||
void* pStVal = NULL;
|
||||
int32_t len = 0;
|
||||
|
||||
|
|
Loading…
Reference in New Issue