Merge pull request #18858 from taosdata/fix/td-21029
fix: vnode snapshot rollback
This commit is contained in:
commit
49fa443920
|
@ -87,17 +87,24 @@ typedef struct SCommitInfo SCommitInfo;
|
|||
#define VNODE_RSMA1_DIR "rsma1"
|
||||
#define VNODE_RSMA2_DIR "rsma2"
|
||||
|
||||
#define VND_INFO_FNAME "vnode.json"
|
||||
|
||||
// vnd.h
|
||||
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
|
||||
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
|
||||
void vnodeBufPoolRef(SVBufPool* pPool);
|
||||
void vnodeBufPoolUnRef(SVBufPool* pPool);
|
||||
int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
|
||||
|
||||
// meta
|
||||
typedef struct SMCtbCursor SMCtbCursor;
|
||||
typedef struct SMStbCursor SMStbCursor;
|
||||
typedef struct STbUidStore STbUidStore;
|
||||
|
||||
#define META_BEGIN_HEAP_BUFFERPOOL 0
|
||||
#define META_BEGIN_HEAP_OS 1
|
||||
#define META_BEGIN_HEAP_NIL 2
|
||||
|
||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
||||
int metaClose(SMeta* pMeta);
|
||||
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
||||
|
@ -105,6 +112,7 @@ TXN* metaGetTxn(SMeta* pMeta);
|
|||
int metaCommit(SMeta* pMeta, TXN* txn);
|
||||
int metaFinishCommit(SMeta* pMeta, TXN* txn);
|
||||
int metaPrepareAsyncCommit(SMeta* pMeta);
|
||||
int metaAbort(SMeta* pMeta);
|
||||
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
||||
|
@ -238,6 +246,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
|||
// STsdbSnapWriter ========================================
|
||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
|
||||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
|
||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
||||
// STqSnapshotReader ==
|
||||
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader);
|
||||
|
|
|
@ -19,19 +19,21 @@ static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBuf
|
|||
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
||||
|
||||
// begin a meta txn
|
||||
int metaBegin(SMeta *pMeta, int8_t fromSys) {
|
||||
void *(*xMalloc)(void *, size_t);
|
||||
void (*xFree)(void *, void *);
|
||||
int metaBegin(SMeta *pMeta, int8_t heap) {
|
||||
void *(*xMalloc)(void *, size_t) = NULL;
|
||||
void (*xFree)(void *, void *) = NULL;
|
||||
void *xArg = NULL;
|
||||
|
||||
if (fromSys) {
|
||||
// default heap to META_BEGIN_HEAP_NIL
|
||||
if (heap == META_BEGIN_HEAP_OS) {
|
||||
xMalloc = tdbDefaultMalloc;
|
||||
xFree = tdbDefaultFree;
|
||||
} else {
|
||||
} else if (heap == META_BEGIN_HEAP_BUFFERPOOL) {
|
||||
xMalloc = metaMalloc;
|
||||
xFree = metaFree;
|
||||
xArg = pMeta->pVnode->inUse;
|
||||
}
|
||||
|
||||
if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -145,7 +145,7 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
|
|||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
metaBegin(pMeta, 1);
|
||||
metaBegin(pMeta, META_BEGIN_HEAP_NIL);
|
||||
|
||||
*ppWriter = pWriter;
|
||||
return code;
|
||||
|
@ -161,7 +161,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
|
|||
SMetaSnapWriter* pWriter = *ppWriter;
|
||||
|
||||
if (rollback) {
|
||||
ASSERT(0);
|
||||
code = metaAbort(pWriter->pMeta);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
|
||||
if (code) goto _err;
|
||||
|
|
|
@ -1376,27 +1376,34 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
code = tsdbSnapWriteDelEnd(pWriter);
|
||||
if (code) goto _exit;
|
||||
|
||||
code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _exit;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
STsdbSnapWriter* pWriter = *ppWriter;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
if (rollback) {
|
||||
ASSERT(0);
|
||||
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
|
||||
// if (code) goto _err;
|
||||
tsdbRollbackCommit(pWriter->pTsdb);
|
||||
} else {
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbSnapWriteDelEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _err;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
|
|
|
@ -134,9 +134,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
tjsonAddItemToArray(pNodeInfoArr, pNodeInfo);
|
||||
}
|
||||
|
||||
// add tsdb page size config
|
||||
if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,11 +16,9 @@
|
|||
#include "vnd.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
#define VND_INFO_FNAME "vnode.json"
|
||||
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
|
||||
|
||||
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
|
||||
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
|
||||
static int vnodeCommitImpl(SCommitInfo *pInfo);
|
||||
|
||||
int vnodeBegin(SVnode *pVnode) {
|
||||
|
@ -40,7 +38,7 @@ int vnodeBegin(SVnode *pVnode) {
|
|||
|
||||
pVnode->state.commitID++;
|
||||
// begin meta
|
||||
if (metaBegin(pVnode->pMeta, 0) < 0) {
|
||||
if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
|
||||
vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
@ -407,7 +405,7 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
|
||||
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
|
||||
SJson *pJson = NULL;
|
||||
|
||||
pJson = tjsonParse(pData);
|
||||
|
|
|
@ -21,6 +21,8 @@ struct SVSnapReader {
|
|||
int64_t sver;
|
||||
int64_t ever;
|
||||
int64_t index;
|
||||
// config
|
||||
int8_t cfgDone;
|
||||
// meta
|
||||
int8_t metaDone;
|
||||
SMetaSnapReader *pMetaReader;
|
||||
|
@ -88,6 +90,53 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
|
|||
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
// CONFIG ==============
|
||||
// FIXME: if commit multiple times and the config changed?
|
||||
if (!pReader->cfgDone) {
|
||||
char fName[TSDB_FILENAME_LEN];
|
||||
if (pReader->pVnode->pTfs) {
|
||||
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pReader->pVnode->pTfs), TD_DIRSEP,
|
||||
pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
|
||||
} else {
|
||||
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s", pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
|
||||
}
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
|
||||
if (NULL == pFile) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int64_t size;
|
||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosCloseFile(&pFile);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosCloseFile(&pFile);
|
||||
goto _err;
|
||||
}
|
||||
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
|
||||
((SSnapDataHdr *)(*ppData))->size = size + 1;
|
||||
((SSnapDataHdr *)(*ppData))->data[size] = '\0';
|
||||
|
||||
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(*ppData);
|
||||
taosCloseFile(&pFile);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
|
||||
pReader->cfgDone = 1;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// META ==============
|
||||
if (!pReader->metaDone) {
|
||||
// open reader if not
|
||||
|
@ -230,6 +279,8 @@ struct SVSnapWriter {
|
|||
int64_t ever;
|
||||
int64_t commitID;
|
||||
int64_t index;
|
||||
// config
|
||||
SVnodeInfo info;
|
||||
// meta
|
||||
SMetaSnapWriter *pMetaSnapWriter;
|
||||
// tsdb
|
||||
|
@ -248,6 +299,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
|
|||
int32_t code = 0;
|
||||
SVSnapWriter *pWriter = NULL;
|
||||
|
||||
// commit memory data
|
||||
vnodeAsyncCommit(pVnode);
|
||||
tsem_wait(&pVnode->canCommit);
|
||||
|
||||
// alloc
|
||||
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||
if (pWriter == NULL) {
|
||||
|
@ -258,16 +313,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
|
|||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
// commit it
|
||||
code = vnodeSyncCommit(pVnode);
|
||||
if (code) {
|
||||
taosMemoryFree(pWriter);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// inc commit ID
|
||||
pVnode->state.commitID++;
|
||||
pWriter->commitID = pVnode->state.commitID;
|
||||
pWriter->commitID = ++pVnode->state.commitID;
|
||||
|
||||
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
|
||||
sver, ever, pWriter->commitID);
|
||||
|
@ -284,53 +331,89 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
int32_t code = 0;
|
||||
SVnode *pVnode = pWriter->pVnode;
|
||||
|
||||
// prepare
|
||||
if (pWriter->pTsdbSnapWriter) {
|
||||
tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
|
||||
}
|
||||
|
||||
// commit json
|
||||
if (!rollback) {
|
||||
pVnode->config = pWriter->info.config;
|
||||
pVnode->state = (SVState){.committed = pWriter->info.state.committed,
|
||||
.applied = pWriter->info.state.committed,
|
||||
.commitID = pWriter->commitID,
|
||||
.commitTerm = pWriter->info.state.commitTerm,
|
||||
.applyTerm = pWriter->info.state.commitTerm};
|
||||
pVnode->statis = pWriter->info.statis;
|
||||
char dir[TSDB_FILENAME_LEN] = {0};
|
||||
if (pWriter->pVnode->pTfs) {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||
} else {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
|
||||
}
|
||||
|
||||
vnodeCommitInfo(dir, &pWriter->info);
|
||||
} else {
|
||||
vnodeRollback(pWriter->pVnode);
|
||||
}
|
||||
|
||||
// commit/rollback sub-system
|
||||
if (pWriter->pMetaSnapWriter) {
|
||||
code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
|
||||
if (code) goto _err;
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pTsdbSnapWriter) {
|
||||
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
|
||||
if (code) goto _err;
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pRsmaSnapWriter) {
|
||||
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
||||
if (code) goto _err;
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (!rollback) {
|
||||
SVnodeInfo info = {0};
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
vnodeBegin(pVnode);
|
||||
|
||||
pVnode->state.committed = pWriter->ever;
|
||||
pVnode->state.applied = pWriter->ever;
|
||||
pVnode->state.applyTerm = pSnapshot->lastApplyTerm;
|
||||
pVnode->state.commitTerm = pSnapshot->lastApplyTerm;
|
||||
|
||||
info.config = pVnode->config;
|
||||
info.state.committed = pVnode->state.applied;
|
||||
info.state.commitTerm = pVnode->state.applyTerm;
|
||||
info.state.commitID = pVnode->state.commitID;
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||
code = vnodeSaveInfo(dir, &info);
|
||||
if (code) goto _err;
|
||||
|
||||
code = vnodeCommitInfo(dir, &info);
|
||||
if (code) goto _err;
|
||||
|
||||
vnodeBegin(pVnode);
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
|
||||
} else {
|
||||
ASSERT(0);
|
||||
vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
|
||||
taosMemoryFree(pWriter);
|
||||
}
|
||||
tsem_post(&pVnode->canCommit);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
|
||||
|
||||
// decode info
|
||||
if (vnodeDecodeInfo(pHdr->data, &pWriter->info) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// change some value
|
||||
pWriter->info.state.commitID = pWriter->commitID;
|
||||
|
||||
// modify info as needed
|
||||
char dir[TSDB_FILENAME_LEN] = {0};
|
||||
if (pWriter->pVnode->pTfs) {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pWriter->pVnode->pTfs), TD_DIRSEP,
|
||||
pWriter->pVnode->path);
|
||||
} else {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
|
||||
}
|
||||
if (vnodeSaveInfo(dir, &pWriter->info) < 0) {
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
|
||||
taosMemoryFree(pWriter);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -347,6 +430,10 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
|||
pHdr->type, nData);
|
||||
|
||||
switch (pHdr->type) {
|
||||
case SNAP_DATA_CFG: {
|
||||
code = vnodeSnapWriteInfo(pWriter, pData, nData);
|
||||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_META: {
|
||||
// meta
|
||||
if (pWriter->pMetaSnapWriter == NULL) {
|
||||
|
|
|
@ -317,11 +317,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
// commit if need
|
||||
if (vnodeShouldCommit(pVnode)) {
|
||||
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
|
||||
#if 0
|
||||
vnodeSyncCommit(pVnode);
|
||||
#else
|
||||
vnodeAsyncCommit(pVnode);
|
||||
#endif
|
||||
|
||||
// start a new one
|
||||
if (vnodeBegin(pVnode) < 0) {
|
||||
|
|
Loading…
Reference in New Issue