refactor: mnode snapshot
This commit is contained in:
parent
f99b20aaa9
commit
d2d0ae20aa
|
@ -44,12 +44,9 @@ extern "C" {
|
||||||
}
|
}
|
||||||
|
|
||||||
#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t)
|
#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t)
|
||||||
|
|
||||||
#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t)
|
#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t)
|
||||||
|
|
||||||
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
|
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
|
||||||
|
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
|
||||||
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
|
|
||||||
|
|
||||||
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
|
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
|
||||||
{ \
|
{ \
|
||||||
|
@ -66,11 +63,8 @@ extern "C" {
|
||||||
}
|
}
|
||||||
|
|
||||||
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
|
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
|
||||||
|
|
||||||
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
|
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
|
||||||
|
|
||||||
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
|
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
|
||||||
|
|
||||||
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
|
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
|
||||||
|
|
||||||
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
|
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
|
||||||
|
@ -356,6 +350,14 @@ typedef struct SSdb {
|
||||||
SdbDecodeFp decodeFps[SDB_MAX];
|
SdbDecodeFp decodeFps[SDB_MAX];
|
||||||
} SSdb;
|
} SSdb;
|
||||||
|
|
||||||
|
typedef struct SSdbIter {
|
||||||
|
TdFilePtr file;
|
||||||
|
int64_t readlen;
|
||||||
|
} SSdbIter;
|
||||||
|
|
||||||
|
SSdbIter *sdbIterInit(SSdb *pSdb);
|
||||||
|
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -76,12 +76,12 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SWal *pWal;
|
SWal *pWal;
|
||||||
int32_t errCode;
|
|
||||||
bool restored;
|
|
||||||
sem_t syncSem;
|
sem_t syncSem;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
ESyncState state;
|
ESyncState state;
|
||||||
bool isStandBy;
|
bool isStandBy;
|
||||||
|
bool restored;
|
||||||
|
int32_t errCode;
|
||||||
} SSyncMgmt;
|
} SSyncMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -22,17 +22,15 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue
|
||||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
|
|
||||||
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdbRaw *pRaw = pMsg->pCont;
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
|
||||||
SSdbRaw *pRaw = pMsg->pCont;
|
|
||||||
|
|
||||||
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
|
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
|
||||||
sdbWriteWithoutFree(pSdb, pRaw);
|
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
sdbSetApplyIndex(pSdb, cbMeta.index);
|
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
|
||||||
sdbSetApplyTerm(pSdb, cbMeta.term);
|
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
|
||||||
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,20 +47,15 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
||||||
pMnode->syncMgmt.restored = true;
|
pMnode->syncMgmt.restored = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len) {
|
void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) {
|
||||||
/*
|
SMnode *pMnode = pFsm->data;
|
||||||
SMnode *pMnode = pFsm->data;
|
SSdbIter *pIter = iter;
|
||||||
SSdbIter *pIter;
|
|
||||||
if (iter == NULL) {
|
|
||||||
pIter = sdbIterInit(pMnode->sdb)
|
|
||||||
} else {
|
|
||||||
pIter = iter;
|
|
||||||
}
|
|
||||||
pIter = sdbIterRead(pIter, ppBuf, len);
|
|
||||||
return pIter;
|
|
||||||
*/
|
|
||||||
|
|
||||||
return NULL;
|
if (iter == NULL) {
|
||||||
|
pIter = sdbIterInit(pMnode->pSdb);
|
||||||
|
}
|
||||||
|
|
||||||
|
return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) {
|
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) {
|
||||||
|
|
|
@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSdbIter *sdbIterInit(SSdb *pSdb) {
|
||||||
|
char datafile[PATH_MAX] = {0};
|
||||||
|
char tmpfile[PATH_MAX] = {0};
|
||||||
|
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
||||||
|
|
||||||
|
if (taosCopyFile(datafile, tmpfile) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
|
||||||
|
if (pIter == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
|
||||||
|
if (pIter->file == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
|
||||||
|
taosMemoryFree(pIter);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
|
||||||
|
return pIter;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
|
||||||
|
const int32_t maxlen = 100;
|
||||||
|
|
||||||
|
char *pBuf = taosMemoryCalloc(1, maxlen);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
|
||||||
|
if (readlen == 0) {
|
||||||
|
mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
taosCloseFile(&pIter->file);
|
||||||
|
taosMemoryFree(pIter);
|
||||||
|
pIter = NULL;
|
||||||
|
} else if (readlen < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
taosCloseFile(&pIter->file);
|
||||||
|
taosMemoryFree(pIter);
|
||||||
|
pIter = NULL;
|
||||||
|
} else {
|
||||||
|
pIter->readlen += readlen;
|
||||||
|
mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
|
||||||
|
*ppBuf = pBuf;
|
||||||
|
*buflen = readlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pIter;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue