From 6e5be0d30edae01c866db77df0e6d49c7be71632 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jul 2023 03:49:32 +0000 Subject: [PATCH] fix stream state transfer --- include/libs/stream/tstream.h | 4 +++- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 16 ++++++++++++++-- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- source/libs/stream/src/streamSnapshot.c | 2 ++ source/libs/stream/src/streamState.c | 5 +++-- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 354f202e55..efc7901d4a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -367,7 +367,9 @@ typedef struct SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; - int32_t chkptNotReadyTasks; + int32_t chkptNotReadyTasks; + + int64_t checkpointId; SArray* checkpointSaved; SArray* checkpointInUse; int32_t checkpointCap; diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index f7bae25043..4c70f3f7b9 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -41,13 +41,25 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + + SStreamMeta* meta = pTq->pStreamMeta; pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; + int64_t checkpointId = meta ? meta->checkpointId : 0; + SStreamSnapReader* pSnapReader = NULL; - sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints"); - if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) { + + // restore from checkpoint if checkpointid != 0 + if (checkpointId != 0) { + sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints", + TD_DIRSEP, checkpointId); + } else { + sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "state"); + } + + if (streamSnapReaderOpen(pTq, sver, checkpointId, tdir, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5ddfdef806..f643e5b1e8 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -163,8 +163,8 @@ int32_t copyFiles(const char* src, const char* dst) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - sprintf(absSrcPath, "%s/%s", src, name); - sprintf(absDstPath, "%s/%s", dst, name); + sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); + sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); if (!taosDirEntryIsDir(de)) { code = taosCopyFile(absSrcPath, absDstPath); if (code == -1) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 1a66c00389..ce82268f7e 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -64,6 +64,7 @@ struct SStreamSnapReader { int64_t sver; int64_t ever; SStreamSnapHandle handle; + int64_t checkpointId; }; struct SStreamSnapWriter { void* pMeta; @@ -217,6 +218,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path if (pReader == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + pReader->handle.checkpointId = ever; // const char* path = NULL; if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { taosMemoryFree(pReader); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 0a4f73a67c..ad18cdc091 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -108,7 +108,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz SStreamTask* pStreamTask = pTask; char statePath[1024]; if (!specPath) { - sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); + sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId); } else { memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); @@ -762,7 +762,8 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB - qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); + qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey, + key->groupId); return streamStateSessionDel_rocksdb(pState, key); #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number};