diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index debc088384..36b6b6e5e4 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -71,33 +71,28 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void SMnode *pMnode = pFsm->data; mInfo("start to read snapshot from sdb"); - int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, (void**)ppBuf, len); - if (code != 0) { - mError("failed to read snapshot from sdb since %s", terrstr()); - } else { - if (*ppIter == NULL) { - mInfo("successfully to read snapshot from sdb"); - } - } + // sdbStartRead + // sdbDoRead + // sdbStopRead - return code; + return 0; } int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) { SMnode *pMnode = pFsm->data; - mndSetRestore(pMnode, false); - mInfo("start to apply snapshot to sdb, len:%d", len); - int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len); - if (code != 0) { - mError("failed to apply snapshot to sdb, len:%d", len); - } else { - mInfo("successfully to apply snapshot to sdb, len:%d", len); - mndSetRestore(pMnode, true); - } + // sdbStartWrite + // sdbDoWrite + + mndSetRestore(pMnode, false); + mInfo("start to apply snapshot to sdb"); + + // sdbStopWrite + mInfo("successfully to apply snapshot to sdb"); + mndSetRestore(pMnode, true); // taosMemoryFree(pBuf); - return code; + return 0; } void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index 80a5786853..43be55dd1d 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -492,7 +492,7 @@ TEST_F(MndTestSdb, 01_Write_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); - ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 ); + ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2); sdbSetApplyIndex(pSdb, -1); ASSERT_EQ(sdbGetApplyIndex(pSdb), -1); ASSERT_EQ(mnode.insertTimes, 2); @@ -896,21 +896,28 @@ TEST_F(MndTestSdb, 01_Read_Str) { } { - void *sdbbuf = taosMemoryMalloc(1024 * 1024); - int32_t total = 0; + SSdbIter *pReader = NULL; + SSdbIter *pWritter = NULL; void *pBuf = NULL; int32_t len = 0; - SSdbIter *pIter = NULL; - while (sdbReadSnapshot(pSdb, &pIter, &pBuf, &len) == 0) { + int32_t code = 0; + + code = sdbStartRead(pSdb, &pReader); + ASSERT_EQ(code, 0); + code = sdbStartWrite(pSdb, &pWritter); + ASSERT_EQ(code, 0); + + while (sdbDoRead(pSdb, pReader, &pBuf, &len) == 0) { if (pBuf != NULL && len != 0) { - memcpy((char *)sdbbuf + total, pBuf, len); - total += len; + sdbDoWrite(pSdb, pWritter, pBuf, len); taosMemoryFree(pBuf); } else { break; } } - sdbApplySnapshot(pSdb, sdbbuf, total); + + sdbStopRead(pSdb, pReader); + sdbStopWrite(pSdb, pWritter, true); } ASSERT_EQ(sdbGetSize(pSdb, SDB_CONSUMER), 1); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index b34b8684a6..c66b47a24b 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -187,6 +187,7 @@ typedef struct SSdb { typedef struct SSdbIter { TdFilePtr file; int64_t total; + char *name; } SSdbIter; typedef struct { @@ -380,8 +381,13 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); -int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len); -int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len); +int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter); +int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter); +int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len); + +int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter); +int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply); +int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len); const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index aef3476440..485b729deb 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -71,6 +71,7 @@ void sdbCleanup(SSdb *pSdb) { } if (pSdb->tmpDir != NULL) { + taosRemoveDir(pSdb->tmpDir); taosMemoryFreeClear(pSdb->tmpDir); } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 6891f94244..1ca5097ce6 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -445,82 +445,84 @@ int32_t sdbDeploy(SSdb *pSdb) { return 0; } -static SSdbIter *sdbOpenIter(SSdb *pSdb) { - char datafile[PATH_MAX] = {0}; - char tmpfile[PATH_MAX] = {0}; - snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); - - taosThreadMutexLock(&pSdb->filelock); - if (taosCopyFile(datafile, tmpfile) < 0) { - taosThreadMutexUnlock(&pSdb->filelock); - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); - return NULL; - } - taosThreadMutexUnlock(&pSdb->filelock); - +static SSdbIter *sdbCreateIter(SSdb *pSdb) { SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); if (pIter == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pIter->file = taosOpenFile(tmpfile, TD_FILE_READ); - if (pIter->file == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", tmpfile, terrstr()); + char name[PATH_MAX + 100] = {0}; + snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter); + pIter->name = strdup(name); + if (pIter->name == NULL) { taosMemoryFree(pIter); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } return pIter; } -static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) { +static void sdbCloseIter(SSdbIter *pIter) { if (pIter == NULL) return; + if (pIter->file != NULL) { taosCloseFile(&pIter->file); + pIter->file = NULL; } - char tmpfile[PATH_MAX] = {0}; - snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); - taosRemoveFile(tmpfile); + if (pIter->name != NULL) { + taosRemoveFile(pIter->name); + taosMemoryFree(pIter->name); + pIter->name = NULL; + } + mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total); taosMemoryFree(pIter); - mInfo("sdbiter:%p, is closed", pIter); } -static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) { - SSdbIter *pIter = NULL; - if (ppIter != NULL) pIter = *ppIter; - - if (pIter == NULL) { - pIter = sdbOpenIter(pSdb); - if (pIter != NULL) { - mInfo("sdbiter:%p, is created to read snapshot", pIter); - *ppIter = pIter; - } else { - mError("failed to create sdbiter to read snapshot since %s", terrstr()); - *ppIter = NULL; - return NULL; - } - } else { - mInfo("sdbiter:%p, continue to read snapshot, total:%" PRId64, pIter, pIter->total); - } - - return pIter; -} - -int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len) { - SSdbIter *pIter = sdbGetIter(pSdb, ppIter); +int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { + SSdbIter *pIter = sdbCreateIter(pSdb); if (pIter == NULL) return -1; + char datafile[PATH_MAX] = {0}; + snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + + taosThreadMutexLock(&pSdb->filelock); + if (taosCopyFile(datafile, pIter->name) < 0) { + taosThreadMutexUnlock(&pSdb->filelock); + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to copy file %s to %s since %s", datafile, pIter->name, terrstr()); + sdbCloseIter(pIter); + return -1; + } + taosThreadMutexUnlock(&pSdb->filelock); + + pIter->file = taosOpenFile(pIter->name, TD_FILE_READ); + if (pIter->file == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s since %s", pIter->name, terrstr()); + sdbCloseIter(pIter); + return -1; + } + + *ppIter = pIter; + mInfo("sdbiter:%p, is created to read snapshot, file:%s", pIter, pIter->name); + return 0; +} + +int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) { + sdbCloseIter(pIter); + return 0; +} + +int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) { int32_t maxlen = 100; void *pBuf = taosMemoryCalloc(1, maxlen); if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbCloseIter(pSdb, pIter); + sdbCloseIter(pIter); return -1; } @@ -530,17 +532,12 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *le mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total); *ppBuf = NULL; *len = 0; - *ppIter = NULL; - sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return -1; } else if (readlen == 0) { mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total); *ppBuf = NULL; *len = 0; - *ppIter = NULL; - *ppIter = NULL; - sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return 0; } else { // (readlen <= maxlen) @@ -552,46 +549,63 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *le } } -int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len) { - char datafile[PATH_MAX] = {0}; - char tmpfile[PATH_MAX] = {0}; - snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); +int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) { + SSdbIter *pIter = sdbCreateIter(pSdb); + if (pIter == NULL) return -1; - TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { + pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pIter->file == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to write %s since %s", tmpfile, terrstr()); + mError("failed to open %s since %s", pIter->name, terrstr()); return -1; } - int32_t writelen = taosWriteFile(pFile, pBuf, len); + *ppIter = pIter; + mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name); + return 0; +} + +int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply) { + int32_t code = 0; + + if (!isApply) { + sdbCloseIter(pIter); + mInfo("sdbiter:%p, not apply to sdb", pIter); + return 0; + } + + taosFsyncFile(pIter->file); + taosCloseFile(&pIter->file); + pIter->file = NULL; + + char datafile[PATH_MAX] = {0}; + snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + if (taosRenameFile(pIter->name, datafile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr()); + sdbCloseIter(pIter); + return -1; + } + + sdbCloseIter(pIter); + if (sdbReadFile(pSdb) != 0) { + mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr()); + return -1; + } + + mInfo("sdbiter:%p, successfully applyed to sdb", pIter); + return 0; +} + +int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) { + int32_t writelen = taosWriteFile(pIter->file, pBuf, len); if (writelen != len) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to write %s since %s", tmpfile, terrstr()); - taosCloseFile(&pFile); - return -1; - } - - if (taosFsyncFile(pFile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to fsync %s since %s", tmpfile, terrstr()); - taosCloseFile(&pFile); - return -1; - } - - (void)taosCloseFile(&pFile); - - if (taosRenameFile(tmpfile, datafile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to rename file %s to %s since %s", tmpfile, datafile, terrstr()); - return -1; - } - - if (sdbReadFile(pSdb) != 0) { - mError("failed to read from %s since %s", datafile, terrstr()); + mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total); return -1; } + pIter->total += writelen; + mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total); return 0; } \ No newline at end of file