add checkpoint

This commit is contained in:
yihaoDeng 2023-07-15 09:35:33 +00:00
parent 7b4185dc56
commit b4de0892e6
2 changed files with 21 additions and 21 deletions

View File

@ -212,13 +212,13 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0; return 0;
} }
void* streamBackendInit(const char* path, int64_t chkpId) { void* streamBackendInit(const char* streamPath, int64_t chkpId) {
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
char* state = NULL; qDebug("start to init stream backend at %s", backendPath);
int32_t code = rebuildDirFromCheckpoint(path, chkpId, &state);
qDebug("start to init stream backend at %s", state); uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
pHandle->list = tdListNew(sizeof(SCfComparator)); pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->mutex, NULL);
@ -254,12 +254,12 @@ void* streamBackendInit(const char* path, int64_t chkpId) {
char* err = NULL; char* err = NULL;
size_t nCf = 0; size_t nCf = 0;
char** cfs = rocksdb_list_column_families(opts, state, &nCf, &err); char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) { if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
pHandle->db = rocksdb_open(opts, path, &err); pHandle->db = rocksdb_open(opts, backendPath, &err);
if (err != NULL) { if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err); qError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
goto _EXIT; goto _EXIT;
} }
@ -267,13 +267,13 @@ void* streamBackendInit(const char* path, int64_t chkpId) {
/* /*
list all cf and get prefix list all cf and get prefix
*/ */
streamStateOpenBackendCf(pHandle, (char*)state, cfs, nCf); streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
} }
if (cfs != NULL) { if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf); rocksdb_list_column_families_destroy(cfs, nCf);
} }
qDebug("succ to init stream backend at %s, backend:%p", state, pHandle); qDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle);
taosMemoryFreeClear(state); taosMemoryFreeClear(backendPath);
return (void*)pHandle; return (void*)pHandle;
_EXIT: _EXIT:
@ -285,9 +285,9 @@ _EXIT:
taosHashCleanup(pHandle->cfInst); taosHashCleanup(pHandle->cfInst);
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list); tdListFree(pHandle->list);
taosMemoryFree(state); taosMemoryFree(backendPath);
taosMemoryFree(pHandle); taosMemoryFree(pHandle);
qDebug("failed to init stream backend at %s", path); qDebug("failed to init stream backend at %s", backendPath);
return NULL; return NULL;
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {

View File

@ -51,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err; goto _err;
} }
memset(streamPath, 0, len);
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755); code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) { if (code != 0) {
@ -90,13 +90,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
memset(streamPath, 0, len); // memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "state"); // sprintf(streamPath, "%s/%s", pMeta->path, "state");
code = taosMulModeMkDir(streamPath, 0755); // code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) { // if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); // terrno = TAOS_SYSTEM_ERROR(code);
goto _err; // goto _err;
} // }
pMeta->pTaskBackendUnique = pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);