add checkpoint
This commit is contained in:
parent
3020e27114
commit
aad669e33c
|
@ -44,7 +44,7 @@ typedef struct {
|
|||
int64_t defaultCfInit;
|
||||
} SBackendWrapper;
|
||||
|
||||
void* streamBackendInit(const char* path);
|
||||
void* streamBackendInit(const char* path, int64_t chkpId);
|
||||
void streamBackendCleanup(void* arg);
|
||||
void streamBackendHandleCleanup(void* arg);
|
||||
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
||||
|
|
|
@ -144,10 +144,40 @@ SCfInit ginitDict[] = {
|
|||
encodeValueFunc, decodeValueFunc},
|
||||
};
|
||||
|
||||
void* streamBackendInit(const char* path) {
|
||||
int32_t streamBackendRebuildDirFromCheckpoint(const char* stateDir, const char* chkpDir) {
|
||||
// impl later
|
||||
|
||||
return 0;
|
||||
}
|
||||
void* streamBackendInit(const char* path, int64_t chkpId) {
|
||||
int32_t code = 0;
|
||||
char* state = taosMemoryCalloc(1, strlen(path) + 32);
|
||||
|
||||
sprintf(state, "%s/%s", path, "state");
|
||||
if (chkpId != 0) {
|
||||
char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
|
||||
sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId);
|
||||
if (taosIsDir(chkp)) {
|
||||
if (taosIsDir(state)) {
|
||||
// remove dir if exists
|
||||
// taosRenameFile(const char *oldName, const char *newName)
|
||||
taosRemoveDir(state);
|
||||
}
|
||||
taosMkDir(state);
|
||||
code = streamBackendRebuildDirFromCheckpoint(state, chkp);
|
||||
if (code != 0) {
|
||||
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
}
|
||||
|
||||
} else {
|
||||
qError("failed to start stream backend at %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
taosMkDir(state);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
|
||||
|
||||
qDebug("start to init stream backend at %s", path);
|
||||
qDebug("start to init stream backend at %s", state);
|
||||
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
|
||||
pHandle->list = tdListNew(sizeof(SCfComparator));
|
||||
taosThreadMutexInit(&pHandle->mutex, NULL);
|
||||
|
@ -183,7 +213,7 @@ void* streamBackendInit(const char* path) {
|
|||
char* err = NULL;
|
||||
size_t nCf = 0;
|
||||
|
||||
char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err);
|
||||
char** cfs = rocksdb_list_column_families(opts, state, &nCf, &err);
|
||||
if (nCf == 0 || nCf == 1 || err != NULL) {
|
||||
taosMemoryFreeClear(err);
|
||||
pHandle->db = rocksdb_open(opts, path, &err);
|
||||
|
@ -196,12 +226,12 @@ void* streamBackendInit(const char* path) {
|
|||
/*
|
||||
list all cf and get prefix
|
||||
*/
|
||||
streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf);
|
||||
streamStateOpenBackendCf(pHandle, (char*)state, cfs, nCf);
|
||||
}
|
||||
if (cfs != NULL) {
|
||||
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||
}
|
||||
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
|
||||
qDebug("succ to init stream backend at %s, backend:%p", state, pHandle);
|
||||
|
||||
return (void*)pHandle;
|
||||
_EXIT:
|
||||
|
|
|
@ -23,6 +23,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
|||
int32_t streamBackendId = 0;
|
||||
int32_t streamBackendCfWrapperId = 0;
|
||||
|
||||
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||
|
||||
static void streamMetaEnvInit() {
|
||||
streamBackendId = taosOpenRef(64, streamBackendCleanup);
|
||||
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
|
||||
|
@ -103,7 +105,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->checkpointCap = 4;
|
||||
taosInitRWLatch(&pMeta->checkpointDirLock);
|
||||
|
||||
pMeta->streamBackend = streamBackendInit(streamPath);
|
||||
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
|
||||
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
|
||||
if (pMeta->streamBackend == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -443,7 +447,7 @@ _err:
|
|||
return chkpId;
|
||||
}
|
||||
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||
int64_t checkpointId = streamGetLatestCheckpointId(pMeta);
|
||||
int64_t checkpointId = 0;
|
||||
|
||||
TBC* pCur = NULL;
|
||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||
|
|
Loading…
Reference in New Issue