add cvt state

This commit is contained in:
yihaoDeng 2023-09-22 08:26:03 +00:00
parent 470849982e
commit 19e52c364e
3 changed files with 117 additions and 59 deletions

View File

@ -51,8 +51,8 @@ typedef struct {
typedef struct {
rocksdb_t* db;
rocksdb_writeoptions_t* writeOpts;
rocksdb_readoptions_t* readOpts;
rocksdb_writeoptions_t* writeOpt;
rocksdb_readoptions_t* readOpt;
rocksdb_options_t* dbOpt;
rocksdb_env_t* env;
rocksdb_cache_t* cache;
@ -63,7 +63,7 @@ typedef struct {
RocksdbCfParam* pCfParams;
rocksdb_compactionfilterfactory_t* filterFactory;
TdThreadMutex cfMutex;
TdThreadMutex mutex;
} STaskBackendWrapper;
@ -75,6 +75,7 @@ int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);

View File

@ -1457,6 +1457,15 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) {
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key);
if (!taosDirExist(taskPath)) {
code = taosMkDir(taskPath);
if (code != 0) {
qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code));
taosMemoryFree(taskPath);
return NULL;
}
}
char* err = NULL;
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(SBackendWrapper));
rocksdb_env_t* env = rocksdb_create_default_env();
@ -1474,33 +1483,16 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) {
// rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
rocksdb_options_set_atomic_flush(opts, 1);
pTaskBackend->env = env;
pTaskBackend->dbOpt = opts;
pTaskBackend->env = env;
pTaskBackend->cache = cache;
pTaskBackend->filterFactory = rocksdb_compactionfilterfactory_create(
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(pTaskBackend->dbOpt, pTaskBackend->filterFactory);
pTaskBackend->readOpt = rocksdb_readoptions_create();
pTaskBackend->writeOpt = rocksdb_writeoptions_create();
char* err = NULL;
size_t nCf = 0;
char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err);
pTaskBackend->db = rocksdb_open(opts, taskPath, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err);
taosMemoryFreeClear(err);
}
} else {
ASSERT(0);
}
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend);
nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
pTaskBackend->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
pTaskBackend->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
pTaskBackend->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
@ -1528,8 +1520,65 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) {
pTaskBackend->pCfParams[i].tableOpt = tableOpt;
}
char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err);
pTaskBackend->db = rocksdb_open(opts, taskPath, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err);
taosMemoryFreeClear(err);
}
} else {
}
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
taosMemoryFree(taskPath);
qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend);
return pTaskBackend;
}
void streamMeteCloseTaskBackend(STaskBackendWrapper* wrapper) {
if (wrapper == NULL) return;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
char* err = NULL;
for (int i = 0; i < nCf; i++) {
if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err);
if (err != NULL) {
qError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
rocksdb_flushoptions_destroy(flushOpt);
}
for (int i = 0; i < nCf; i++) {
if (wrapper->pCf[i] != NULL) {
rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
}
}
rocksdb_options_destroy(wrapper->dbOpt);
rocksdb_readoptions_destroy(wrapper->readOpt);
rocksdb_writeoptions_destroy(wrapper->writeOpt);
rocksdb_env_destroy(wrapper->env);
rocksdb_cache_destroy(wrapper->cache);
taosMemoryFree(wrapper->pCf);
taosMemoryFree(wrapper->pCompares);
taosMemoryFree(wrapper->pCfOpts);
taosMemoryFree(wrapper->pCfParams);
taosThreadMutexDestroy(&wrapper->mutex);
rocksdb_close(wrapper->db);
taosMemoryFree(wrapper);
return;
}
int8_t getCfIdx(const char* key) {
int idx = -1;
@ -1542,21 +1591,25 @@ int8_t getCfIdx(const char* key) {
}
return idx;
}
rocksdb_column_family_handle_t* taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) {
int32_t taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) {
int32_t code = 0;
char* err = NULL;
int8_t idx = getCfIdx(key);
if (idx == -1) return NULL;
if (idx == -1) return -1;
rocksdb_column_family_handle_t* cf =
rocksdb_create_column_family(pBackend->db, pBackend->pCfOpts[idx], ginitDict[idx].key, &err);
if (err != NULL) {
qError("failed to open cf, key:%s, reason: %s", key, err);
taosMemoryFree(err);
return NULL;
code = -1;
return code;
}
return cf;
pBackend->pCf[idx] = cf;
return code;
}
int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) {
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) {
int32_t WRITE_BATCH = 1024;
char* err = NULL;
int code = 0;
@ -1568,7 +1621,7 @@ int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) {
rocksdb_iter_seek_to_first(pIter);
while (rocksdb_iter_valid(pIter)) {
if (rocksdb_writebatch_count(wb) >= WRITE_BATCH) {
rocksdb_write(pDst->db, pDst->writeOpts, wb, &err);
rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
if (err != NULL) {
code = -1;
goto _EXIT;
@ -1585,7 +1638,7 @@ int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) {
}
if (rocksdb_writebatch_count(wb) > 0) {
rocksdb_write(pDst->db, pDst->writeOpts, wb, &err);
rocksdb_write(pDst->db, pDst->writeOpt, wb, &err);
if (err != NULL) {
code = -1;
goto _EXIT;
@ -1613,19 +1666,17 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst) {
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
if (pSrcCf == NULL) continue;
rocksdb_column_family_handle_t* pDstCf = taskBackendOpenCf(pTaskBackend, ginitDict[i].key);
if (pDstCf == NULL) {
return -1;
code = taskBackendOpenCf(pTaskBackend, ginitDict[i].key);
if (code != 0) goto _EXIT;
code = copyDataAt(pSrcBackend, pTaskBackend, i);
if (code != 0) goto _EXIT;
}
code = copyData(pSrcBackend, pTaskBackend, i);
if (code != 0) {
return -1;
}
_EXIT:
streamMeteCloseTaskBackend(pTaskBackend);
pTaskBackend->pCf[i] = pDstCf;
}
return 0;
return code;
}
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
SBackendWrapper* handle = backend;

View File

@ -181,17 +181,20 @@ int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) {
}
int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) {
int32_t code = 0;
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId);
void* pIter = taosHashIterate(pBackend->cfInst, NULL);
while (pIter) {
size_t len = 0;
void* key = taosHashGetKey(pIter, &len);
void* key = taosHashGetKey(pIter, NULL);
code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter);
if (code != 0) {
// continue
}
pIter = taosHashIterate(pBackend->cfInst, pIter);
}
// streamBackendCleanup();
return 0;
@ -200,18 +203,21 @@ int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) {
int8_t compatible = streamMetaCheckStateCompatible(pMeta);
if (compatible == STREAM_STATA_COMPATIBLE) {
return 0;
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
qInfo("stream state need covert backend format");
return streamMetaDoStateDataConvertImpl(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);
return -1;
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
qError("stream state need covert backend format");
return streamMetaDoStateDataConvertImpl(pMeta);
}
return 0;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));