enh: sdb snapshot
This commit is contained in:
parent
576a7ee0a7
commit
147c7ee0b4
|
@ -71,33 +71,28 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
mInfo("start to read snapshot from sdb");
|
mInfo("start to read snapshot from sdb");
|
||||||
|
|
||||||
int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, (void**)ppBuf, len);
|
// sdbStartRead
|
||||||
if (code != 0) {
|
// sdbDoRead
|
||||||
mError("failed to read snapshot from sdb since %s", terrstr());
|
// sdbStopRead
|
||||||
} else {
|
|
||||||
if (*ppIter == NULL) {
|
|
||||||
mInfo("successfully to read snapshot from sdb");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
|
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
mndSetRestore(pMnode, false);
|
|
||||||
mInfo("start to apply snapshot to sdb, len:%d", len);
|
|
||||||
|
|
||||||
int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len);
|
// sdbStartWrite
|
||||||
if (code != 0) {
|
// sdbDoWrite
|
||||||
mError("failed to apply snapshot to sdb, len:%d", len);
|
|
||||||
} else {
|
mndSetRestore(pMnode, false);
|
||||||
mInfo("successfully to apply snapshot to sdb, len:%d", len);
|
mInfo("start to apply snapshot to sdb");
|
||||||
mndSetRestore(pMnode, true);
|
|
||||||
}
|
// sdbStopWrite
|
||||||
|
mInfo("successfully to apply snapshot to sdb");
|
||||||
|
mndSetRestore(pMnode, true);
|
||||||
|
|
||||||
// taosMemoryFree(pBuf);
|
// taosMemoryFree(pBuf);
|
||||||
return code;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
|
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
|
||||||
|
|
|
@ -492,7 +492,7 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
||||||
|
|
||||||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
||||||
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
||||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 );
|
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2);
|
||||||
sdbSetApplyIndex(pSdb, -1);
|
sdbSetApplyIndex(pSdb, -1);
|
||||||
ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
|
ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
|
||||||
ASSERT_EQ(mnode.insertTimes, 2);
|
ASSERT_EQ(mnode.insertTimes, 2);
|
||||||
|
@ -896,21 +896,28 @@ TEST_F(MndTestSdb, 01_Read_Str) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
void *sdbbuf = taosMemoryMalloc(1024 * 1024);
|
SSdbIter *pReader = NULL;
|
||||||
int32_t total = 0;
|
SSdbIter *pWritter = NULL;
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
SSdbIter *pIter = NULL;
|
int32_t code = 0;
|
||||||
while (sdbReadSnapshot(pSdb, &pIter, &pBuf, &len) == 0) {
|
|
||||||
|
code = sdbStartRead(pSdb, &pReader);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
code = sdbStartWrite(pSdb, &pWritter);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
while (sdbDoRead(pSdb, pReader, &pBuf, &len) == 0) {
|
||||||
if (pBuf != NULL && len != 0) {
|
if (pBuf != NULL && len != 0) {
|
||||||
memcpy((char *)sdbbuf + total, pBuf, len);
|
sdbDoWrite(pSdb, pWritter, pBuf, len);
|
||||||
total += len;
|
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sdbApplySnapshot(pSdb, sdbbuf, total);
|
|
||||||
|
sdbStopRead(pSdb, pReader);
|
||||||
|
sdbStopWrite(pSdb, pWritter, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT_EQ(sdbGetSize(pSdb, SDB_CONSUMER), 1);
|
ASSERT_EQ(sdbGetSize(pSdb, SDB_CONSUMER), 1);
|
||||||
|
|
|
@ -187,6 +187,7 @@ typedef struct SSdb {
|
||||||
typedef struct SSdbIter {
|
typedef struct SSdbIter {
|
||||||
TdFilePtr file;
|
TdFilePtr file;
|
||||||
int64_t total;
|
int64_t total;
|
||||||
|
char *name;
|
||||||
} SSdbIter;
|
} SSdbIter;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -380,8 +381,13 @@ SSdbRow *sdbAllocRow(int32_t objSize);
|
||||||
void *sdbGetRowObj(SSdbRow *pRow);
|
void *sdbGetRowObj(SSdbRow *pRow);
|
||||||
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
|
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
|
||||||
|
|
||||||
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len);
|
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter);
|
||||||
int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len);
|
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter);
|
||||||
|
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len);
|
||||||
|
|
||||||
|
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter);
|
||||||
|
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply);
|
||||||
|
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len);
|
||||||
|
|
||||||
const char *sdbTableName(ESdbType type);
|
const char *sdbTableName(ESdbType type);
|
||||||
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
||||||
|
|
|
@ -71,6 +71,7 @@ void sdbCleanup(SSdb *pSdb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSdb->tmpDir != NULL) {
|
if (pSdb->tmpDir != NULL) {
|
||||||
|
taosRemoveDir(pSdb->tmpDir);
|
||||||
taosMemoryFreeClear(pSdb->tmpDir);
|
taosMemoryFreeClear(pSdb->tmpDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -445,82 +445,84 @@ int32_t sdbDeploy(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSdbIter *sdbOpenIter(SSdb *pSdb) {
|
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
|
||||||
char datafile[PATH_MAX] = {0};
|
|
||||||
char tmpfile[PATH_MAX] = {0};
|
|
||||||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
|
||||||
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
|
||||||
if (taosCopyFile(datafile, tmpfile) < 0) {
|
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
|
||||||
|
|
||||||
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
|
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
|
char name[PATH_MAX + 100] = {0};
|
||||||
if (pIter->file == NULL) {
|
snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
pIter->name = strdup(name);
|
||||||
mError("failed to read file:%s since %s", tmpfile, terrstr());
|
if (pIter->name == NULL) {
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pIter;
|
return pIter;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) {
|
static void sdbCloseIter(SSdbIter *pIter) {
|
||||||
if (pIter == NULL) return;
|
if (pIter == NULL) return;
|
||||||
|
|
||||||
if (pIter->file != NULL) {
|
if (pIter->file != NULL) {
|
||||||
taosCloseFile(&pIter->file);
|
taosCloseFile(&pIter->file);
|
||||||
|
pIter->file = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char tmpfile[PATH_MAX] = {0};
|
if (pIter->name != NULL) {
|
||||||
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
taosRemoveFile(pIter->name);
|
||||||
taosRemoveFile(tmpfile);
|
taosMemoryFree(pIter->name);
|
||||||
|
pIter->name = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
mInfo("sdbiter:%p, is closed", pIter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) {
|
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
|
||||||
SSdbIter *pIter = NULL;
|
SSdbIter *pIter = sdbCreateIter(pSdb);
|
||||||
if (ppIter != NULL) pIter = *ppIter;
|
|
||||||
|
|
||||||
if (pIter == NULL) {
|
|
||||||
pIter = sdbOpenIter(pSdb);
|
|
||||||
if (pIter != NULL) {
|
|
||||||
mInfo("sdbiter:%p, is created to read snapshot", pIter);
|
|
||||||
*ppIter = pIter;
|
|
||||||
} else {
|
|
||||||
mError("failed to create sdbiter to read snapshot since %s", terrstr());
|
|
||||||
*ppIter = NULL;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
mInfo("sdbiter:%p, continue to read snapshot, total:%" PRId64, pIter, pIter->total);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pIter;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len) {
|
|
||||||
SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
|
|
||||||
if (pIter == NULL) return -1;
|
if (pIter == NULL) return -1;
|
||||||
|
|
||||||
|
char datafile[PATH_MAX] = {0};
|
||||||
|
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pSdb->filelock);
|
||||||
|
if (taosCopyFile(datafile, pIter->name) < 0) {
|
||||||
|
taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to copy file %s to %s since %s", datafile, pIter->name, terrstr());
|
||||||
|
sdbCloseIter(pIter);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
|
|
||||||
|
pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
|
||||||
|
if (pIter->file == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to open file:%s since %s", pIter->name, terrstr());
|
||||||
|
sdbCloseIter(pIter);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppIter = pIter;
|
||||||
|
mInfo("sdbiter:%p, is created to read snapshot, file:%s", pIter, pIter->name);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
|
||||||
|
sdbCloseIter(pIter);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
|
||||||
int32_t maxlen = 100;
|
int32_t maxlen = 100;
|
||||||
void *pBuf = taosMemoryCalloc(1, maxlen);
|
void *pBuf = taosMemoryCalloc(1, maxlen);
|
||||||
if (pBuf == NULL) {
|
if (pBuf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
sdbCloseIter(pSdb, pIter);
|
sdbCloseIter(pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -530,17 +532,12 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *le
|
||||||
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
|
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
|
||||||
*ppBuf = NULL;
|
*ppBuf = NULL;
|
||||||
*len = 0;
|
*len = 0;
|
||||||
*ppIter = NULL;
|
|
||||||
sdbCloseIter(pSdb, pIter);
|
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
return -1;
|
return -1;
|
||||||
} else if (readlen == 0) {
|
} else if (readlen == 0) {
|
||||||
mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
|
mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
|
||||||
*ppBuf = NULL;
|
*ppBuf = NULL;
|
||||||
*len = 0;
|
*len = 0;
|
||||||
*ppIter = NULL;
|
|
||||||
*ppIter = NULL;
|
|
||||||
sdbCloseIter(pSdb, pIter);
|
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
return 0;
|
return 0;
|
||||||
} else { // (readlen <= maxlen)
|
} else { // (readlen <= maxlen)
|
||||||
|
@ -552,46 +549,63 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *le
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len) {
|
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
|
||||||
char datafile[PATH_MAX] = {0};
|
SSdbIter *pIter = sdbCreateIter(pSdb);
|
||||||
char tmpfile[PATH_MAX] = {0};
|
if (pIter == NULL) return -1;
|
||||||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
|
||||||
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pIter->file == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("failed to write %s since %s", tmpfile, terrstr());
|
mError("failed to open %s since %s", pIter->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t writelen = taosWriteFile(pFile, pBuf, len);
|
*ppIter = pIter;
|
||||||
|
mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (!isApply) {
|
||||||
|
sdbCloseIter(pIter);
|
||||||
|
mInfo("sdbiter:%p, not apply to sdb", pIter);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosFsyncFile(pIter->file);
|
||||||
|
taosCloseFile(&pIter->file);
|
||||||
|
pIter->file = NULL;
|
||||||
|
|
||||||
|
char datafile[PATH_MAX] = {0};
|
||||||
|
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
if (taosRenameFile(pIter->name, datafile) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
|
||||||
|
sdbCloseIter(pIter);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbCloseIter(pIter);
|
||||||
|
if (sdbReadFile(pSdb) != 0) {
|
||||||
|
mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("sdbiter:%p, successfully applyed to sdb", pIter);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
|
||||||
|
int32_t writelen = taosWriteFile(pIter->file, pBuf, len);
|
||||||
if (writelen != len) {
|
if (writelen != len) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("failed to write %s since %s", tmpfile, terrstr());
|
mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
|
||||||
taosCloseFile(&pFile);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosFsyncFile(pFile) != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
mError("failed to fsync %s since %s", tmpfile, terrstr());
|
|
||||||
taosCloseFile(&pFile);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
(void)taosCloseFile(&pFile);
|
|
||||||
|
|
||||||
if (taosRenameFile(tmpfile, datafile) != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
mError("failed to rename file %s to %s since %s", tmpfile, datafile, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sdbReadFile(pSdb) != 0) {
|
|
||||||
mError("failed to read from %s since %s", datafile, terrstr());
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pIter->total += writelen;
|
||||||
|
mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue