diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index cb3509e983..c87dab14d5 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -172,8 +172,9 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); // tsdbFS.c ============================================================================================== int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); int32_t tsdbFSClose(STsdbFS *pFS); -int32_t tsdbFSStart(STsdbFS *pFS); -int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); +int32_t tsdbFSBegin(STsdbFS *pFS); +int32_t tsdbFSCommit(STsdbFS *pFS); +int32_t tsdbFSRollback(STsdbFS *pFS); // tsdbReaderWriter.c ============================================================================================== // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 04041f1dd1..87d718b9a1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -17,7 +17,7 @@ typedef struct { SDelFile *pDelFile; - SArray *aDFileSet; // SArray + SArray *aDFileSet; // SArray } STsdbFSState; struct STsdbFS { @@ -212,6 +212,59 @@ _err: return code; } +static int32_t tsdbSaveCurrentState(STsdbFS *pFS, STsdbFSState *pState) { + int32_t code = 0; + int64_t n; + int64_t size; + char tfname[TSDB_FILENAME_LEN]; + char fname[TSDB_FILENAME_LEN]; + char *pData = NULL; + TdFilePtr pFD = NULL; + + snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s/CURRENT.t", pFS->pTsdb->path); + snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path); + + // encode + code = tsdbFSStateToJsonStr(pState, &pData); + if (code) goto _err; + + // create and write tfname + pFD = taosOpenFile(tfname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + size = strlen(pData); + n = taosWriteFile(pFD, pData, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile(pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + taosCloseFile(&pFD); + + // rename + code = taosRenameFile(tfname, fname); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + + if (pData) taosMemoryFree(pData); + return code; + +_err: + tsdbError("vgId:%d tsdb save current state failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); + if (pData) taosMemoryFree(pData); + return code; +} + static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) { int32_t code = 0; int64_t size; @@ -294,12 +347,107 @@ _err: return code; } -int32_t tsdbFSCloseImpl(STsdbFS *pFS) { +static int32_t tsdbFSCloseImpl(STsdbFS *pFS) { int32_t code = 0; // TODO return code; } +static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbApplyDelFileChange(STsdbFS *pFS, SDelFile *pFrom, SDelFile *pTo) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN]; + + if (pFrom && pTo) { + if (pFrom != pTo) { + tsdbDelFileName(pFS->pTsdb, pFrom, fname); + if (taosRemoveFile(fname) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + } else if (pFrom) { + tsdbDelFileName(pFS->pTsdb, pFrom, fname); + if (taosRemoveFile(fname) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } else { + // do nothing + } + + return code; + +_err: + tsdbError("vgId:%d tsdb apply del file change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSState *pTo) { + int32_t code = 0; + int32_t iFrom = 0; + int32_t nFrom = taosArrayGetSize(pFrom->aDFileSet); + int32_t iTo = 0; + int32_t nTo = taosArrayGetSize(pTo->aDFileSet); + SDFileSet *pDFileSetFrom; + SDFileSet *pDFileSetTo; + + // SDelFile + code = tsdbApplyDelFileChange(pFS, pFrom->pDelFile, pTo->pDelFile); + if (code) goto _err; + + // SDFileSet + while (iFrom < nFrom && iTo < nTo) { + pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom); + pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo); + + if (pDFileSetFrom->fid == pDFileSetTo->fid) { + code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo); + if (code) goto _err; + + iFrom++; + iTo++; + } else if (pDFileSetFrom->fid < pDFileSetTo->fid) { + code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); + if (code) goto _err; + + iFrom++; + } else { + iTo++; + } + } + + while (iFrom < nFrom) { + pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom); + code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); + if (code) goto _err; + + iFrom++; + } + +#if 0 + // do noting + while (iTo < nTo) { + pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo); + code = tsdbApplyDFileSetChange(pFS, NULL, pDFileSetTo); + if (code) goto _err; + + iTo++; + } +#endif + + return code; + +_err: + tsdbError("vgId:%d tsdb fs apply disk change failed sicne %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); + return code; +} + // EXPOSED APIS ==================================================================================== int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { int32_t code = 0; @@ -325,7 +473,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); + pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet *)); if (pFS->cState->aDFileSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -336,7 +484,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); + pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet *)); if (pFS->nState->aDFileSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -376,14 +524,67 @@ _err: return code; } -int32_t tsdbFSStart(STsdbFS *pFS) { +int32_t tsdbFSBegin(STsdbFS *pFS) { int32_t code = 0; - // TODO + + ASSERT(!pFS->inTxn); + + pFS->inTxn = 1; + + pFS->nState->pDelFile = pFS->cState->pDelFile; + taosArrayClear(pFS->nState->aDFileSet); + for (int32_t iDFileSet = 0; iDFileSet < taosArrayGetSize(pFS->cState->aDFileSet); iDFileSet++) { + SDFileSet *pDFileSet = (SDFileSet *)taosArrayGetP(pFS->cState->aDFileSet, iDFileSet); + + if (taosArrayPush(pFS->nState->aDFileSet, &pDFileSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + return code; + +_err: + tsdbError("vgId:%d tsdb fs begin failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback) { - int32_t code = 0; - // TODO +int32_t tsdbFSCommit(STsdbFS *pFS) { + int32_t code = 0; + STsdbFSState *pState = pFS->nState; + + // need lock (todo) + pFS->nState = pFS->cState; + pFS->cState = pState; + + // save + code = tsdbSaveCurrentState(pFS, pFS->cState); + if (code) goto _err; + + // apply commit on disk + code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState); + if (code) goto _err; + + pFS->inTxn = 0; + + return code; + +_err: + tsdbError("vgId:%d tsdb fs commit failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbFSRollback(STsdbFS *pFS) { + int32_t code = 0; + + code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState); + if (code) goto _err; + + pFS->inTxn = 0; + + return code; + +_err: + tsdbError("vgId:%d tsdb fs rollback failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); return code; }