refactor: modify FpSnapshotRead, FpSnapshotApply
This commit is contained in:
commit
9fc7536a0b
|
@ -44,12 +44,9 @@ extern "C" {
|
|||
}
|
||||
|
||||
#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t)
|
||||
|
||||
#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t)
|
||||
|
||||
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
|
||||
|
||||
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
|
||||
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
|
||||
|
||||
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
|
||||
{ \
|
||||
|
@ -66,11 +63,8 @@ extern "C" {
|
|||
}
|
||||
|
||||
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
|
||||
|
||||
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
|
||||
|
||||
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
|
||||
|
||||
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
|
||||
|
||||
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
|
||||
|
@ -356,6 +350,14 @@ typedef struct SSdb {
|
|||
SdbDecodeFp decodeFps[SDB_MAX];
|
||||
} SSdb;
|
||||
|
||||
typedef struct SSdbIter {
|
||||
TdFilePtr file;
|
||||
int64_t readlen;
|
||||
} SSdbIter;
|
||||
|
||||
SSdbIter *sdbIterInit(SSdb *pSdb);
|
||||
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -76,12 +76,12 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
SWal *pWal;
|
||||
int32_t errCode;
|
||||
bool restored;
|
||||
sem_t syncSem;
|
||||
int64_t sync;
|
||||
ESyncState state;
|
||||
bool isStandBy;
|
||||
bool restored;
|
||||
int32_t errCode;
|
||||
} SSyncMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -28,17 +28,15 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||
|
||||
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
SSdbRaw *pRaw = pMsg->pCont;
|
||||
SMnode *pMnode = pFsm->data;
|
||||
SSdbRaw *pRaw = pMsg->pCont;
|
||||
|
||||
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
|
||||
sdbWriteWithoutFree(pSdb, pRaw);
|
||||
sdbSetApplyIndex(pSdb, cbMeta.index);
|
||||
sdbSetApplyTerm(pSdb, cbMeta.term);
|
||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
|
||||
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
|
||||
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,8 +62,6 @@ int32_t mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void*
|
|||
} else {
|
||||
pIter = iter;
|
||||
}
|
||||
pIter = sdbIterRead(pIter, ppBuf, len);
|
||||
return pIter;
|
||||
*/
|
||||
|
||||
return 0;
|
||||
|
@ -78,7 +74,11 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char
|
|||
}
|
||||
|
||||
void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
|
||||
|
||||
if (cbMeta.code == 0) {
|
||||
// config change success
|
||||
} else {
|
||||
// config change failed
|
||||
}
|
||||
}
|
||||
|
||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||
|
|
|
@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSdbIter *sdbIterInit(SSdb *pSdb) {
|
||||
char datafile[PATH_MAX] = {0};
|
||||
char tmpfile[PATH_MAX] = {0};
|
||||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
|
||||
|
||||
if (taosCopyFile(datafile, tmpfile) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
|
||||
if (pIter == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
|
||||
if (pIter->file == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
|
||||
taosMemoryFree(pIter);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
|
||||
return pIter;
|
||||
}
|
||||
|
||||
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
|
||||
const int32_t maxlen = 100;
|
||||
|
||||
char *pBuf = taosMemoryCalloc(1, maxlen);
|
||||
if (pBuf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
|
||||
if (readlen == 0) {
|
||||
mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
|
||||
taosMemoryFree(pBuf);
|
||||
taosCloseFile(&pIter->file);
|
||||
taosMemoryFree(pIter);
|
||||
pIter = NULL;
|
||||
} else if (readlen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
|
||||
taosMemoryFree(pBuf);
|
||||
taosCloseFile(&pIter->file);
|
||||
taosMemoryFree(pIter);
|
||||
pIter = NULL;
|
||||
} else {
|
||||
pIter->readlen += readlen;
|
||||
mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
|
||||
*ppBuf = pBuf;
|
||||
*buflen = readlen;
|
||||
}
|
||||
|
||||
return pIter;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue