diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index aa48f5cb29..17a28b8b82 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -47,6 +47,7 @@ typedef struct { void* streamBackendInit(const char* path); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); +int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9ceea4545b..bfcedd2d53 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -393,7 +393,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); char tbuf[256] = {0}; - sprintf(tbuf, "%s/checkpoint_%" PRId64 "", path, id); + sprintf(tbuf, "%s/checkpoint-%" PRId64 "", path, id); if (taosIsDir(tbuf)) { taosRemoveDir(tbuf); } @@ -402,11 +402,63 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { return 0; } +static int32_t compareCheckpoint(const void* a, const void* b) { + int64_t x = *(int64_t*)a; + int64_t y = *(int64_t*)b; + return x < y ? -1 : 1; +} + +int32_t streamBackendLoadCheckpointInfo(void* arg) { + SStreamMeta* pMeta = arg; + int32_t code = 0; + + int32_t len = strlen(pMeta->path) + 30; + char* checkpointPath = taosMemoryCalloc(1, len); + sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); + + if (!taosDirExist(checkpointPath)) { + return 0; + // no checkpoint, nothing to load + } + + TdDirPtr pDir = taosOpenDir(checkpointPath); + if (pDir == NULL) return 0; + + TdDirEntryPtr de = NULL; + SArray* suffix = taosArrayInit(4, sizeof(int64_t)); + + while ((de = taosReadDir(pDir)) != NULL) { + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + if (taosDirEntryIsDir(de)) { + char checkpointPrefix[32] = {0}; + int64_t checkpointId = 0; + + int ret = sscanf(taosGetDirEntryName(de), "checkpoint-%" PRId64 "", &checkpointId); + if (ret == 1) { + taosArrayPush(suffix, &checkpointId); + } + } else { + continue; + } + } + taosArraySort(suffix, compareCheckpoint); + + for (int i = 0; i < taosArrayGetSize(suffix); i++) { + int64_t id = *(int64_t*)taosArrayGet(suffix, i); + taosArrayPush(pMeta->checkpointSaved, &id); + } + + taosArrayDestroy(suffix); + taosCloseDir(&pDir); + taosMemoryFree(checkpointPath); + return 0; +} int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { - SStreamMeta* pMeta = arg; - int64_t backendRid = pMeta->streamBackendRid; - int64_t st = taosGetTimestampMs(); - int32_t code = -1; + SStreamMeta* pMeta = arg; + int64_t backendRid = pMeta->streamBackendRid; + int64_t st = taosGetTimestampMs(); + int32_t code = -1; char path[256] = {0}; sprintf(path, "%s/%s", pMeta->path, "checkpoints"); @@ -417,7 +469,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } char checkpointDir[256] = {0}; - snprintf(checkpointDir, tListLen(checkpointDir),"%s/checkpoint_%" PRIu64, path, checkpointId); + snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint-%" PRId64, path, checkpointId); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL) { @@ -1203,8 +1255,8 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } return true; } -rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, rocksdb_snapshot_t** snapshot, - rocksdb_readoptions_t** readOpt) { +rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, + rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) { int idx = streamStateGetCfIdx(pState, pChkptFileName); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7d4e7e4615..b2dabd6356 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -100,6 +100,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (pMeta->streamBackend == NULL) { goto _err; } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -108,6 +109,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->checkpointCap = 4; taosInitRWLatch(&pMeta->checkpointDirLock); + code = streamBackendLoadCheckpointInfo(pMeta); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } + taosMemoryFree(streamPath); taosInitRWLatch(&pMeta->lock); @@ -310,7 +317,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); - while(1) { + while (1) { taosRLockLatch(&pMeta->lock); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));