diff --git a/include/util/tutil.h b/include/util/tutil.h index 2a3f0dcb02..de96300155 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } } +#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ + if (CODE) { \ + LINO = __LINE__; \ + goto LABEL; \ + } + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bf110f1ae3..cb877e3b2d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -32,12 +32,6 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ - if (CODE) { \ - LINO = __LINE__; \ - goto LABEL; \ - } - typedef struct TSDBROW TSDBROW; typedef struct TABLEID TABLEID; typedef struct TSDBKEY TSDBKEY; @@ -247,18 +241,17 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch // SDelFile void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); // tsdbFS.c ============================================================================================== -int32_t tsdbFSOpen(STsdb *pTsdb); +int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback); int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS); int32_t tDFileSetCmprFn(const void *p1, const void *p2); -int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS); -int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); +int32_t tsdbFSCommit(STsdb *pTsdb); +int32_t tsdbFSRollback(STsdb *pTsdb); +int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS); -int32_t tsdbFSRollback(STsdbFS *pFS); - int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet); int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile); // tsdbReaderWriter.c ============================================================================================== diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index aca99ecd2f..988ecc5dd3 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode); int32_t vnodeCommit(SVnode* pVnode); +void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); +bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c int32_t vnodeSyncOpen(SVnode* pVnode, char* path); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4685d3696b..42ab0c2a37 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -97,10 +97,11 @@ typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; typedef struct STbUidStore STbUidStore; -int metaOpen(SVnode* pVnode, SMeta** ppMeta); +int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaClose(SMeta* pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); int metaCommit(SMeta* pMeta); +int metaFinishCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); @@ -149,10 +150,12 @@ typedef struct { int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); +int32_t tsdbFinishCommit(STsdb* pTsdb); +int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); @@ -200,7 +203,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem // sma int32_t smaInit(); void smaCleanUp(); -int32_t smaOpen(SVnode* pVnode); +int32_t smaOpen(SVnode* pVnode, int8_t rollback); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 85ed40970c..01ad833d20 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -34,6 +34,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { // commit the meta txn int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } +int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); } // abort the meta txn int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); } diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 515fd31e9d..2198033db9 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } -int metaOpen(SVnode *pVnode, SMeta **ppMeta) { +int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { SMeta *pMeta = NULL; int ret; int slen; @@ -60,49 +60,49 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { taosMkDir(pMeta->path); // open env - ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv); + ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback); if (ret < 0) { metaError("vgId:%d, failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pTbDb - ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb); + ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSkmDb - ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb); + ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pUidIdx - ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); + ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pNameIdx - ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx); + ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pCtbIdx - ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx); + ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSuidIdx - ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx); + ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; @@ -119,27 +119,27 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { goto _err; } - ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); + ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pTtlIdx - ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); + ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSmaIdx - ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx); + ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } - ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb); + ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index a40bbd7d87..5c5b49ece5 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -165,6 +165,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { } else { code = metaCommit(pWriter->pMeta); if (code) goto _err; + code = metaFinishCommit(pWriter->pMeta); + if (code) goto _err; } taosMemoryFree(pWriter); *ppWriter = NULL; diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index ef0d51f0eb..850e4c5697 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma); pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ } while (0) -#define SMA_OPEN_RSMA_IMPL(v, l) \ - do { \ - SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ - if (!RETENTION_VALID(r)) { \ - if (l == 0) { \ - goto _err; \ - } \ - break; \ - } \ - smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ - if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \ - goto _err; \ - } \ +#define SMA_OPEN_RSMA_IMPL(v, l) \ + do { \ + SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ + if (!RETENTION_VALID(r)) { \ + if (l == 0) { \ + goto _err; \ + } \ + break; \ + } \ + smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ + if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \ + goto _err; \ + } \ } while (0) /** @@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty return 0; } -int32_t smaOpen(SVnode *pVnode) { +int32_t smaOpen(SVnode *pVnode, int8_t rollback) { STsdbCfg *pCfg = &pVnode->config.tsdbCfg; ASSERT(!pVnode->pSma); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 8e4bc34e0c..7e3ba57dea 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -70,17 +70,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { } int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) { ASSERT(0); return -1; } - if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) { + if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) { ASSERT(0); return -1; } - if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) { + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) { ASSERT(0); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a4d993bc6c..460ef611a3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1041,38 +1041,20 @@ _exit: static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { int32_t code = 0; int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - ASSERT(eno == 0 && - "tsdbCommit failure" - "Restart taosd"); - - code = tsdbFSCommit1(pTsdb, &pCommitter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - // lock - taosThreadRwlockWrlock(&pTsdb->rwLock); - - // commit or rollback - code = tsdbFSCommit2(pTsdb, &pCommitter->fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + if (eno) { + code = eno; + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs); TSDB_CHECK_CODE(code, lino, _exit); } - pTsdb->imem = NULL; - - // unlock - taosThreadRwlockUnlock(&pTsdb->rwLock); - - tsdbUnrefMemTable(pMemTable); +_exit: tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); - -_exit: - if (code) { + if (code || eno) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); @@ -1646,3 +1628,50 @@ _exit: } return code; } + +int32_t tsdbFinishCommit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + SMemTable *pMemTable = pTsdb->imem; + + // lock + taosThreadRwlockWrlock(&pTsdb->rwLock); + + code = tsdbFSCommit(pTsdb); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pTsdb->imem = NULL; + + // unlock + taosThreadRwlockUnlock(&pTsdb->rwLock); + if (pMemTable) { + tsdbUnrefMemTable(pMemTable); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d tsdb finish commit", TD_VID(pTsdb->pVnode)); + } + return code; +} + +int32_t tsdbRollbackCommit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFSRollback(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d tsdb rollback commit", TD_VID(pTsdb->pVnode)); + } + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 6fd5629592..8edac7abc5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -16,7 +16,7 @@ #include "tsdb.h" // ================================================================================================= -static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { +static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) { int32_t n = 0; int8_t hasDel = pFS->pDelFile ? 1 : 0; uint32_t nSet = taosArrayGetSize(pFS->aDFileSet); @@ -39,214 +39,112 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { return n; } -static int32_t tsdbGnrtCurrent(STsdb *pTsdb, STsdbFS *pFS, char *fname) { - int32_t code = 0; - int64_t n; - int64_t size; - uint8_t *pData = NULL; - TdFilePtr pFD = NULL; +static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, STsdbFS *pFS) { + int32_t code = 0; + int32_t n = 0; - // to binary - size = tsdbEncodeFS(NULL, pFS) + sizeof(TSCKSUM); - pData = taosMemoryMalloc(size); + // version + n += tGetI8(pData + n, NULL); + + // SDelFile + int8_t hasDel = 0; + n += tGetI8(pData + n, &hasDel); + if (hasDel) { + pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile)); + if (pFS->pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += tGetDelFile(pData + n, pFS->pDelFile); + pFS->pDelFile->nRef = 1; + } else { + pFS->pDelFile = NULL; + } + + // aDFileSet + taosArrayClear(pFS->aDFileSet); + uint32_t nSet = 0; + n += tGetU32v(pData + n, &nSet); + for (uint32_t iSet = 0; iSet < nSet; iSet++) { + SDFileSet fSet = {0}; + + int32_t nt = tGetDFileSet(pData + n, &fSet); + if (nt < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += nt; + if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + ASSERT(n + sizeof(TSCKSUM) == nData); + +_exit: + return code; +} + +static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) { + int32_t code = 0; + int32_t lino = 0; + + // encode to binary + int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM); + uint8_t *pData = taosMemoryMalloc(size); if (pData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - n = tsdbEncodeFS(pData, pFS); - ASSERT(n + sizeof(TSCKSUM) == size); + tsdbFSToBinary(pData, pFS); taosCalcChecksumAppend(0, pData, size); - // create and write - pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + // save to file + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - n = taosWriteFile(pFD, pData, size); + int64_t n = taosWriteFile(pFD, pData, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } if (taosFsyncFile(pFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } taosCloseFile(&pFD); +_exit: if (pData) taosMemoryFree(pData); - return code; - -_err: - tsdbError("vgId:%d, tsdb gnrt current failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - if (pData) taosMemoryFree(pData); + if (code) { + tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } return code; } -// static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) { -// int32_t code = 0; -// char fname[TSDB_FILENAME_LEN]; +int32_t tsdbFSCreate(STsdbFS *pFS) { + int32_t code = 0; -// if (pFrom && pTo) { -// bool isSameDisk = (pFrom->diskId.level == pTo->diskId.level) && (pFrom->diskId.id == pTo->diskId.id); + pFS->pDelFile = NULL; + pFS->aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); + if (pFS->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } -// // head -// if (isSameDisk && pFrom->pHeadF->commitID == pTo->pHeadF->commitID) { -// ASSERT(pFrom->pHeadF->size == pTo->pHeadF->size); -// ASSERT(pFrom->pHeadF->offset == pTo->pHeadF->offset); -// } else { -// tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname); -// taosRemoveFile(fname); -// } - -// // data -// if (isSameDisk && pFrom->pDataF->commitID == pTo->pDataF->commitID) { -// if (pFrom->pDataF->size > pTo->pDataF->size) { -// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE); -// if (code) goto _err; -// } -// } else { -// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname); -// taosRemoveFile(fname); -// } - -// // stt -// if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) { -// if (pFrom->pLastF->size > pTo->pLastF->size) { -// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE); -// if (code) goto _err; -// } -// } else { -// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname); -// taosRemoveFile(fname); -// } - -// // sma -// if (isSameDisk && pFrom->pSmaF->commitID == pTo->pSmaF->commitID) { -// if (pFrom->pSmaF->size > pTo->pSmaF->size) { -// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE); -// if (code) goto _err; -// } -// } else { -// tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname); -// taosRemoveFile(fname); -// } -// } else if (pFrom) { -// // head -// tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname); -// taosRemoveFile(fname); - -// // data -// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname); -// taosRemoveFile(fname); - -// // stt -// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname); -// taosRemoveFile(fname); - -// // fsm -// tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname); -// taosRemoveFile(fname); -// } - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb apply disk file set change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbApplyDelFileChange(STsdbFS *pFS, SDelFile *pFrom, SDelFile *pTo) { -// int32_t code = 0; -// char fname[TSDB_FILENAME_LEN]; - -// if (pFrom && pTo) { -// if (!tsdbDelFileIsSame(pFrom, pTo)) { -// tsdbDelFileName(pFS->pTsdb, pFrom, fname); -// if (taosRemoveFile(fname) < 0) { -// code = TAOS_SYSTEM_ERROR(errno); -// goto _err; -// } -// } -// } else if (pFrom) { -// tsdbDelFileName(pFS->pTsdb, pFrom, fname); -// if (taosRemoveFile(fname) < 0) { -// code = TAOS_SYSTEM_ERROR(errno); -// goto _err; -// } -// } else { -// // do nothing -// } - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb apply del file change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSState *pTo) { -// int32_t code = 0; -// int32_t iFrom = 0; -// int32_t nFrom = taosArrayGetSize(pFrom->aDFileSet); -// int32_t iTo = 0; -// int32_t nTo = taosArrayGetSize(pTo->aDFileSet); -// SDFileSet *pDFileSetFrom; -// SDFileSet *pDFileSetTo; - -// // SDelFile -// code = tsdbApplyDelFileChange(pFS, pFrom->pDelFile, pTo->pDelFile); -// if (code) goto _err; - -// // SDFileSet -// while (iFrom < nFrom && iTo < nTo) { -// pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom); -// pDFileSetTo = (SDFileSet *)taosArrayGet(pTo->aDFileSet, iTo); - -// if (pDFileSetFrom->fid == pDFileSetTo->fid) { -// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo); -// if (code) goto _err; - -// iFrom++; -// iTo++; -// } else if (pDFileSetFrom->fid < pDFileSetTo->fid) { -// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); -// if (code) goto _err; - -// iFrom++; -// } else { -// iTo++; -// } -// } - -// while (iFrom < nFrom) { -// pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom); -// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); -// if (code) goto _err; - -// iFrom++; -// } - -// #if 0 -// // do noting -// while (iTo < nTo) { -// pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo); -// code = tsdbApplyDFileSetChange(pFS, NULL, pDFileSetTo); -// if (code) goto _err; - -// iTo++; -// } -// #endif - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb fs apply disk change failed sicne %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); -// return code; -// } +_exit: + return code; +} void tsdbFSDestroy(STsdbFS *pFS) { if (pFS->pDelFile) { @@ -268,76 +166,81 @@ void tsdbFSDestroy(STsdbFS *pFS) { static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { int32_t code = 0; - int64_t size; - char fname[TSDB_FILENAME_LEN]; + int32_t lino = 0; + int64_t size = 0; + char fname[TSDB_FILENAME_LEN] = {0}; // SDelFile if (pTsdb->fs.pDelFile) { tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } // SArray + int32_t fid = 0; for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); + fid = pSet->fid; // head ========= tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // data ========= tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { - code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } + // else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { + // code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); + // TSDB_CHECK_CODE(code, lino, _exit); + // } // sma ============= tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { - code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } + // else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { + // code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); + // TSDB_CHECK_CODE(code, lino, _exit); + // } // stt =========== for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } } @@ -346,10 +249,11 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { // remove those invalid files (todo) } - return code; - -_err: - tsdbError("vgId:%d, tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code), + fid); + } return code; } @@ -363,141 +267,531 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { return 0; } -static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) { - int32_t code = 0; - int8_t hasDel; - uint32_t nSet; - int32_t n = 0; +static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { + SVnode *pVnode = pTsdb->pVnode; + if (pVnode->pTfs) { + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } + } else { + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP); + } + } +} - // version - n += tGetI8(pData + n, NULL); +static int32_t tsdbLoadFSFromFile(const char *fname, STsdbFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + uint8_t *pData = NULL; + + // load binary + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); + if (pFD == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size; + if (taosFStatFile(pFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosReadFile(pFD, pData, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!taosCheckChecksumWhole(pData, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&pFD); + + // decode binary + code = tsdbBinaryToFS(pData, size, pFS); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; +} + +static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN] = {0}; + + int32_t nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSet->pHeadF); + } + + nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->pDataF); + } + + nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->pSmaF); + } + + for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) { + nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->aSttF[iStt]); + } + } + +_exit: + return code; +} + +static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFrom) { + int32_t code = 0; + int32_t lino = 0; + + *pSetTo = (SDFileSet){.diskId = pSetFrom->diskId, .fid = pSetFrom->fid, .nSttF = 0}; + + // head + pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetTo->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pHeadF = *pSetFrom->pHeadF; + pSetTo->pHeadF->nRef = 1; + + // data + pSetTo->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetTo->pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pDataF = *pSetFrom->pDataF; + pSetTo->pDataF->nRef = 1; + + // sma + pSetTo->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetTo->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pSmaF = *pSetFrom->pSmaF; + pSetTo->pSmaF->nRef = 1; + + // stt + for (int32_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) { + pSetTo->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetTo->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pSetTo->nSttF++; + *pSetTo->aSttF[iStt] = *pSetFrom->aSttF[iStt]; + pSetTo->aSttF[iStt]->nRef = 1; + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSetNew) { + int32_t code = 0; + int32_t lino = 0; + int32_t nRef = 0; + bool sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); + char fname[TSDB_FILENAME_LEN] = {0}; + + // head + SHeadFile *pHeadF = pSetOld->pHeadF; + if ((!sameDisk) || (pHeadF->commitID != pSetNew->pHeadF->commitID)) { + pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetOld->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; + + nRef = atomic_sub_fetch_32(&pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pHeadF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pHeadF); + } + } else { + nRef = pHeadF->nRef; + *pHeadF = *pSetNew->pHeadF; + pHeadF->nRef = nRef; + } + + // data + SDataFile *pDataF = pSetOld->pDataF; + if ((!sameDisk) || (pDataF->commitID != pSetNew->pDataF->commitID)) { + pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetOld->pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; + + nRef = atomic_sub_fetch_32(&pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pDataF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pDataF); + } + } else { + nRef = pDataF->nRef; + *pDataF = *pSetNew->pDataF; + pDataF->nRef = nRef; + } + + // sma + SSmaFile *pSmaF = pSetOld->pSmaF; + if ((!sameDisk) || (pSmaF->commitID != pSetNew->pSmaF->commitID)) { + pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetOld->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; + + nRef = atomic_sub_fetch_32(&pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSmaF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSmaF); + } + } else { + nRef = pSmaF->nRef; + *pSmaF = *pSetNew->pSmaF; + pSmaF->nRef = nRef; + } + + // stt + if (sameDisk) { + if (pSetNew->nSttF > pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1); + pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; + pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; + pSetOld->nSttF++; + } else if (pSetNew->nSttF < pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == 1); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + pSetOld->aSttF[iStt] = NULL; + } + + pSetOld->nSttF = 1; + pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; + pSetOld->aSttF[0]->nRef = 1; + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } else { + ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); + } + } + } + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + } + + pSetOld->nSttF = 0; + for (int32_t iStt = 0; iStt < pSetNew->nSttF; iStt++) { + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + + pSetOld->nSttF++; + } + } + + if (!sameDisk) { + pSetOld->diskId = pSetNew->diskId; + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN] = {0}; // SDelFile - n += tGetI8(pData + n, &hasDel); - if (hasDel) { - pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pTsdb->fs.pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + if (pFS->pDelFile) { + SDelFile *pDelFile = pTsdb->fs.pDelFile; - pTsdb->fs.pDelFile->nRef = 1; - n += tGetDelFile(pData + n, pTsdb->fs.pDelFile); + if (pDelFile == NULL || (pDelFile->commitID != pFS->pDelFile->commitID)) { + pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); + if (pTsdb->fs.pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + *pTsdb->fs.pDelFile = *pFS->pDelFile; + pTsdb->fs.pDelFile->nRef = 1; + + if (pDelFile) { + nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1); + if (nRef == 0) { + tsdbDelFileName(pTsdb, pDelFile, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pDelFile); + } + } + } } else { - pTsdb->fs.pDelFile = NULL; + ASSERT(pTsdb->fs.pDelFile == NULL); } - // SArray - taosArrayClear(pTsdb->fs.aDFileSet); - n += tGetU32v(pData + n, &nSet); - for (uint32_t iSet = 0; iSet < nSet; iSet++) { + // aDFileSet + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); + int32_t nNew = taosArrayGetSize(pFS->aDFileSet); SDFileSet fSet = {0}; + int8_t sameDisk = 0; - int32_t nt = tGetDFileSet(pData + n, &fSet); - if (nt < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + if (iOld >= nOld && iNew >= nNew) break; - n += nt; + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL; - if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit); + + iOld++; + iNew++; + } else if (pSetOld->fid < pSetNew->fid) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; + } + } else if (pSetOld) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; } } - ASSERT(n + sizeof(TSCKSUM) == nData); - return code; - -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } // EXPOSED APIS ==================================================================================== -int32_t tsdbFSOpen(STsdb *pTsdb) { +int32_t tsdbFSCommit(STsdb *pTsdb) { int32_t code = 0; + int32_t lino = 0; + STsdbFS fs = {0}; + + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, current, current_t); + + if (!taosCheckExistFile(current_t)) goto _exit; + + // rename the file + if (taosRenameFile(current_t, current) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // Load the new FS + code = tsdbFSCreate(&fs); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbLoadFSFromFile(current, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // apply file change + code = tsdbFSApplyChange(pTsdb, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + tsdbFSDestroy(&fs); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tsdbFSRollback(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, NULL, current_t); + (void)taosRemoveFile(current_t); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno)); + } + return code; +} + +int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pTsdb->pVnode; // open handle - pTsdb->fs.pDelFile = NULL; - pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); - if (pTsdb->fs.aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + code = tsdbFSCreate(&pTsdb->fs); + TSDB_CHECK_CODE(code, lino, _exit); - // load fs or keep empty - char fname[TSDB_FILENAME_LEN]; + // open impl + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, current, current_t); - if (pVnode->pTfs) { - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); + if (taosCheckExistFile(current)) { + code = tsdbLoadFSFromFile(current, &pTsdb->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosCheckExistFile(current_t)) { + if (rollback) { + code = tsdbFSRollback(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFSCommit(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } + } } else { - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); - } - - if (!taosCheckExistFile(fname)) { // empty one - code = tsdbGnrtCurrent(pTsdb, &pTsdb->fs, fname); - if (code) goto _err; - } else { - // read - TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); - if (pFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbSaveFSToFile(&pTsdb->fs, current); + TSDB_CHECK_CODE(code, lino, _exit); - int64_t size; - if (taosFStatFile(pFD, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosCloseFile(&pFD); - goto _err; - } - - uint8_t *pData = taosMemoryMalloc(size); - if (pData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - taosCloseFile(&pFD); - goto _err; - } - - int64_t n = taosReadFile(pFD, pData, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pData); - taosCloseFile(&pFD); - goto _err; - } - - if (!taosCheckChecksumWhole(pData, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - taosMemoryFree(pData); - taosCloseFile(&pFD); - goto _err; - } - - taosCloseFile(&pFD); - - // recover fs - code = tsdbRecoverFS(pTsdb, pData, size); - if (code) { - taosMemoryFree(pData); - goto _err; - } - - taosMemoryFree(pData); + ASSERT(!rollback); } // scan and fix FS code = tsdbScanAndTryFixFS(pTsdb); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); - return code; - -_err: - tsdbError("vgId:%d, tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -538,19 +832,24 @@ int32_t tsdbFSClose(STsdb *pTsdb) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; + int32_t lino = 0; pFS->pDelFile = NULL; - pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); - if (pFS->aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + if (pFS->aDFileSet) { + taosArrayClear(pFS->aDFileSet); + } else { + pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); + if (pFS->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } } if (pTsdb->fs.pDelFile) { pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); if (pFS->pDelFile == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *pFS->pDelFile = *pTsdb->fs.pDelFile; @@ -564,7 +863,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); if (fSet.pHeadF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pHeadF = *pSet->pHeadF; @@ -572,7 +871,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); if (fSet.pDataF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pDataF = *pSet->pDataF; @@ -580,7 +879,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); if (fSet.pSmaF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pSmaF = *pSet->pSmaF; @@ -589,26 +888,21 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); if (fSet.aSttF[fSet.nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF]; } if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } _exit: - return code; -} - -int32_t tsdbFSRollback(STsdbFS *pFS) { - int32_t code = 0; - - ASSERT(0); - + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -714,336 +1008,21 @@ _exit: return code; } -int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { +int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t code = 0; + int32_t lino = 0; char tfname[TSDB_FILENAME_LEN]; - char fname[TSDB_FILENAME_LEN]; - snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); + tsdbGetCurrentFName(pTsdb, NULL, tfname); // gnrt CURRENT.t - code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname); - if (code) goto _err; + code = tsdbSaveFSToFile(pFSNew, tfname); + TSDB_CHECK_CODE(code, lino, _exit); - // rename - code = taosRenameFile(tfname, fname); +_exit: if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _err; + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } - - return code; - -_err: - tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { - int32_t code = 0; - int32_t nRef; - char fname[TSDB_FILENAME_LEN]; - - // del - if (pFSNew->pDelFile) { - SDelFile *pDelFile = pTsdb->fs.pDelFile; - - if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) { - pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pTsdb->fs.pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - *pTsdb->fs.pDelFile = *pFSNew->pDelFile; - pTsdb->fs.pDelFile->nRef = 1; - - if (pDelFile) { - nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1); - if (nRef == 0) { - tsdbDelFileName(pTsdb, pDelFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pDelFile); - } - } - } - } else { - ASSERT(pTsdb->fs.pDelFile == NULL); - } - - // data - int32_t iOld = 0; - int32_t iNew = 0; - while (true) { - int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); - int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet); - SDFileSet fSet; - int8_t sameDisk; - - if (iOld >= nOld && iNew >= nNew) break; - - SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; - SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL; - - if (pSetOld && pSetNew) { - if (pSetOld->fid == pSetNew->fid) { - goto _merge_old_and_new; - } else if (pSetOld->fid < pSetNew->fid) { - goto _remove_old; - } else { - goto _add_new; - } - } else if (pSetOld) { - goto _remove_old; - } else { - goto _add_new; - } - - _merge_old_and_new: - sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); - - // head - fSet.pHeadF = pSetOld->pHeadF; - if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) { - pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); - if (pSetOld->pHeadF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->pHeadF = *pSetNew->pHeadF; - pSetOld->pHeadF->nRef = 1; - - nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1); - if (nRef == 0) { - tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(fSet.pHeadF); - } - } else { - ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size); - ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset); - } - - // data - fSet.pDataF = pSetOld->pDataF; - if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) { - pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); - if (pSetOld->pDataF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->pDataF = *pSetNew->pDataF; - pSetOld->pDataF->nRef = 1; - - nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1); - if (nRef == 0) { - tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname); - taosRemoveFile(fname); - taosMemoryFree(fSet.pDataF); - } - } else { - ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size); - pSetOld->pDataF->size = pSetNew->pDataF->size; - } - - // sma - fSet.pSmaF = pSetOld->pSmaF; - if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) { - pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); - if (pSetOld->pSmaF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->pSmaF = *pSetNew->pSmaF; - pSetOld->pSmaF->nRef = 1; - - nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1); - if (nRef == 0) { - tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(fSet.pSmaF); - } - } else { - ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size); - pSetOld->pSmaF->size = pSetNew->pSmaF->size; - } - - // stt - if (sameDisk) { - if (pSetNew->nSttF > pSetOld->nSttF) { - ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1); - pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; - pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; - pSetOld->nSttF++; - } else if (pSetNew->nSttF < pSetOld->nSttF) { - ASSERT(pSetNew->nSttF == 1); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - SSttFile *pSttFile = pSetOld->aSttF[iStt]; - nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pSttFile); - } - pSetOld->aSttF[iStt] = NULL; - } - - pSetOld->nSttF = 1; - pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; - pSetOld->aSttF[0]->nRef = 1; - } else { - for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { - SSttFile *pSttFile = pSetOld->aSttF[iStt]; - nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pSttFile); - } - - pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[iStt] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; - pSetOld->aSttF[iStt]->nRef = 1; - } else { - ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); - ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); - } - } - } - } else { - ASSERT(pSetOld->nSttF == pSetNew->nSttF); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - SSttFile *pSttFile = pSetOld->aSttF[iStt]; - nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pSttFile); - } - - pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[iStt] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; - pSetOld->aSttF[iStt]->nRef = 1; - } - } - - if (!sameDisk) { - pSetOld->diskId = pSetNew->diskId; - } - - iOld++; - iNew++; - continue; - - _remove_old: - nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1); - if (nRef == 0) { - tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pSetOld->pHeadF); - } - - nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1); - if (nRef == 0) { - tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname); - taosRemoveFile(fname); - taosMemoryFree(pSetOld->pDataF); - } - - nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1); - if (nRef == 0) { - tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname); - taosRemoveFile(fname); - taosMemoryFree(pSetOld->pSmaF); - } - - for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname); - taosRemoveFile(fname); - taosMemoryFree(pSetOld->aSttF[iStt]); - } - } - - taosArrayRemove(pTsdb->fs.aDFileSet, iOld); - continue; - - _add_new: - fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1}; - - // head - fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); - if (fSet.pHeadF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.pHeadF = *pSetNew->pHeadF; - fSet.pHeadF->nRef = 1; - - // data - fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); - if (fSet.pDataF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.pDataF = *pSetNew->pDataF; - fSet.pDataF->nRef = 1; - - // sma - fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); - if (fSet.pSmaF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.pSmaF = *pSetNew->pSmaF; - fSet.pSmaF->nRef = 1; - - // stt - ASSERT(pSetNew->nSttF == 1); - fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (fSet.aSttF[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.aSttF[0] = *pSetNew->aSttF[0]; - fSet.aSttF[0]->nRef = 1; - - if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - iOld++; - iNew++; - continue; - } - - return code; - -_err: - tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 5197823490..efc74b68ba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { * @param dir * @return int */ -int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg) { +int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback) { STsdb *pTsdb = NULL; int slen = 0; @@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee } // open tsdb - if (tsdbFSOpen(pTsdb) < 0) { + if (tsdbFSOpen(pTsdb, rollback) < 0) { goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 2c68c57176..259c30d591 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { } // do change fs - code = tsdbFSCommit1(pTsdb, &fs); + code = tsdbFSPrepareCommit(pTsdb, &fs); if (code) goto _err; taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pTsdb, &fs); + code = tsdbFSCommit(pTsdb); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 74c704598d..cdbbe6333d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; - code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); + code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs); if (code) goto _err; // lock taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); + code = tsdbFSCommit(pWriter->pTsdb); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 38f21174a7..24b678b5eb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -20,8 +20,6 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); -static int vnodeStartCommit(SVnode *pVnode); -static int vnodeEndCommit(SVnode *pVnode); static int vnodeCommitImpl(void *arg); static void vnodeWaitCommit(SVnode *pVnode); @@ -215,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) { } int vnodeCommit(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN]; @@ -234,8 +234,8 @@ int vnodeCommit(SVnode *pVnode) { snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); } if (vnodeSaveInfo(dir, &info) < 0) { - vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); @@ -249,8 +249,8 @@ int vnodeCommit(SVnode *pVnode) { // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { - vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _exit); } if (VND_IS_RSMA(pVnode)) { @@ -259,22 +259,26 @@ int vnodeCommit(SVnode *pVnode) { return -1; } } else { - if (tsdbCommit(pVnode->pTsdb) < 0) { - vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } + code = tsdbCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); } if (tqCommit(pVnode->pTq) < 0) { - vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _exit); } - // walCommit (TODO) // commit info if (vnodeCommitInfo(dir, &info) < 0) { - vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + tsdbFinishCommit(pVnode->pTsdb); + + if (metaFinishCommit(pVnode->pMeta) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } pVnode->state.committed = info.state.committed; @@ -287,11 +291,31 @@ int vnodeCommit(SVnode *pVnode) { // apply the commit (TODO) walEndSnapshot(pVnode->pWal); - vInfo("vgId:%d, commit end", TD_VID(pVnode)); - +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + vInfo("vgId:%d, commit end", TD_VID(pVnode)); + } return 0; } +bool vnodeShouldRollback(SVnode *pVnode) { + char tFName[TSDB_FILENAME_LEN] = {0}; + snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, + VND_INFO_FNAME_TMP); + + return taosCheckExistFile(tFName); +} + +void vnodeRollback(SVnode *pVnode) { + char tFName[TSDB_FILENAME_LEN] = {0}; + snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, + VND_INFO_FNAME_TMP); + + (void)taosRemoveFile(tFName); +} + static int vnodeCommitImpl(void *arg) { SVnode *pVnode = (SVnode *)arg; @@ -304,16 +328,6 @@ static int vnodeCommitImpl(void *arg) { return 0; } -static int vnodeStartCommit(SVnode *pVnode) { - // TODO - return 0; -} - -static int vnodeEndCommit(SVnode *pVnode) { - // TODO - return 0; -} - static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); } static int vnodeEncodeState(const void *pObj, SJson *pJson) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 001bb5f7f2..3dbd93bb27 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); + int8_t rollback = vnodeShouldRollback(pVnode); + // open buffer pool if (vnodeOpenBufPool(pVnode) < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open meta - if (metaOpen(pVnode, &pVnode->pMeta) < 0) { + if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) { vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open tsdb - if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { + if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open sma - if (smaOpen(pVnode)) { + if (smaOpen(pVnode, rollback)) { vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } @@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } -#if !VNODE_AS_LIB // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } -#endif // vnode begin if (vnodeBegin(pVnode) < 0) { @@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } -#if !VNODE_AS_LIB // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } -#endif + + if (rollback) { + vnodeRollback(pVnode); + } return pVnode; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index cf72533b31..02b74c260b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -27,7 +27,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF char* streamPath = taosMemoryCalloc(1, len); sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); - if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { taosMemoryFree(streamPath); goto _err; } @@ -36,11 +36,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosMulModeMkDir(streamPath, 0755); taosMemoryFree(streamPath); - if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } - if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { goto _err; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6991f00bb7..c3c58e125a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -99,26 +99,26 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int memset(statePath, 0, 300); tstrncpy(statePath, path, 300); } - if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { + if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) { goto _err; } // open state storage backend - if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) { + if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) { goto _err; } // todo refactor - if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { + if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) { goto _err; } if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db, - &pState->pSessionStateDb) < 0) { + &pState->pSessionStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) { + if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) { goto _err; } diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index c90d4e3c03..3d92d164b4 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -31,15 +31,17 @@ typedef struct STBC TBC; typedef struct STxn TXN; // TDB -int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb); +int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback); int32_t tdbClose(TDB *pDb); int32_t tdbBegin(TDB *pDb, TXN *pTxn); int32_t tdbCommit(TDB *pDb, TXN *pTxn); +int32_t tdbPostCommit(TDB *pDb, TXN *pTxn); int32_t tdbAbort(TDB *pDb, TXN *pTxn); int32_t tdbAlter(TDB *pDb, int pages); // TTB -int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb); +int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb, + int8_t rollback); int32_t tdbTbClose(TTB *pTb); int32_t tdbTbDrop(TTB *pTb); int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index a2803d3ecf..a2002a3ac4 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -15,7 +15,7 @@ #include "tdbInt.h" -int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { +int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, int8_t rollback) { TDB *pDb; int dsize; int zsize; @@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { #ifdef USE_MAINDB // open main db - ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb); + ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb, rollback); if (ret < 0) { return -1; } @@ -129,6 +129,21 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { return 0; } +int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) { + SPager *pPager; + int ret; + + for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { + ret = tdbPagerPostCommit(pPager, pTxn); + if (ret < 0) { + tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); + return -1; + } + } + + return 0; +} + int32_t tdbAbort(TDB *pDb, TXN *pTxn) { SPager *pPager; int ret; diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 90332617cd..58f6cedc22 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -305,6 +305,18 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { return 0; } +int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { + if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pPager->inTran = 0; + + return 0; +} + // recovery dirty pages int tdbPagerAbort(SPager *pPager, TXN *pTxn) { SPage *pPage; @@ -657,3 +669,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { return 0; } + +int tdbPagerRollback(SPager *pPager) { + if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} diff --git a/source/libs/tdb/src/db/tdbTable.c b/source/libs/tdb/src/db/tdbTable.c index 008907ca77..73cc34a86f 100644 --- a/source/libs/tdb/src/db/tdbTable.c +++ b/source/libs/tdb/src/db/tdbTable.c @@ -24,7 +24,8 @@ struct STBC { SBTC btc; }; -int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) { +int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb, + int8_t rollback) { TTB *pTb; SPager *pPager; int ret; @@ -110,10 +111,14 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF return -1; } - ret = tdbPagerRestore(pPager, pTb->pBt); - if (ret < 0) { - tdbOsFree(pTb); - return -1; + if (rollback) { + tdbPagerRollback(pPager); + } else { + ret = tdbPagerRestore(pPager, pTb->pBt); + if (ret < 0) { + tdbOsFree(pTb); + return -1; + } } *ppTb = pTb; diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 68434a4319..b45747c972 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -190,12 +190,14 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt); int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn); +int tdbPagerPostCommit(SPager *pPager, TXN *pTxn); int tdbPagerAbort(SPager *pPager, TXN *pTxn); int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); int tdbPagerRestore(SPager *pPager, SBTree *pBt); +int tdbPagerRollback(SPager *pPager); // tdbPCache.c ==================================== #define TDB_PCACHE_PAGE \ diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp index d98c271edb..6ead7c0dd6 100644 --- a/source/libs/tdb/test/tdbExOVFLTest.cpp +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -140,7 +140,7 @@ static void generateBigVal(char *val, int valLen) { static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) { TDB *pEnv = NULL; - int ret = tdbOpen(envName, pageSize, pageNum, &pEnv); + int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0); if (ret) { pEnv = NULL; } @@ -162,8 +162,8 @@ static void insertOfp(void) { // open db TTB *pDb = NULL; tdb_cmpr_fn_t compFunc = tKeyCmpr; - // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); - ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb); + // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); // open the pool @@ -211,8 +211,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) { // open db TTB *pDb = NULL; tdb_cmpr_fn_t compFunc = tKeyCmpr; - // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); - int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb); + // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); // generate value payload @@ -253,7 +253,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) { // open db TTB *pDb = NULL; tdb_cmpr_fn_t compFunc = tKeyCmpr; - ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); // open the pool @@ -354,12 +354,12 @@ TEST(tdb_test, simple_insert1) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", pageSize, 64, &pEnv); + ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); { diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 6070052127..f3a301cf5b 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -130,12 +130,12 @@ TEST(tdb_test, DISABLED_simple_insert1) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 4096, 64, &pEnv); + ret = tdbOpen("tdb", 4096, 64, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); { @@ -250,12 +250,12 @@ TEST(tdb_test, DISABLED_simple_insert2) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 1024, 10, &pEnv); + ret = tdbOpen("tdb", 1024, 10, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tDefaultKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); { @@ -346,11 +346,11 @@ TEST(tdb_test, DISABLED_simple_delete1) { pPool = openPool(); // open env - ret = tdbOpen("tdb", 1024, 256, &pEnv); + ret = tdbOpen("tdb", 1024, 256, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // open database - ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); @@ -435,11 +435,11 @@ TEST(tdb_test, DISABLED_simple_upsert1) { taosRemoveDir("tdb"); // open env - ret = tdbOpen("tdb", 4096, 64, &pEnv); + ret = tdbOpen("tdb", 4096, 64, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // open database - ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); pPool = openPool(); @@ -497,12 +497,12 @@ TEST(tdb_test, multi_thread_query) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 4096, 10, &pEnv); + ret = tdbOpen("tdb", 4096, 10, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); char key[64]; @@ -614,10 +614,10 @@ TEST(tdb_test, DISABLED_multi_thread1) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 512, 1, &pDb); + ret = tdbOpen("tdb", 512, 1, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); - ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb); + ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb, 0); GTEST_ASSERT_EQ(ret, 0); auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) { @@ -726,4 +726,4 @@ TEST(tdb_test, DISABLED_multi_thread1) { ret = tdbClose(pDb); GTEST_ASSERT_EQ(ret, 0); #endif -} \ No newline at end of file +} diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index d332d913ca..be862308f1 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile)); if (pFile == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; if (fd >= 0) close(fd); if (fp != NULL) fclose(fp); return NULL;