diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 602b3035b9..37c3683edb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -942,11 +942,11 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { if (pIter == NULL) break; maxChkpId = TMAX(maxChkpId, pStream->checkpointId); - mError("stream %p checkpoint %" PRId64 "", pStream, pStream->checkpointId); + mDebug("stream %p checkpoint %" PRId64 "", pStream, pStream->checkpointId); sdbRelease(pSdb, pStream); } - mError("generated checkpoint %" PRId64 "", maxChkpId + 1); + mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1); return maxChkpId + 1; } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a73c289440..bed0f79f02 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -139,7 +139,8 @@ void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); -void taskDbDestroy(void* pBackend); +void taskDbDestroy(void* pBackend, bool flush); +void taskDbDestroy2(void* pBackend); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); @@ -217,7 +218,7 @@ int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** p int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); -bool streamDefaultIterValid_rocksdb(void* iter); +bool streamDefaultIterValid_rocksdb(void* iter); void streamDefaultIterSeek_rocksdb(void* iter, const char* key); void streamDefaultIterNext_rocksdb(void* iter); char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); @@ -245,8 +246,8 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); -STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); -void taskDbDestroy(void* pDb); +// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); +// void taskDbDestroy(void* pDb, bool flush); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index dc61bd25f2..ccf166c390 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -190,12 +190,12 @@ int32_t getCfIdx(const char* cfName) { } bool isValidCheckpoint(const char* dir) { - // return true; + return true; STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir); if (pDb == NULL) { - return true; + return false; } - taskDbDestroy(pDb); + taskDbDestroy(pDb, false); return true; } @@ -1788,7 +1788,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { return pTaskDb; _EXIT: - taskDbDestroy(pTaskDb); + taskDbDestroy(pTaskDb, false); if (err) taosMemoryFree(err); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); return NULL; @@ -1807,7 +1807,7 @@ STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { return pTaskDb; } -void taskDbDestroy(void* pDb) { +void taskDbDestroy(void* pDb, bool flush) { STaskDbWrapper* wrapper = pDb; qDebug("succ to destroy stream backend:%p", wrapper); @@ -1815,24 +1815,33 @@ void taskDbDestroy(void* pDb) { if (wrapper == NULL) return; - if (wrapper->db && wrapper->pCf) { - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - rocksdb_flushoptions_set_wait(flushOpt, 1); + if (flush) { + if (wrapper->db && wrapper->pCf) { + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); - char* err = NULL; - for (int i = 0; i < nCf; i++) { - if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); - if (err != NULL) { - stError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err); - taosMemoryFreeClear(err); + char* err = NULL; + rocksdb_column_family_handle_t** cfs = taosMemoryCalloc(1, sizeof(rocksdb_column_family_handle_t*) * nCf); + int numOfFlushCf = 0; + for (int i = 0; i < nCf; i++) { + if (wrapper->pCf[i] != NULL) { + cfs[numOfFlushCf++] = wrapper->pCf[i]; + } } + if (numOfFlushCf != 0) { + rocksdb_flush_cfs(wrapper->db, flushOpt, cfs, numOfFlushCf, &err); + if (err != NULL) { + stError("failed to flush all cfs, reason:%s", err); + taosMemoryFreeClear(err); + } + } + taosMemoryFree(cfs); + rocksdb_flushoptions_destroy(flushOpt); } - rocksdb_flushoptions_destroy(flushOpt); - - for (int i = 0; i < nCf; i++) { - if (wrapper->pCf[i] != NULL) { - rocksdb_column_family_handle_destroy(wrapper->pCf[i]); - } + } + for (int i = 0; i < nCf; i++) { + if (wrapper->pCf[i] != NULL) { + rocksdb_column_family_handle_destroy(wrapper->pCf[i]); } } rocksdb_options_destroy(wrapper->dbOpt); @@ -1869,6 +1878,8 @@ void taskDbDestroy(void* pDb) { return; } +void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } + int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { int64_t st = taosGetTimestampMs(); int32_t code = -1; @@ -2007,7 +2018,7 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) { } _EXIT: - taskDbDestroy(pTaskDb); + taskDbDestroy(pTaskDb, true); return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 78b4814aa6..0e137a673e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -57,7 +57,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); - taskDbWrapperId = taosOpenRef(64, taskDbDestroy); + taskDbWrapperId = taosOpenRef(64, taskDbDestroy2); streamMetaId = taosOpenRef(64, streamMetaCloseImpl); @@ -1239,8 +1239,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; - stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, - (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", + vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); streamMetaWLock(pMeta);