more code

This commit is contained in:
Hongze Cheng 2022-10-18 13:38:26 +08:00
parent bb59553e39
commit b7342525c7
9 changed files with 50 additions and 42 deletions

View File

@ -247,7 +247,7 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch
// SDelFile // SDelFile
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb); int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSClose(STsdb *pTsdb);
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
void tsdbFSDestroy(STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS);

View File

@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode);
int32_t vnodeCommit(SVnode* pVnode); int32_t vnodeCommit(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);

View File

@ -97,7 +97,7 @@ typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMStbCursor SMStbCursor; typedef struct SMStbCursor SMStbCursor;
typedef struct STbUidStore STbUidStore; typedef struct STbUidStore STbUidStore;
int metaOpen(SVnode* pVnode, SMeta** ppMeta); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaClose(SMeta* pMeta); int metaClose(SMeta* pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
int metaCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta);
@ -149,7 +149,7 @@ typedef struct {
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb);
@ -200,7 +200,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// sma // sma
int32_t smaInit(); int32_t smaInit();
void smaCleanUp(); void smaCleanUp();
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode, int8_t rollback);
int32_t smaClose(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma); int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma);

View File

@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
int metaOpen(SVnode *pVnode, SMeta **ppMeta) { int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
int ret; int ret;
int slen; int slen;

View File

@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma);
pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \
} while (0) } while (0)
#define SMA_OPEN_RSMA_IMPL(v, l) \ #define SMA_OPEN_RSMA_IMPL(v, l) \
do { \ do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(r)) { \ if (!RETENTION_VALID(r)) { \
if (l == 0) { \ if (l == 0) { \
goto _err; \ goto _err; \
} \ } \
break; \ break; \
} \ } \
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \ if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \
goto _err; \ goto _err; \
} \ } \
} while (0) } while (0)
/** /**
@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
return 0; return 0;
} }
int32_t smaOpen(SVnode *pVnode) { int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
STsdbCfg *pCfg = &pVnode->config.tsdbCfg; STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
ASSERT(!pVnode->pSma); ASSERT(!pVnode->pSma);

View File

@ -415,7 +415,7 @@ _err:
} }
// EXPOSED APIS ==================================================================================== // EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;

View File

@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
* @param dir * @param dir
* @return int * @return int
*/ */
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg) { int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
int slen = 0; int slen = 0;
@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
} }
// open tsdb // open tsdb
if (tsdbFSOpen(pTsdb) < 0) { if (tsdbFSOpen(pTsdb, rollback) < 0) {
goto _err; goto _err;
} }

View File

@ -20,8 +20,6 @@
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
static int vnodeCommitImpl(void *arg); static int vnodeCommitImpl(void *arg);
static void vnodeWaitCommit(SVnode *pVnode); static void vnodeWaitCommit(SVnode *pVnode);
@ -241,7 +239,7 @@ int vnodeCommit(SVnode *pVnode) {
// preCommit // preCommit
// smaSyncPreCommit(pVnode->pSma); // smaSyncPreCommit(pVnode->pSma);
if(smaAsyncPreCommit(pVnode->pSma) < 0){ if (smaAsyncPreCommit(pVnode->pSma) < 0) {
vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
@ -309,6 +307,22 @@ int vnodeCommit(SVnode *pVnode) {
return 0; return 0;
} }
bool vnodeShouldRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VND_INFO_FNAME_TMP);
return taosCheckExistFile(tFName);
}
void vnodeRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VND_INFO_FNAME_TMP);
(void)taosRemoveFile(tFName);
}
static int vnodeCommitImpl(void *arg) { static int vnodeCommitImpl(void *arg) {
SVnode *pVnode = (SVnode *)arg; SVnode *pVnode = (SVnode *)arg;
@ -321,16 +335,6 @@ static int vnodeCommitImpl(void *arg) {
return 0; return 0;
} }
static int vnodeStartCommit(SVnode *pVnode) {
// TODO
return 0;
}
static int vnodeEndCommit(SVnode *pVnode) {
// TODO
return 0;
}
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); } static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeEncodeState(const void *pObj, SJson *pJson) {

View File

@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool // open buffer pool
if (vnodeOpenBufPool(pVnode) < 0) { if (vnodeOpenBufPool(pVnode) < 0) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
} }
// open meta // open meta
if (metaOpen(pVnode, &pVnode->pMeta) < 0) { if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open tsdb // open tsdb
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open sma // open sma
if (smaOpen(pVnode)) { if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
#if !VNODE_AS_LIB
// open query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
#endif
// vnode begin // vnode begin
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
#if !VNODE_AS_LIB
// open sync // open sync
if (vnodeSyncOpen(pVnode, dir)) { if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
#endif
if (rollback) {
vnodeRollback(pVnode);
}
return pVnode; return pVnode;