Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
669c985572
|
@ -881,10 +881,48 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// write blockIdx
|
||||||
|
code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// update file header
|
||||||
|
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// upsert SDFileSet
|
||||||
|
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// close and sync
|
||||||
|
code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
if (pCommitter->pReader) {
|
||||||
|
code = tsdbDataFReaderClose(&pCommitter->pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
STsdb *pTsdb = pCommitter->pTsdb;
|
||||||
SMemTable *pMemTable = pTsdb->imem;
|
SMemTable *pMemTable = pTsdb->imem;
|
||||||
|
|
||||||
|
// commit file data start
|
||||||
|
code = tsdbCommitFileDataStart(pCommitter);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// commit file data impl
|
||||||
int32_t iTbData = 0;
|
int32_t iTbData = 0;
|
||||||
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
|
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
|
||||||
int32_t iBlockIdx = 0;
|
int32_t iBlockIdx = 0;
|
||||||
|
@ -940,56 +978,6 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// write blockIdx
|
|
||||||
code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// update file header
|
|
||||||
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// upsert SDFileSet
|
|
||||||
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// close and sync
|
|
||||||
code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
if (pCommitter->pReader) {
|
|
||||||
code = tsdbDataFReaderClose(&pCommitter->pReader);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// commit file data start
|
|
||||||
code = tsdbCommitFileDataStart(pCommitter);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// commit file data impl
|
|
||||||
code = tsdbCommitFileDataImpl(pCommitter);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// commit file data end
|
// commit file data end
|
||||||
code = tsdbCommitFileDataEnd(pCommitter);
|
code = tsdbCommitFileDataEnd(pCommitter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -997,7 +985,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d commit file data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1239,14 +1239,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
|
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t size = TSDB_FHDR_SIZE;
|
|
||||||
int64_t n;
|
|
||||||
uint8_t hdr[TSDB_FHDR_SIZE];
|
|
||||||
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
|
|
||||||
SDataFile *pDataFile = &pWriter->wSet.fData;
|
|
||||||
SLastFile *pLastFile = &pWriter->wSet.fLast;
|
|
||||||
SSmaFile *pSmaFile = &pWriter->wSet.fSma;
|
|
||||||
|
|
||||||
// head ==============
|
// head ==============
|
||||||
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_HEAD_FILE);
|
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_HEAD_FILE);
|
||||||
|
|
Loading…
Reference in New Issue