diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index c6ab916ee1..debc088384 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -71,7 +71,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void SMnode *pMnode = pFsm->data; mInfo("start to read snapshot from sdb"); - int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, ppBuf, len); + int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, (void**)ppBuf, len); if (code != 0) { mError("failed to read snapshot from sdb since %s", terrstr()); } else { diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index df535c4456..80a5786853 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -895,7 +895,28 @@ TEST_F(MndTestSdb, 01_Read_Str) { ASSERT_EQ(code, TSDB_CODE_SDB_OBJ_CREATING); } + { + void *sdbbuf = taosMemoryMalloc(1024 * 1024); + int32_t total = 0; + void *pBuf = NULL; + int32_t len = 0; + SSdbIter *pIter = NULL; + while (sdbReadSnapshot(pSdb, &pIter, &pBuf, &len) == 0) { + if (pBuf != NULL && len != 0) { + memcpy((char *)sdbbuf + total, pBuf, len); + total += len; + taosMemoryFree(pBuf); + } else { + break; + } + } + sdbApplySnapshot(pSdb, sdbbuf, total); + } + + ASSERT_EQ(sdbGetSize(pSdb, SDB_CONSUMER), 1); + ASSERT_EQ(sdbGetTableVer(pSdb, SDB_CONSUMER), 4); + sdbCleanup(pSdb); - ASSERT_EQ(mnode.insertTimes, 5); - ASSERT_EQ(mnode.deleteTimes, 5); + ASSERT_EQ(mnode.insertTimes, 9); + ASSERT_EQ(mnode.deleteTimes, 9); } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 411d4c59ea..b34b8684a6 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -380,8 +380,8 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); -int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len); -int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len); +int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len); +int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len); const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b2dcbd68e3..6891f94244 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -452,7 +452,7 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); taosThreadMutexLock(&pSdb->filelock); - if (taosCopyFile(datafile, tmpfile) != 0) { + if (taosCopyFile(datafile, tmpfile) < 0) { taosThreadMutexUnlock(&pSdb->filelock); terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); @@ -512,12 +512,12 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) { return pIter; } -int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) { +int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len) { SSdbIter *pIter = sdbGetIter(pSdb, ppIter); if (pIter == NULL) return -1; int32_t maxlen = 100; - char *pBuf = taosMemoryCalloc(1, maxlen); + void *pBuf = taosMemoryCalloc(1, maxlen); if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbCloseIter(pSdb, pIter); @@ -525,7 +525,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le } int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen); - if (readlen < 0 || (readlen == 0 && errno != 0)) { + if (readlen < 0 || readlen > maxlen) { terrno = TAOS_SYSTEM_ERROR(errno); mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total); *ppBuf = NULL; @@ -539,35 +539,20 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; + *ppIter = NULL; sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return 0; - } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) { + } else { // (readlen <= maxlen) pIter->total += readlen; mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total); *ppBuf = pBuf; *len = readlen; return 0; - } else if (readlen < maxlen && errno == 0) { - mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total); - *ppBuf = pBuf; - *len = readlen; - *ppIter = NULL; - sdbCloseIter(pSdb, pIter); - return 0; - } else { - // impossible - mError("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total); - *ppBuf = NULL; - *len = 0; - *ppIter = NULL; - sdbCloseIter(pSdb, pIter); - taosMemoryFree(pBuf); - return -1; } } -int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) { +int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len) { char datafile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0}; snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);