diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 53f9cc3c92..38cd1e1b76 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -104,6 +104,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND; pHdr->size = len; memcpy(pHdr->data, rowData, len); + taosMemoryFree(rowData); tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode)); return code; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index b79979df67..c8d5797dad 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -181,5 +181,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst); int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); +int32_t taskBackendBuildSnap(void* arg, int64_t chkpId); + // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f936b569ae..5e48a345b1 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -950,7 +950,14 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } - +int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) { + SStreamMeta* pMeta = arg; + void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + while (pIter) { + pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + } + return 0; +} int32_t streamBackendTriggerChkp(void* arg, char* dst) { return 0; // SStreamMeta* pMeta = arg; diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 42d77fc4c0..88790f2511 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -108,44 +108,57 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { - // impl later - int len = strlen(path); - char* tdir = taosMemoryCalloc(1, len + 256); - memcpy(tdir, path, len); +int32_t streamStateSnapBuild(void* arg, char* path, int64_t chkpId) { + return taskBackendBuildSnap(arg, chkpId); + // int32_t code = 0; + // int8_t validChkp = 0; - int32_t code = 0; + // int len = strlen(path); + // char* tpath = taosMemoryCalloc(1, len + 256); + // memcpy(tpath, path, len); - int8_t validChkp = 0; - if (chkpId != 0) { - sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, - chkpId); - if (taosIsDir(tdir)) { - validChkp = 1; - qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); - streamBackendAddInUseChkp(pMeta, chkpId); - } else { - qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); - } - } + // SStreamMeta *pMeta = arg; + // if (chkpId != 0) { + // sprintf(tpath, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", + // TD_DIRSEP, + // chkpId); + // if (taosIsDir(tpath)) { + // validChkp = 1; + // qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tpath); + // streamBackendAddInUseChkp(pMeta, chkpId); + // } else { + // qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, + // tpath); + // } + // } // no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it - if (validChkp == 0) { - sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); - char* chkpdir = taosMemoryCalloc(1, len + 256); - sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp"); - taosMemoryFree(tdir); + // if (validChkp == 0) { + // sprintf(tpath, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); + // char* chkpdir = taosMemoryCalloc(1, len + 256); + // sprintf(chkpdir, "%s%s%s", tpath, TD_DIRSEP, "tmp"); + // taosMemoryFree(tpath); - tdir = chkpdir; - qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tdir); + // tpath = chkpdir; + // qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tpath); - code = streamBackendTriggerChkp(pMeta, tdir); - if (code != 0) { - qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tdir); - taosMemoryFree(tdir); - return code; - } - chkpId = 0; + // code = streamBackendTriggerChkp(arg, tpath); + // if (code != 0) { + // qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tpath); + // taosMemoryFree(tpath); + // return code; + // } + // chkpId = 0; + // } + + //*dstPath = tpath; +} +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { + // impl later + char* tdir = NULL; + int32_t code = streamStateSnapBuild(pMeta, path, chkpId); + if (code != 0) { + return -1; } qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);