add checkpoint

This commit is contained in:
yihaoDeng 2023-07-17 02:22:21 +00:00
parent 4b11a65bb7
commit dea2e73d2e
2 changed files with 9 additions and 8 deletions

View File

@ -180,17 +180,16 @@ _err:
taosMemoryFreeClear(absSrcPath); taosMemoryFreeClear(absSrcPath);
taosMemoryFreeClear(absDstPath); taosMemoryFreeClear(absDstPath);
taosCloseDir(&pDir); taosCloseDir(&pDir);
return code; return code >= 0 ? 0 : -1;
} }
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later // impl later
int32_t code = 0; int32_t code = 0;
char* state = taosMemoryCalloc(1, strlen(path) + 32); char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s/%s", path, "state"); sprintf(state, "%s/%s", path, "state");
if (chkpId != 0) { if (chkpId != 0) {
char* chkp = taosMemoryCalloc(1, strlen(path) + 64); char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId); sprintf(chkp, "%s/%s/checkpoint%" PRId64 "", path, "checkpoints", chkpId);
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
if (taosIsDir(state)) { if (taosIsDir(state)) {
// remove dir if exists // remove dir if exists
@ -201,6 +200,8 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
code = copyFiles(chkp, state); code = copyFiles(chkp, state);
if (code != 0) { if (code != 0) {
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
qInfo("succ to restart stream backend at checkpoint path: %s", chkp);
} }
} else { } else {
@ -458,7 +459,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
char tbuf[256] = {0}; char tbuf[256] = {0};
sprintf(tbuf, "%s/checkpoint-%" PRId64 "", path, id); sprintf(tbuf, "%s/checkpoint%" PRId64 "", path, id);
if (taosIsDir(tbuf)) { if (taosIsDir(tbuf)) {
taosRemoveDir(tbuf); taosRemoveDir(tbuf);
} }
@ -499,7 +500,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
char checkpointPrefix[32] = {0}; char checkpointPrefix[32] = {0};
int64_t checkpointId = 0; int64_t checkpointId = 0;
int ret = sscanf(taosGetDirEntryName(de), "checkpoint-%" PRId64 "", &checkpointId); int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
if (ret == 1) { if (ret == 1) {
taosArrayPush(suffix, &checkpointId); taosArrayPush(suffix, &checkpointId);
} }
@ -534,7 +535,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
} }
char checkpointDir[256] = {0}; char checkpointDir[256] = {0};
snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint-%" PRId64, path, checkpointId); snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint%" PRId64, path, checkpointId);
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL) { if (pHandle == NULL) {
@ -1630,10 +1631,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->number = pState->number;
pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb; pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);

View File

@ -102,7 +102,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointCap = 4; pMeta->checkpointCap = 8;
taosInitRWLatch(&pMeta->checkpointDirLock); taosInitRWLatch(&pMeta->checkpointDirLock);
int64_t chkpId = streamGetLatestCheckpointId(pMeta); int64_t chkpId = streamGetLatestCheckpointId(pMeta);