more work

This commit is contained in:
Hongze Cheng 2022-06-21 09:13:16 +00:00
parent 29e65ce767
commit 47e07b7b13
2 changed files with 213 additions and 11 deletions

View File

@ -172,8 +172,9 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ==============================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);
int32_t tsdbFSClose(STsdbFS *pFS);
int32_t tsdbFSStart(STsdbFS *pFS);
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
int32_t tsdbFSBegin(STsdbFS *pFS);
int32_t tsdbFSCommit(STsdbFS *pFS);
int32_t tsdbFSRollback(STsdbFS *pFS);
// tsdbReaderWriter.c ==============================================================================================
// SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);

View File

@ -17,7 +17,7 @@
typedef struct {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<aDFileSet>
SArray *aDFileSet; // SArray<aDFileSet *>
} STsdbFSState;
struct STsdbFS {
@ -212,6 +212,59 @@ _err:
return code;
}
static int32_t tsdbSaveCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
int32_t code = 0;
int64_t n;
int64_t size;
char tfname[TSDB_FILENAME_LEN];
char fname[TSDB_FILENAME_LEN];
char *pData = NULL;
TdFilePtr pFD = NULL;
snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s/CURRENT.t", pFS->pTsdb->path);
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path);
// encode
code = tsdbFSStateToJsonStr(pState, &pData);
if (code) goto _err;
// create and write tfname
pFD = taosOpenFile(tfname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
size = strlen(pData);
n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFD);
// rename
code = taosRenameFile(tfname, fname);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
if (pData) taosMemoryFree(pData);
return code;
_err:
tsdbError("vgId:%d tsdb save current state failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
if (pData) taosMemoryFree(pData);
return code;
}
static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
int32_t code = 0;
int64_t size;
@ -294,12 +347,107 @@ _err:
return code;
}
int32_t tsdbFSCloseImpl(STsdbFS *pFS) {
static int32_t tsdbFSCloseImpl(STsdbFS *pFS) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbApplyDelFileChange(STsdbFS *pFS, SDelFile *pFrom, SDelFile *pTo) {
int32_t code = 0;
char fname[TSDB_FILENAME_LEN];
if (pFrom && pTo) {
if (pFrom != pTo) {
tsdbDelFileName(pFS->pTsdb, pFrom, fname);
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
} else if (pFrom) {
tsdbDelFileName(pFS->pTsdb, pFrom, fname);
if (taosRemoveFile(fname) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} else {
// do nothing
}
return code;
_err:
tsdbError("vgId:%d tsdb apply del file change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSState *pTo) {
int32_t code = 0;
int32_t iFrom = 0;
int32_t nFrom = taosArrayGetSize(pFrom->aDFileSet);
int32_t iTo = 0;
int32_t nTo = taosArrayGetSize(pTo->aDFileSet);
SDFileSet *pDFileSetFrom;
SDFileSet *pDFileSetTo;
// SDelFile
code = tsdbApplyDelFileChange(pFS, pFrom->pDelFile, pTo->pDelFile);
if (code) goto _err;
// SDFileSet
while (iFrom < nFrom && iTo < nTo) {
pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom);
pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo);
if (pDFileSetFrom->fid == pDFileSetTo->fid) {
code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo);
if (code) goto _err;
iFrom++;
iTo++;
} else if (pDFileSetFrom->fid < pDFileSetTo->fid) {
code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
if (code) goto _err;
iFrom++;
} else {
iTo++;
}
}
while (iFrom < nFrom) {
pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom);
code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
if (code) goto _err;
iFrom++;
}
#if 0
// do noting
while (iTo < nTo) {
pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo);
code = tsdbApplyDFileSetChange(pFS, NULL, pDFileSetTo);
if (code) goto _err;
iTo++;
}
#endif
return code;
_err:
tsdbError("vgId:%d tsdb fs apply disk change failed sicne %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
@ -325,7 +473,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet *));
if (pFS->cState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -336,7 +484,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet *));
if (pFS->nState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -376,14 +524,67 @@ _err:
return code;
}
int32_t tsdbFSStart(STsdbFS *pFS) {
int32_t tsdbFSBegin(STsdbFS *pFS) {
int32_t code = 0;
// TODO
ASSERT(!pFS->inTxn);
pFS->inTxn = 1;
pFS->nState->pDelFile = pFS->cState->pDelFile;
taosArrayClear(pFS->nState->aDFileSet);
for (int32_t iDFileSet = 0; iDFileSet < taosArrayGetSize(pFS->cState->aDFileSet); iDFileSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGetP(pFS->cState->aDFileSet, iDFileSet);
if (taosArrayPush(pFS->nState->aDFileSet, &pDFileSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
return code;
_err:
tsdbError("vgId:%d tsdb fs begin failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback) {
int32_t code = 0;
// TODO
int32_t tsdbFSCommit(STsdbFS *pFS) {
int32_t code = 0;
STsdbFSState *pState = pFS->nState;
// need lock (todo)
pFS->nState = pFS->cState;
pFS->cState = pState;
// save
code = tsdbSaveCurrentState(pFS, pFS->cState);
if (code) goto _err;
// apply commit on disk
code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState);
if (code) goto _err;
pFS->inTxn = 0;
return code;
_err:
tsdbError("vgId:%d tsdb fs commit failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSRollback(STsdbFS *pFS) {
int32_t code = 0;
code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState);
if (code) goto _err;
pFS->inTxn = 0;
return code;
_err:
tsdbError("vgId:%d tsdb fs rollback failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}