enh: sdb snapshot
This commit is contained in:
parent
b5a1131c25
commit
576a7ee0a7
|
@ -71,7 +71,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
mInfo("start to read snapshot from sdb");
|
mInfo("start to read snapshot from sdb");
|
||||||
|
|
||||||
int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, ppBuf, len);
|
int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, (void**)ppBuf, len);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to read snapshot from sdb since %s", terrstr());
|
mError("failed to read snapshot from sdb since %s", terrstr());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -895,7 +895,28 @@ TEST_F(MndTestSdb, 01_Read_Str) {
|
||||||
ASSERT_EQ(code, TSDB_CODE_SDB_OBJ_CREATING);
|
ASSERT_EQ(code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
void *sdbbuf = taosMemoryMalloc(1024 * 1024);
|
||||||
|
int32_t total = 0;
|
||||||
|
void *pBuf = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
SSdbIter *pIter = NULL;
|
||||||
|
while (sdbReadSnapshot(pSdb, &pIter, &pBuf, &len) == 0) {
|
||||||
|
if (pBuf != NULL && len != 0) {
|
||||||
|
memcpy((char *)sdbbuf + total, pBuf, len);
|
||||||
|
total += len;
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sdbApplySnapshot(pSdb, sdbbuf, total);
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(sdbGetSize(pSdb, SDB_CONSUMER), 1);
|
||||||
|
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_CONSUMER), 4);
|
||||||
|
|
||||||
sdbCleanup(pSdb);
|
sdbCleanup(pSdb);
|
||||||
ASSERT_EQ(mnode.insertTimes, 5);
|
ASSERT_EQ(mnode.insertTimes, 9);
|
||||||
ASSERT_EQ(mnode.deleteTimes, 5);
|
ASSERT_EQ(mnode.deleteTimes, 9);
|
||||||
}
|
}
|
|
@ -380,8 +380,8 @@ SSdbRow *sdbAllocRow(int32_t objSize);
|
||||||
void *sdbGetRowObj(SSdbRow *pRow);
|
void *sdbGetRowObj(SSdbRow *pRow);
|
||||||
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
|
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
|
||||||
|
|
||||||
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len);
|
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len);
|
||||||
int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len);
|
int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len);
|
||||||
|
|
||||||
const char *sdbTableName(ESdbType type);
|
const char *sdbTableName(ESdbType type);
|
||||||
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
||||||
|
|
|
@ -452,7 +452,7 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) {
|
||||||
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
taosThreadMutexLock(&pSdb->filelock);
|
||||||
if (taosCopyFile(datafile, tmpfile) != 0) {
|
if (taosCopyFile(datafile, tmpfile) < 0) {
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
|
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
|
||||||
|
@ -512,12 +512,12 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) {
|
||||||
return pIter;
|
return pIter;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) {
|
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, void **ppBuf, int32_t *len) {
|
||||||
SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
|
SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
|
||||||
if (pIter == NULL) return -1;
|
if (pIter == NULL) return -1;
|
||||||
|
|
||||||
int32_t maxlen = 100;
|
int32_t maxlen = 100;
|
||||||
char *pBuf = taosMemoryCalloc(1, maxlen);
|
void *pBuf = taosMemoryCalloc(1, maxlen);
|
||||||
if (pBuf == NULL) {
|
if (pBuf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
sdbCloseIter(pSdb, pIter);
|
sdbCloseIter(pSdb, pIter);
|
||||||
|
@ -525,7 +525,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
|
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
|
||||||
if (readlen < 0 || (readlen == 0 && errno != 0)) {
|
if (readlen < 0 || readlen > maxlen) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
|
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
|
||||||
*ppBuf = NULL;
|
*ppBuf = NULL;
|
||||||
|
@ -539,35 +539,20 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
|
||||||
*ppBuf = NULL;
|
*ppBuf = NULL;
|
||||||
*len = 0;
|
*len = 0;
|
||||||
*ppIter = NULL;
|
*ppIter = NULL;
|
||||||
|
*ppIter = NULL;
|
||||||
sdbCloseIter(pSdb, pIter);
|
sdbCloseIter(pSdb, pIter);
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
return 0;
|
return 0;
|
||||||
} else if ((readlen < maxlen && errno != 0) || readlen == maxlen) {
|
} else { // (readlen <= maxlen)
|
||||||
pIter->total += readlen;
|
pIter->total += readlen;
|
||||||
mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
|
mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
|
||||||
*ppBuf = pBuf;
|
*ppBuf = pBuf;
|
||||||
*len = readlen;
|
*len = readlen;
|
||||||
return 0;
|
return 0;
|
||||||
} else if (readlen < maxlen && errno == 0) {
|
|
||||||
mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
|
|
||||||
*ppBuf = pBuf;
|
|
||||||
*len = readlen;
|
|
||||||
*ppIter = NULL;
|
|
||||||
sdbCloseIter(pSdb, pIter);
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
// impossible
|
|
||||||
mError("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
|
|
||||||
*ppBuf = NULL;
|
|
||||||
*len = 0;
|
|
||||||
*ppIter = NULL;
|
|
||||||
sdbCloseIter(pSdb, pIter);
|
|
||||||
taosMemoryFree(pBuf);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) {
|
int32_t sdbApplySnapshot(SSdb *pSdb, void *pBuf, int32_t len) {
|
||||||
char datafile[PATH_MAX] = {0};
|
char datafile[PATH_MAX] = {0};
|
||||||
char tmpfile[PATH_MAX] = {0};
|
char tmpfile[PATH_MAX] = {0};
|
||||||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
|
Loading…
Reference in New Issue