diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index e6c3405d7f..b9ddecd3be 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -105,7 +105,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); @@ -133,7 +133,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle); -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 2db5ea2f1e..47e82314ad 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -468,7 +468,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { - pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo); + pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); @@ -477,10 +477,10 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; - pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { int32_t children = pHandle->numOfVgroups; - pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6ec14bc218..55c1b89c30 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2959,7 +2959,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -5614,7 +5614,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bfcedd2d53..2ab55687a2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -417,8 +417,8 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); if (!taosDirExist(checkpointPath)) { - return 0; // no checkpoint, nothing to load + return 0; } TdDirPtr pDir = taosOpenDir(checkpointPath); @@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + //rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + //rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 50564a9070..455a91b6be 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -256,12 +256,12 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { if (streamMetaCommit(pMeta) < 0) { taosWUnLockLatch(&pMeta->lock); - qError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64", since %s", - pMeta->vgId, checkpointId, terrstr()); + qError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, + checkpointId, terrstr()); return -1; } else { taosWUnLockLatch(&pMeta->lock); - qInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%. DONE" PRId64, pMeta->vgId, checkpointId); + qInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b2dabd6356..6b3f917176 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -96,12 +96,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->streamBackend = streamBackendInit(streamPath); - if (pMeta->streamBackend == NULL) { - goto _err; - } - - pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); @@ -109,6 +103,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->checkpointCap = 4; taosInitRWLatch(&pMeta->checkpointDirLock); + pMeta->streamBackend = streamBackendInit(streamPath); + if (pMeta->streamBackend == NULL) { + goto _err; + } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + code = streamBackendLoadCheckpointInfo(pMeta); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code);