diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 1cbd7b042c..a7490ea422 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -46,6 +46,7 @@ typedef struct { void* streamBackendInit(const char* path); void streamBackendCleanup(void* arg); +int32_t streamBackendDoCheckpoint(int64_t rid, const char* cpPath); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index df045eef20..bcbdd8fc21 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -41,6 +41,7 @@ typedef struct { } RocksdbCfInst; uint32_t nextPow2(uint32_t x) { + if (x <= 1) return 2; x = x - 1; x = x | (x >> 1); x = x | (x >> 2); @@ -189,6 +190,37 @@ void streamBackendCleanup(void* arg) { qDebug("destroy stream backend backend:%p", pHandle); return; } + +int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { + int64_t st = taosGetTimestampMs(); + int32_t code = -1; + SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid); + if (pHandle == NULL) { + return -1; + } + qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path); + + if (pHandle->db != NULL) { + char* err = NULL; + rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); + if (cp == NULL || err != NULL) { + taosMemoryFree(err); + qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); + } + rocksdb_checkpoint_create(cp, path, 64 << 20, &err); + if (err != NULL) { + taosMemoryFree(err); + qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); + } else { + code = 0; + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, path, + taosGetTimestampMs() - st); + } + rocksdb_checkpoint_object_destroy(cp); + } + taosReleaseRef(streamBackendId, backendRid); + return code; +} SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendHandle* pHandle = (SBackendHandle*)backend; SListNode* node = NULL; @@ -810,7 +842,11 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); - taosAcquireRef(streamBackendId, pState->streamBackendRid); + void* arg = taosAcquireRef(streamBackendId, pState->streamBackendRid); + if (arg == NULL) { + return -1; + } + SBackendHandle* handle = backend; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ed9f99cf78..a572159af1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -89,6 +89,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackend = streamBackendInit(streamPath); + + if (pMeta->streamBackend == NULL) { + goto _err; + } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosMemoryFree(streamPath); @@ -104,7 +108,6 @@ _err: if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); - // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend); taosMemoryFree(pMeta); qError("failed to open stream meta"); return NULL; @@ -399,3 +402,17 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return 0; } + +int32_t streamDoCheckpoint(SStreamMeta* pMeta) { + int code = -1; + char buf[256]; + + sprintf(buf, "%s/%s", pMeta->path, "checkpoints"); + code = taosMulModeMkDir(buf, 0755); + if (code != 0) { + qError("failed to create chechpoint %s, reason:%s", buf, tstrerror(code)); + return code; + } + code = streamBackendDoCheckpoint(pMeta->streamBackendRid, buf); + return code; +} diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 967c7733c9..d9acbc31d6 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -120,6 +120,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; pState->streamBackendRid = pMeta->streamBackendRid; + int code = streamStateOpenBackend(pMeta->streamBackend, pState); if (code == -1) { taosReleaseRef(streamBackendId, pMeta->streamBackendRid);