From 19e52c364e8ecfc496686ea743fd69a2dc512709 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 22 Sep 2023 08:26:03 +0000 Subject: [PATCH] add cvt state --- source/libs/stream/inc/streamBackendRocksdb.h | 23 +-- source/libs/stream/src/streamBackendRocksdb.c | 133 ++++++++++++------ source/libs/stream/src/streamMeta.c | 20 ++- 3 files changed, 117 insertions(+), 59 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 5bcc29287d..3e07f2532e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -51,8 +51,8 @@ typedef struct { typedef struct { rocksdb_t* db; - rocksdb_writeoptions_t* writeOpts; - rocksdb_readoptions_t* readOpts; + rocksdb_writeoptions_t* writeOpt; + rocksdb_readoptions_t* readOpt; rocksdb_options_t* dbOpt; rocksdb_env_t* env; rocksdb_cache_t* cache; @@ -63,18 +63,19 @@ typedef struct { RocksdbCfParam* pCfParams; rocksdb_compactionfilterfactory_t* filterFactory; - TdThreadMutex cfMutex; + TdThreadMutex mutex; } STaskBackendWrapper; -void* streamBackendInit(const char* path, int64_t chkpId); -void streamBackendCleanup(void* arg); -void streamBackendHandleCleanup(void* arg); -int32_t streamBackendLoadCheckpointInfo(void* pMeta); -int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); -SListNode* streamBackendAddCompare(void* backend, void* arg); -void streamBackendDelCompare(void* backend, void* arg); -int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); +void* streamBackendInit(const char* path, int64_t chkpId); +void streamBackendCleanup(void* arg); +void streamBackendHandleCleanup(void* arg); +int32_t streamBackendLoadCheckpointInfo(void* pMeta); +int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); +SListNode* streamBackendAddCompare(void* backend, void* arg); +void streamBackendDelCompare(void* backend, void* arg); +int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); +STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7feb029e81..ff53ec6c51 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1457,6 +1457,15 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) { char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key); + if (!taosDirExist(taskPath)) { + code = taosMkDir(taskPath); + if (code != 0) { + qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code)); + taosMemoryFree(taskPath); + return NULL; + } + } + char* err = NULL; STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(SBackendWrapper)); rocksdb_env_t* env = rocksdb_create_default_env(); @@ -1474,33 +1483,16 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) { // rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); rocksdb_options_set_atomic_flush(opts, 1); - pTaskBackend->env = env; pTaskBackend->dbOpt = opts; + pTaskBackend->env = env; pTaskBackend->cache = cache; pTaskBackend->filterFactory = rocksdb_compactionfilterfactory_create( NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); rocksdb_options_set_compaction_filter_factory(pTaskBackend->dbOpt, pTaskBackend->filterFactory); + pTaskBackend->readOpt = rocksdb_readoptions_create(); + pTaskBackend->writeOpt = rocksdb_writeoptions_create(); - char* err = NULL; - size_t nCf = 0; - - char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err); - if (nCf == 0 || nCf == 1 || err != NULL) { - taosMemoryFreeClear(err); - pTaskBackend->db = rocksdb_open(opts, taskPath, &err); - if (err != NULL) { - qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err); - taosMemoryFreeClear(err); - } - } else { - ASSERT(0); - } - if (cfs != NULL) { - rocksdb_list_column_families_destroy(cfs, nCf); - } - qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend); - - nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); + size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); pTaskBackend->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); pTaskBackend->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); pTaskBackend->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); @@ -1528,8 +1520,65 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) { pTaskBackend->pCfParams[i].tableOpt = tableOpt; } + char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err); + if (nCf == 0 || nCf == 1 || err != NULL) { + taosMemoryFreeClear(err); + pTaskBackend->db = rocksdb_open(opts, taskPath, &err); + if (err != NULL) { + qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err); + taosMemoryFreeClear(err); + } + } else { + } + if (cfs != NULL) { + rocksdb_list_column_families_destroy(cfs, nCf); + } + + taosThreadMutexInit(&pTaskBackend->mutex, NULL); + taosMemoryFree(taskPath); + + qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend); return pTaskBackend; } +void streamMeteCloseTaskBackend(STaskBackendWrapper* wrapper) { + if (wrapper == NULL) return; + + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); + + int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); + char* err = NULL; + for (int i = 0; i < nCf; i++) { + if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); + if (err != NULL) { + qError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err); + taosMemoryFreeClear(err); + } + rocksdb_flushoptions_destroy(flushOpt); + } + for (int i = 0; i < nCf; i++) { + if (wrapper->pCf[i] != NULL) { + rocksdb_column_family_handle_destroy(wrapper->pCf[i]); + } + } + rocksdb_options_destroy(wrapper->dbOpt); + rocksdb_readoptions_destroy(wrapper->readOpt); + rocksdb_writeoptions_destroy(wrapper->writeOpt); + rocksdb_env_destroy(wrapper->env); + rocksdb_cache_destroy(wrapper->cache); + + taosMemoryFree(wrapper->pCf); + taosMemoryFree(wrapper->pCompares); + taosMemoryFree(wrapper->pCfOpts); + taosMemoryFree(wrapper->pCfParams); + + taosThreadMutexDestroy(&wrapper->mutex); + + rocksdb_close(wrapper->db); + taosMemoryFree(wrapper); + + return; +} int8_t getCfIdx(const char* key) { int idx = -1; @@ -1542,21 +1591,25 @@ int8_t getCfIdx(const char* key) { } return idx; } -rocksdb_column_family_handle_t* taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) { - char* err = NULL; - int8_t idx = getCfIdx(key); - if (idx == -1) return NULL; +int32_t taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) { + int32_t code = 0; + char* err = NULL; + int8_t idx = getCfIdx(key); + if (idx == -1) return -1; rocksdb_column_family_handle_t* cf = rocksdb_create_column_family(pBackend->db, pBackend->pCfOpts[idx], ginitDict[idx].key, &err); if (err != NULL) { qError("failed to open cf, key:%s, reason: %s", key, err); taosMemoryFree(err); - return NULL; + code = -1; + return code; } - return cf; + + pBackend->pCf[idx] = cf; + return code; } -int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) { +int32_t copyDataAt(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) { int32_t WRITE_BATCH = 1024; char* err = NULL; int code = 0; @@ -1568,7 +1621,7 @@ int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) { rocksdb_iter_seek_to_first(pIter); while (rocksdb_iter_valid(pIter)) { if (rocksdb_writebatch_count(wb) >= WRITE_BATCH) { - rocksdb_write(pDst->db, pDst->writeOpts, wb, &err); + rocksdb_write(pDst->db, pDst->writeOpt, wb, &err); if (err != NULL) { code = -1; goto _EXIT; @@ -1585,7 +1638,7 @@ int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) { } if (rocksdb_writebatch_count(wb) > 0) { - rocksdb_write(pDst->db, pDst->writeOpts, wb, &err); + rocksdb_write(pDst->db, pDst->writeOpt, wb, &err); if (err != NULL) { code = -1; goto _EXIT; @@ -1613,19 +1666,17 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst) { rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; if (pSrcCf == NULL) continue; - rocksdb_column_family_handle_t* pDstCf = taskBackendOpenCf(pTaskBackend, ginitDict[i].key); - if (pDstCf == NULL) { - return -1; - } + code = taskBackendOpenCf(pTaskBackend, ginitDict[i].key); + if (code != 0) goto _EXIT; - code = copyData(pSrcBackend, pTaskBackend, i); - if (code != 0) { - return -1; - } - - pTaskBackend->pCf[i] = pDstCf; + code = copyDataAt(pSrcBackend, pTaskBackend, i); + if (code != 0) goto _EXIT; } - return 0; + +_EXIT: + streamMeteCloseTaskBackend(pTaskBackend); + + return code; } int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) { SBackendWrapper* handle = backend; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f47961d026..d1f59d0a8f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -181,17 +181,20 @@ int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) { } int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) { + int32_t code = 0; int64_t chkpId = streamGetLatestCheckpointId(pMeta); SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId); void* pIter = taosHashIterate(pBackend->cfInst, NULL); while (pIter) { - size_t len = 0; - void* key = taosHashGetKey(pIter, &len); - + void* key = taosHashGetKey(pIter, NULL); + code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter); + if (code != 0) { + // continue + } + pIter = taosHashIterate(pBackend->cfInst, pIter); } - // streamBackendCleanup(); return 0; @@ -200,18 +203,21 @@ int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { int8_t compatible = streamMetaCheckStateCompatible(pMeta); if (compatible == STREAM_STATA_COMPATIBLE) { return 0; + } else if (compatible == STREAM_STATA_NEED_CONVERT) { + qInfo("stream state need covert backend format"); + + return streamMetaDoStateDataConvertImpl(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { qError( "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "manually", tsDataDir); + return -1; - } else if (compatible == STREAM_STATA_NEED_CONVERT) { - qError("stream state need covert backend format"); - return streamMetaDoStateDataConvertImpl(pMeta); } return 0; } + SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));