rm expire checkpoint
This commit is contained in:
parent
c056e34534
commit
e547ed3699
|
@ -47,6 +47,7 @@ typedef struct {
|
|||
void* streamBackendInit(const char* path);
|
||||
void streamBackendCleanup(void* arg);
|
||||
void streamBackendHandleCleanup(void* arg);
|
||||
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
||||
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
|
||||
SListNode* streamBackendAddCompare(void* backend, void* arg);
|
||||
void streamBackendDelCompare(void* backend, void* arg);
|
||||
|
|
|
@ -393,7 +393,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
|||
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
|
||||
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
|
||||
char tbuf[256] = {0};
|
||||
sprintf(tbuf, "%s/checkpoint_%" PRId64 "", path, id);
|
||||
sprintf(tbuf, "%s/checkpoint-%" PRId64 "", path, id);
|
||||
if (taosIsDir(tbuf)) {
|
||||
taosRemoveDir(tbuf);
|
||||
}
|
||||
|
@ -402,11 +402,63 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t compareCheckpoint(const void* a, const void* b) {
|
||||
int64_t x = *(int64_t*)a;
|
||||
int64_t y = *(int64_t*)b;
|
||||
return x < y ? -1 : 1;
|
||||
}
|
||||
|
||||
int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
||||
SStreamMeta* pMeta = arg;
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t len = strlen(pMeta->path) + 30;
|
||||
char* checkpointPath = taosMemoryCalloc(1, len);
|
||||
sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints");
|
||||
|
||||
if (!taosDirExist(checkpointPath)) {
|
||||
return 0;
|
||||
// no checkpoint, nothing to load
|
||||
}
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(checkpointPath);
|
||||
if (pDir == NULL) return 0;
|
||||
|
||||
TdDirEntryPtr de = NULL;
|
||||
SArray* suffix = taosArrayInit(4, sizeof(int64_t));
|
||||
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
|
||||
|
||||
if (taosDirEntryIsDir(de)) {
|
||||
char checkpointPrefix[32] = {0};
|
||||
int64_t checkpointId = 0;
|
||||
|
||||
int ret = sscanf(taosGetDirEntryName(de), "checkpoint-%" PRId64 "", &checkpointId);
|
||||
if (ret == 1) {
|
||||
taosArrayPush(suffix, &checkpointId);
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
taosArraySort(suffix, compareCheckpoint);
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
|
||||
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
|
||||
taosArrayPush(pMeta->checkpointSaved, &id);
|
||||
}
|
||||
|
||||
taosArrayDestroy(suffix);
|
||||
taosCloseDir(&pDir);
|
||||
taosMemoryFree(checkpointPath);
|
||||
return 0;
|
||||
}
|
||||
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||
SStreamMeta* pMeta = arg;
|
||||
int64_t backendRid = pMeta->streamBackendRid;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t code = -1;
|
||||
SStreamMeta* pMeta = arg;
|
||||
int64_t backendRid = pMeta->streamBackendRid;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t code = -1;
|
||||
|
||||
char path[256] = {0};
|
||||
sprintf(path, "%s/%s", pMeta->path, "checkpoints");
|
||||
|
@ -417,7 +469,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
|||
}
|
||||
|
||||
char checkpointDir[256] = {0};
|
||||
snprintf(checkpointDir, tListLen(checkpointDir),"%s/checkpoint_%" PRIu64, path, checkpointId);
|
||||
snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint-%" PRId64, path, checkpointId);
|
||||
|
||||
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
||||
if (pHandle == NULL) {
|
||||
|
@ -1203,8 +1255,8 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
|
|||
}
|
||||
return true;
|
||||
}
|
||||
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, rocksdb_snapshot_t** snapshot,
|
||||
rocksdb_readoptions_t** readOpt) {
|
||||
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName,
|
||||
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) {
|
||||
int idx = streamStateGetCfIdx(pState, pChkptFileName);
|
||||
|
||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||
|
|
|
@ -100,6 +100,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
if (pMeta->streamBackend == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||
pMeta->pTaskBackendUnique =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
|
@ -108,6 +109,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->checkpointCap = 4;
|
||||
taosInitRWLatch(&pMeta->checkpointDirLock);
|
||||
|
||||
code = streamBackendLoadCheckpointInfo(pMeta);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosMemoryFree(streamPath);
|
||||
|
||||
taosInitRWLatch(&pMeta->lock);
|
||||
|
@ -310,7 +317,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
|||
|
||||
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
taosRLockLatch(&pMeta->lock);
|
||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
|
||||
|
|
Loading…
Reference in New Issue