refactor backend

This commit is contained in:
yihaoDeng 2023-11-07 11:49:56 +08:00
parent 95cfc5eb46
commit e5449bf161
1 changed files with 62 additions and 70 deletions

View File

@ -22,6 +22,7 @@
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t copyFiles(const char* src, const char* dst);
typedef struct SDbChkp {
int8_t init;
char* pCurrent;
@ -67,7 +68,7 @@ typedef struct {
SHashObj* pDbChkpTbl;
TdThreadRwlock rwLock;
} SBackendManager;
} SBkdMgt;
typedef struct SCompactFilteFactory {
void* status;
@ -146,7 +147,6 @@ typedef struct {
} SCfInit;
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg);
typedef struct {
void* funcName;
DestroyFactoryFunc destroy;
@ -486,14 +486,14 @@ _ERROR:
taosMemoryFree(dstDir);
return code;
}
SBackendManager* bkdMgtCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
SBkdMgt* bkdMgtCreate(char* path) {
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
taosThreadRwlockInit(&p->rwLock, NULL);
return p;
}
void bkdMgtDestroy(SBackendManager* bm) {
void bkdMgtDestroy(SBkdMgt* bm) {
if (bm == NULL) return;
void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL);
while (pIter) {
@ -507,7 +507,7 @@ void bkdMgtDestroy(SBackendManager* bm) {
taosMemoryFree(bm);
}
int32_t bkdMgtGetDelta(SBackendManager* bm, char* taskId, int64_t chkpId, SArray* list) {
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) {
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
@ -518,7 +518,7 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, char* taskId, int64_t chkpId, SArray
return code;
}
int32_t bkdMgtAddChkp(SBackendManager* bm, char* task, char* path) {
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
int32_t code = -1;
taosThreadRwlockWrlock(&bm->rwLock);
@ -538,7 +538,7 @@ int32_t bkdMgtAddChkp(SBackendManager* bm, char* task, char* path) {
return code;
}
int32_t bkdMgtDumpTo(SBackendManager* bm, char* taskId, char* dname) {
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
int32_t code = 0;
taosThreadRwlockRdlock(&bm->rwLock);
@ -549,53 +549,8 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* taskId, char* dname) {
return code;
}
void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg) {
rocksdb_compactionfilterfactory_t* filterFactory = rocksdb_compactionfilterfactory_create(
arg, ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName);
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
}
bool isValidCheckpoint(const char* dir) { return true; }
int32_t copyFiles(const char* src, const char* dst) {
int32_t code = 0;
// opt later, just hard link
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) {
taosMemoryFree(srcName);
taosMemoryFree(dstName);
return -1;
}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
if (!taosDirEntryIsDir(de)) {
code = taosCopyFile(srcName, dstName);
if (code == -1) {
goto _err;
}
}
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
}
_err:
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later
int32_t code = 0;
@ -1754,23 +1709,23 @@ void destroyCompactFilteFactory(void* arg) {
}
const char* compactFilteFactoryName(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter";
return "stream_compact_factory_filter";
}
const char* compactFilteFactoryNameSess(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_sess";
return "stream_compact_factory_filter_sess";
}
const char* compactFilteFactoryNameState(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_state";
return "stream_compact_factory_filter_state";
}
const char* compactFilteFactoryNameFill(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_fill";
return "stream_compact_factory_filter_fill";
}
const char* compactFilteFactoryNameFunc(void* arg) {
SCompactFilteFactory* state = arg;
return "stream_compact_filter_func";
return "stream_compact_factory_filter_func";
}
void destroyCompactFilte(void* arg) { (void)arg; }
@ -1788,21 +1743,18 @@ unsigned char compactFilteSess(void* arg, int level, const char* key, size_t kle
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
// return streamStateValueIsStale((char*)val) ? 1 : 0;
}
unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
// return streamStateValueIsStale((char*)val) ? 1 : 0;
}
unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
// not impl yet
return 0;
// return streamStateValueIsStale((char*)val) ? 1 : 0;
}
unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
@ -1935,12 +1887,13 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
rocksdb_compactionfilterfactory_t* filterFactory =
rocksdb_compactionfilterfactory_create(NULL, cfPara->destroyFilter, cfPara->createFilter, cfPara->funcName);
rocksdb_options_set_compaction_filter_factory(opt, filterFactory);
pTaskDb->pCompares[i] = compare;
pTaskDb->pCfOpts[i] = opt;
pTaskDb->pCfParams[i].tableOpt = tableOpt;
// set filter factory
dbSetFilterFactory(opt, i, NULL);
}
return;
}
@ -2054,8 +2007,8 @@ _EXIT:
return NULL;
}
void taskDbDestroy(void* pBackend) {
STaskDbWrapper* wrapper = pBackend;
void taskDbDestroy(void* pDb) {
STaskDbWrapper* wrapper = pDb;
qDebug("succ to destroy stream backend:%p", wrapper);
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
@ -2107,7 +2060,7 @@ void taskDbDestroy(void* pBackend) {
if (wrapper->db) rocksdb_close(wrapper->db);
taskDbDestroyChkpOpt(pBackend);
taskDbDestroyChkpOpt(wrapper);
taosMemoryFree(wrapper->idstr);
taosMemoryFree(wrapper->path);
@ -2116,14 +2069,14 @@ void taskDbDestroy(void* pBackend) {
return;
}
int32_t taskDbOpenCfByKey(STaskDbWrapper* pBackend, const char* key) {
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
int32_t code = 0;
char* err = NULL;
int8_t idx = getCfIdx(key);
if (idx == -1) return -1;
rocksdb_column_family_handle_t* cf =
rocksdb_create_column_family(pBackend->db, pBackend->pCfOpts[idx], ginitDict[idx].key, &err);
rocksdb_create_column_family(pDb->db, pDb->pCfOpts[idx], ginitDict[idx].key, &err);
if (err != NULL) {
stError("failed to open cf, key:%s, reason: %s", key, err);
taosMemoryFree(err);
@ -2131,7 +2084,7 @@ int32_t taskDbOpenCfByKey(STaskDbWrapper* pBackend, const char* key) {
return code;
}
pBackend->pCf[idx] = cf;
pDb->pCf[idx] = cf;
return code;
}
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
@ -3690,4 +3643,43 @@ uint32_t nextPow2(uint32_t x) {
x = x | (x >> 8);
x = x | (x >> 16);
return x + 1;
}
int32_t copyFiles(const char* src, const char* dst) {
int32_t code = 0;
// opt later, just hard link
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) {
taosMemoryFree(srcName);
taosMemoryFree(dstName);
return -1;
}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
if (!taosDirEntryIsDir(de)) {
code = taosCopyFile(srcName, dstName);
if (code == -1) {
goto _err;
}
}
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
}
_err:
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}