more fix
This commit is contained in:
parent
68aa7505bd
commit
f195decac9
|
@ -64,6 +64,7 @@ typedef struct STsdbFS STsdbFS;
|
|||
typedef struct SRowMerger SRowMerger;
|
||||
typedef struct STsdbFSState STsdbFSState;
|
||||
typedef struct STsdbSnapHdr STsdbSnapHdr;
|
||||
typedef struct STsdbReadSnap STsdbReadSnap;
|
||||
|
||||
#define TSDB_MAX_SUBBLOCKS 8
|
||||
#define TSDB_FHDR_SIZE 512
|
||||
|
@ -188,16 +189,22 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
|
|||
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
||||
// tsdbFile.c ==============================================================================================
|
||||
typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT;
|
||||
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]);
|
||||
bool tsdbFileIsSame(SDFileSet *pDFileSet1, SDFileSet *pDFileSet2, EDataFileT ftype);
|
||||
|
||||
bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2);
|
||||
int32_t tsdbUpdateDFileHdr(TdFilePtr pFD, SDFileSet *pSet, EDataFileT ftype);
|
||||
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype);
|
||||
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype);
|
||||
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile);
|
||||
int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile);
|
||||
int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile);
|
||||
int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile);
|
||||
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile);
|
||||
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile);
|
||||
int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet);
|
||||
int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet);
|
||||
|
||||
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]);
|
||||
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]);
|
||||
void tsdbLastFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SLastFile *pLastF, char fname[]);
|
||||
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]);
|
||||
// SDelFile
|
||||
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
|
||||
// tsdbFS.c ==============================================================================================
|
||||
|
@ -222,8 +229,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBu
|
|||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
|
||||
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
|
||||
|
||||
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter);
|
||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
|
||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
|
||||
// SDataFReader
|
||||
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||
|
@ -245,6 +251,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
|
|||
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf);
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
|
||||
// tsdbRead.c ==============================================================================================
|
||||
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap);
|
||||
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
|
||||
|
||||
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
|
||||
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
|
||||
|
@ -465,12 +474,6 @@ struct SDelIdx {
|
|||
int64_t size;
|
||||
};
|
||||
|
||||
struct SDelFile {
|
||||
int64_t commitID;
|
||||
int64_t size;
|
||||
int64_t offset;
|
||||
};
|
||||
|
||||
#pragma pack(push, 1)
|
||||
struct SBlockDataHdr {
|
||||
uint32_t delimiter;
|
||||
|
@ -479,34 +482,50 @@ struct SBlockDataHdr {
|
|||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
struct SDelFile {
|
||||
volatile int32_t nRef;
|
||||
|
||||
int64_t commitID;
|
||||
int64_t size;
|
||||
int64_t offset;
|
||||
};
|
||||
|
||||
struct SHeadFile {
|
||||
volatile int32_t nRef;
|
||||
|
||||
int64_t commitID;
|
||||
int64_t size;
|
||||
int64_t offset;
|
||||
};
|
||||
|
||||
struct SDataFile {
|
||||
volatile int32_t nRef;
|
||||
|
||||
int64_t commitID;
|
||||
int64_t size;
|
||||
};
|
||||
|
||||
struct SLastFile {
|
||||
volatile int32_t nRef;
|
||||
|
||||
int64_t commitID;
|
||||
int64_t size;
|
||||
};
|
||||
|
||||
struct SSmaFile {
|
||||
volatile int32_t nRef;
|
||||
|
||||
int64_t commitID;
|
||||
int64_t size;
|
||||
};
|
||||
|
||||
struct SDFileSet {
|
||||
SDiskID diskId;
|
||||
int32_t fid;
|
||||
SHeadFile fHead;
|
||||
SDataFile fData;
|
||||
SLastFile fLast;
|
||||
SSmaFile fSma;
|
||||
SDiskID diskId;
|
||||
int32_t fid;
|
||||
SHeadFile *pHeadF;
|
||||
SDataFile *pDataF;
|
||||
SLastFile *pLastF;
|
||||
SSmaFile *pSmaF;
|
||||
};
|
||||
|
||||
struct SRowIter {
|
||||
|
@ -528,11 +547,13 @@ struct STsdbFSState {
|
|||
};
|
||||
|
||||
struct STsdbFS {
|
||||
STsdb *pTsdb;
|
||||
TdThreadRwlock lock;
|
||||
int8_t inTxn;
|
||||
STsdbFSState *cState;
|
||||
STsdbFSState *nState;
|
||||
STsdb *pTsdb;
|
||||
STsdbFSState *cState;
|
||||
STsdbFSState *nState;
|
||||
|
||||
// new
|
||||
SDelFile *pDelFile;
|
||||
SArray aDFileSetP; // SArray<SDFileSet *>
|
||||
};
|
||||
|
||||
struct SDelFWriter {
|
||||
|
@ -541,6 +562,27 @@ struct SDelFWriter {
|
|||
TdFilePtr pWriteH;
|
||||
};
|
||||
|
||||
struct SDataFWriter {
|
||||
STsdb *pTsdb;
|
||||
SDFileSet wSet;
|
||||
|
||||
TdFilePtr pHeadFD;
|
||||
TdFilePtr pDataFD;
|
||||
TdFilePtr pLastFD;
|
||||
TdFilePtr pSmaFD;
|
||||
|
||||
SHeadFile fHead;
|
||||
SDataFile fData;
|
||||
SLastFile fLast;
|
||||
SSmaFile fSma;
|
||||
};
|
||||
|
||||
struct STsdbReadSnap {
|
||||
SMemTable *pMem;
|
||||
SMemTable *pIMem;
|
||||
STsdbFS fs;
|
||||
};
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -273,7 +273,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
int32_t code = 0;
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
SDFileSet *pRSet = NULL;
|
||||
SDFileSet wSet;
|
||||
|
||||
// memory
|
||||
pCommitter->nextKey = TSKEY_MAX;
|
||||
|
@ -292,23 +291,29 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
}
|
||||
|
||||
// new
|
||||
SHeadFile fHead;
|
||||
SDataFile fData;
|
||||
SLastFile fLast;
|
||||
SSmaFile fSma;
|
||||
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
|
||||
|
||||
taosArrayClear(pCommitter->aBlockIdxN);
|
||||
tMapDataReset(&pCommitter->nBlockMap);
|
||||
tBlockDataReset(&pCommitter->nBlockData);
|
||||
if (pRSet) {
|
||||
wSet = (SDFileSet){.diskId = pRSet->diskId,
|
||||
.fid = pCommitter->commitFid,
|
||||
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
|
||||
.fData = pRSet->fData,
|
||||
.fLast = {.commitID = pCommitter->commitID, .size = 0},
|
||||
.fSma = pRSet->fSma};
|
||||
wSet.diskId = pRSet->diskId;
|
||||
wSet.fid = pCommitter->commitFid;
|
||||
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
|
||||
fData = *pRSet->pDataF;
|
||||
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
|
||||
fSma = *pRSet->pSmaF;
|
||||
} else {
|
||||
wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
|
||||
.fid = pCommitter->commitFid,
|
||||
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
|
||||
.fData = {.commitID = pCommitter->commitID, .size = 0},
|
||||
.fLast = {.commitID = pCommitter->commitID, .size = 0},
|
||||
.fSma = {.commitID = pCommitter->commitID, .size = 0}};
|
||||
wSet.diskId = (SDiskID){.level = 0, .id = 0};
|
||||
wSet.fid = pCommitter->commitFid;
|
||||
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
|
||||
fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0};
|
||||
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
|
||||
fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0};
|
||||
}
|
||||
code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
|
||||
if (code) goto _err;
|
||||
|
@ -855,7 +860,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
|||
if (code) goto _err;
|
||||
|
||||
// upsert SDFileSet
|
||||
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
|
||||
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, &pCommitter->pWriter->wSet);
|
||||
if (code) goto _err;
|
||||
|
||||
// close and sync
|
||||
|
|
|
@ -174,62 +174,64 @@ static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet
|
|||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
if (pFrom && pTo) {
|
||||
bool isSameDisk = (pFrom->diskId.level == pTo->diskId.level) && (pFrom->diskId.id == pTo->diskId.id);
|
||||
|
||||
// head
|
||||
if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) {
|
||||
ASSERT(pFrom->fHead.size == pTo->fHead.size);
|
||||
ASSERT(pFrom->fHead.offset == pTo->fHead.offset);
|
||||
if (isSameDisk && pFrom->pHeadF->commitID == pTo->pHeadF->commitID) {
|
||||
ASSERT(pFrom->pHeadF->size == pTo->pHeadF->size);
|
||||
ASSERT(pFrom->pHeadF->offset == pTo->pHeadF->offset);
|
||||
} else {
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname);
|
||||
tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname);
|
||||
taosRemoveFile(fname);
|
||||
}
|
||||
|
||||
// data
|
||||
if (tsdbFileIsSame(pFrom, pTo, TSDB_DATA_FILE)) {
|
||||
if (pFrom->fData.size > pTo->fData.size) {
|
||||
if (isSameDisk && pFrom->pDataF->commitID == pTo->pDataF->commitID) {
|
||||
if (pFrom->pDataF->size > pTo->pDataF->size) {
|
||||
code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE);
|
||||
if (code) goto _err;
|
||||
}
|
||||
} else {
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_DATA_FILE, fname);
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
}
|
||||
|
||||
// last
|
||||
if (tsdbFileIsSame(pFrom, pTo, TSDB_LAST_FILE)) {
|
||||
if (pFrom->fLast.size > pTo->fLast.size) {
|
||||
if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) {
|
||||
if (pFrom->pLastF->size > pTo->pLastF->size) {
|
||||
code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE);
|
||||
if (code) goto _err;
|
||||
}
|
||||
} else {
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_LAST_FILE, fname);
|
||||
tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
|
||||
taosRemoveFile(fname);
|
||||
}
|
||||
|
||||
// sma
|
||||
if (tsdbFileIsSame(pFrom, pTo, TSDB_SMA_FILE)) {
|
||||
if (pFrom->fSma.size > pTo->fSma.size) {
|
||||
if (isSameDisk && pFrom->pSmaF->commitID == pTo->pSmaF->commitID) {
|
||||
if (pFrom->pSmaF->size > pTo->pSmaF->size) {
|
||||
code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE);
|
||||
if (code) goto _err;
|
||||
}
|
||||
} else {
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_SMA_FILE, fname);
|
||||
tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname);
|
||||
taosRemoveFile(fname);
|
||||
}
|
||||
} else if (pFrom) {
|
||||
// head
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname);
|
||||
tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname);
|
||||
taosRemoveFile(fname);
|
||||
|
||||
// data
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_DATA_FILE, fname);
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
|
||||
// last
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_LAST_FILE, fname);
|
||||
tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
|
||||
taosRemoveFile(fname);
|
||||
|
||||
// fsm
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_SMA_FILE, fname);
|
||||
tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname);
|
||||
taosRemoveFile(fname);
|
||||
}
|
||||
|
||||
|
@ -341,7 +343,6 @@ static void tsdbFSDestroy(STsdbFS *pFS) {
|
|||
taosMemoryFree(pFS->cState);
|
||||
}
|
||||
|
||||
taosThreadRwlockDestroy(&pFS->lock);
|
||||
taosMemoryFree(pFS);
|
||||
}
|
||||
// TODO
|
||||
|
@ -358,15 +359,6 @@ static int32_t tsdbFSCreate(STsdb *pTsdb, STsdbFS **ppFS) {
|
|||
}
|
||||
pFS->pTsdb = pTsdb;
|
||||
|
||||
code = taosThreadRwlockInit(&pFS->lock, NULL);
|
||||
if (code) {
|
||||
taosMemoryFree(pFS);
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pFS->inTxn = 0;
|
||||
|
||||
pFS->cState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState));
|
||||
if (pFS->cState == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -431,7 +423,7 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
|
|||
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->cState->aDFileSet, iSet);
|
||||
|
||||
// head =========
|
||||
tsdbDataFileName(pTsdb, pDFileSet, TSDB_HEAD_FILE, fname);
|
||||
tsdbHeadFileName(pTsdb, pDFileSet->diskId, pDFileSet->fid, pDFileSet->pHeadF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -442,16 +434,16 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
|
|||
}
|
||||
|
||||
// data =========
|
||||
tsdbDataFileName(pTsdb, pDFileSet, TSDB_DATA_FILE, fname);
|
||||
tsdbDataFileName(pTsdb, pDFileSet->diskId, pDFileSet->fid, pDFileSet->pDataF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (size < pDFileSet->fData.size) {
|
||||
if (size < pDFileSet->pDataF->size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
} else if (size > pDFileSet->fData.size) {
|
||||
} else if (size > pDFileSet->pDataF->size) {
|
||||
ASSERT(0);
|
||||
// need to rollback the file
|
||||
}
|
||||
|
@ -461,16 +453,16 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
|
|||
}
|
||||
|
||||
// last ===========
|
||||
tsdbDataFileName(pTsdb, pDFileSet, TSDB_LAST_FILE, fname);
|
||||
tsdbLastFileName(pTsdb, pDFileSet->diskId, pDFileSet->fid, pDFileSet->pLastF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (size < pDFileSet->fLast.size) {
|
||||
if (size < pDFileSet->pLastF->size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
} else if (size > pDFileSet->fLast.size) {
|
||||
} else if (size > pDFileSet->pLastF->size) {
|
||||
ASSERT(0);
|
||||
// need to rollback the file
|
||||
}
|
||||
|
@ -480,16 +472,16 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
|
|||
}
|
||||
|
||||
// sma =============
|
||||
tsdbDataFileName(pTsdb, pDFileSet, TSDB_SMA_FILE, fname);
|
||||
tsdbSmaFileName(pTsdb, pDFileSet->diskId, pDFileSet->fid, pDFileSet->pSmaF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (size < pDFileSet->fSma.size) {
|
||||
if (size < pDFileSet->pSmaF->size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
} else if (size > pDFileSet->fSma.size) {
|
||||
} else if (size > pDFileSet->pSmaF->size) {
|
||||
ASSERT(0);
|
||||
// need to rollback the file
|
||||
}
|
||||
|
@ -573,8 +565,6 @@ int32_t tsdbFSClose(STsdbFS *pFS) {
|
|||
int32_t tsdbFSBegin(STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(!pFS->inTxn);
|
||||
|
||||
// SDelFile
|
||||
pFS->nState->pDelFile = NULL;
|
||||
if (pFS->cState->pDelFile) {
|
||||
|
@ -593,7 +583,6 @@ int32_t tsdbFSBegin(STsdbFS *pFS) {
|
|||
}
|
||||
}
|
||||
|
||||
pFS->inTxn = 1;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
@ -631,8 +620,6 @@ int32_t tsdbFSCommit(STsdbFS *pFS) {
|
|||
code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState);
|
||||
if (code) goto _err;
|
||||
|
||||
pFS->inTxn = 0;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
@ -646,8 +633,6 @@ int32_t tsdbFSRollback(STsdbFS *pFS) {
|
|||
code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState);
|
||||
if (code) goto _err;
|
||||
|
||||
pFS->inTxn = 0;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
static int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
|
||||
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI64v(p ? p + n : p, pHeadFile->commitID);
|
||||
|
@ -35,7 +35,7 @@ static int32_t tGetHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
|
|||
return n;
|
||||
}
|
||||
|
||||
static int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile) {
|
||||
int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI64v(p ? p + n : p, pDataFile->commitID);
|
||||
|
@ -53,7 +53,7 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) {
|
|||
return n;
|
||||
}
|
||||
|
||||
static int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile) {
|
||||
int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI64v(p ? p + n : p, pLastFile->commitID);
|
||||
|
@ -71,7 +71,7 @@ static int32_t tGetLastFile(uint8_t *p, SLastFile *pLastFile) {
|
|||
return n;
|
||||
}
|
||||
|
||||
static int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
|
||||
int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI64v(p ? p + n : p, pSmaFile->commitID);
|
||||
|
@ -90,90 +90,63 @@ static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
|
|||
}
|
||||
|
||||
// EXPOSED APIS ==================================================
|
||||
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]) {
|
||||
STfs *pTfs = pTsdb->pVnode->pTfs;
|
||||
|
||||
switch (ftype) {
|
||||
case TSDB_HEAD_FILE:
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fHead.commitID,
|
||||
".head");
|
||||
break;
|
||||
case TSDB_DATA_FILE:
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fData.commitID,
|
||||
".data");
|
||||
break;
|
||||
case TSDB_LAST_FILE:
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fLast.commitID,
|
||||
".last");
|
||||
break;
|
||||
case TSDB_SMA_FILE:
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fSma.commitID,
|
||||
".sma");
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pHeadF->commitID, ".head");
|
||||
}
|
||||
|
||||
bool tsdbFileIsSame(SDFileSet *pDFileSet1, SDFileSet *pDFileSet2, EDataFileT ftype) {
|
||||
if (pDFileSet1->diskId.level != pDFileSet2->diskId.level || pDFileSet1->diskId.id != pDFileSet2->diskId.id) {
|
||||
return false;
|
||||
}
|
||||
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data");
|
||||
}
|
||||
|
||||
switch (ftype) {
|
||||
case TSDB_HEAD_FILE:
|
||||
return pDFileSet1->fHead.commitID == pDFileSet2->fHead.commitID;
|
||||
case TSDB_DATA_FILE:
|
||||
return pDFileSet1->fData.commitID == pDFileSet2->fData.commitID;
|
||||
case TSDB_LAST_FILE:
|
||||
return pDFileSet1->fLast.commitID == pDFileSet2->fLast.commitID;
|
||||
case TSDB_SMA_FILE:
|
||||
return pDFileSet1->fSma.commitID == pDFileSet2->fSma.commitID;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
void tsdbLastFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SLastFile *pLastF, char fname[]) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pLastF->commitID, ".last");
|
||||
}
|
||||
|
||||
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
|
||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSmaF->commitID, ".sma");
|
||||
}
|
||||
|
||||
bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFile1->commitID == pDelFile2->commitID; }
|
||||
|
||||
int32_t tsdbUpdateDFileHdr(TdFilePtr pFD, SDFileSet *pSet, EDataFileT ftype) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
char hdr[TSDB_FHDR_SIZE];
|
||||
|
||||
memset(hdr, 0, TSDB_FHDR_SIZE);
|
||||
tPutDataFileHdr(hdr, pSet, ftype);
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
||||
n = taosLSeekFile(pFD, 0, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
TdFilePtr pFD;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
char hdr[TSDB_FHDR_SIZE] = {0};
|
||||
|
||||
tsdbDataFileName(pTsdb, pSet, ftype, fname);
|
||||
// truncate
|
||||
switch (ftype) {
|
||||
case TSDB_HEAD_FILE:
|
||||
size = pSet->pHeadF->size;
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
tPutHeadFile(hdr, pSet->pHeadF);
|
||||
break;
|
||||
case TSDB_DATA_FILE:
|
||||
size = pSet->pDataF->size;
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
tPutDataFile(hdr, pSet->pDataF);
|
||||
break;
|
||||
case TSDB_LAST_FILE:
|
||||
size = pSet->pLastF->size;
|
||||
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pLastF, fname);
|
||||
tPutLastFile(hdr, pSet->pLastF);
|
||||
break;
|
||||
case TSDB_SMA_FILE:
|
||||
size = pSet->pSmaF->size;
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
tPutSmaFile(hdr, pSet->pSmaF);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
||||
// open
|
||||
pFD = taosOpenFile(fname, TD_FILE_WRITE);
|
||||
|
@ -182,31 +155,24 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// truncate
|
||||
switch (ftype) {
|
||||
case TSDB_HEAD_FILE:
|
||||
size = pSet->fHead.size;
|
||||
break;
|
||||
case TSDB_DATA_FILE:
|
||||
size = pSet->fData.size;
|
||||
break;
|
||||
case TSDB_LAST_FILE:
|
||||
size = pSet->fLast.size;
|
||||
break;
|
||||
case TSDB_SMA_FILE:
|
||||
size = pSet->fSma.size;
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
// ftruncate
|
||||
if (taosFtruncateFile(pFD, size) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// update header
|
||||
code = tsdbUpdateDFileHdr(pFD, pSet, ftype);
|
||||
if (code) goto _err;
|
||||
n = taosLSeekFile(pFD, 0, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// sync
|
||||
if (taosFsyncFile(pFD) < 0) {
|
||||
|
@ -223,39 +189,16 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) {
|
||||
int32_t n = 0;
|
||||
|
||||
switch (ftype) {
|
||||
case TSDB_HEAD_FILE:
|
||||
n += tPutHeadFile(p ? p + n : p, &pSet->fHead);
|
||||
break;
|
||||
case TSDB_DATA_FILE:
|
||||
n += tPutDataFile(p ? p + n : p, &pSet->fData);
|
||||
break;
|
||||
case TSDB_LAST_FILE:
|
||||
n += tPutLastFile(p ? p + n : p, &pSet->fLast);
|
||||
break;
|
||||
case TSDB_SMA_FILE:
|
||||
n += tPutSmaFile(p ? p + n : p, &pSet->fSma);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI32v(p ? p + n : p, pSet->diskId.level);
|
||||
n += tPutI32v(p ? p + n : p, pSet->diskId.id);
|
||||
n += tPutI32v(p ? p + n : p, pSet->fid);
|
||||
n += tPutHeadFile(p ? p + n : p, &pSet->fHead);
|
||||
n += tPutDataFile(p ? p + n : p, &pSet->fData);
|
||||
n += tPutLastFile(p ? p + n : p, &pSet->fLast);
|
||||
n += tPutSmaFile(p ? p + n : p, &pSet->fSma);
|
||||
n += tPutHeadFile(p ? p + n : p, pSet->pHeadF);
|
||||
n += tPutDataFile(p ? p + n : p, pSet->pDataF);
|
||||
n += tPutLastFile(p ? p + n : p, pSet->pLastF);
|
||||
n += tPutSmaFile(p ? p + n : p, pSet->pSmaF);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
@ -266,10 +209,10 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
|
|||
n += tGetI32v(p + n, &pSet->diskId.level);
|
||||
n += tGetI32v(p + n, &pSet->diskId.id);
|
||||
n += tGetI32v(p + n, &pSet->fid);
|
||||
n += tGetHeadFile(p + n, &pSet->fHead);
|
||||
n += tGetDataFile(p + n, &pSet->fData);
|
||||
n += tGetLastFile(p + n, &pSet->fLast);
|
||||
n += tGetSmaFile(p + n, &pSet->fSma);
|
||||
n += tGetHeadFile(p + n, pSet->pHeadF);
|
||||
n += tGetDataFile(p + n, pSet->pDataF);
|
||||
n += tGetLastFile(p + n, pSet->pLastF);
|
||||
n += tGetSmaFile(p + n, pSet->pSmaF);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
|
|
@ -607,6 +607,7 @@ void tsdbUnrefMemTable(SMemTable *pMemTable) {
|
|||
}
|
||||
|
||||
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem) {
|
||||
ASSERT(0);
|
||||
int32_t code = 0;
|
||||
|
||||
// lock
|
||||
|
@ -640,6 +641,7 @@ _exit:
|
|||
}
|
||||
|
||||
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem) {
|
||||
ASSERT(0);
|
||||
if (pMem) {
|
||||
tsdbUnrefMemTable(pMem);
|
||||
}
|
||||
|
|
|
@ -3244,3 +3244,59 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
|
||||
int32_t code = 0;
|
||||
|
||||
// alloc
|
||||
*ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
|
||||
if (*ppSnap == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// lock
|
||||
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
|
||||
if (code) {
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// take snapshot
|
||||
(*ppSnap)->pMem = pTsdb->mem;
|
||||
(*ppSnap)->pIMem = pTsdb->imem;
|
||||
|
||||
if ((*ppSnap)->pMem) {
|
||||
tsdbRefMemTable((*ppSnap)->pMem);
|
||||
}
|
||||
|
||||
if ((*ppSnap)->pIMem) {
|
||||
tsdbRefMemTable((*ppSnap)->pIMem);
|
||||
}
|
||||
|
||||
// fs (todo)
|
||||
|
||||
// unlock
|
||||
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
if (code) {
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
|
||||
if (pSnap) {
|
||||
if (pSnap->pMem) {
|
||||
tsdbUnrefMemTable(pSnap->pMem);
|
||||
}
|
||||
|
||||
if (pSnap->pIMem) {
|
||||
tsdbUnrefMemTable(pSnap->pIMem);
|
||||
}
|
||||
|
||||
// fs (todo)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -459,7 +459,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
|
||||
// open impl
|
||||
// head
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_HEAD_FILE, fname);
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pReader->pHeadFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -467,7 +467,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
}
|
||||
|
||||
// data
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_DATA_FILE, fname);
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pReader->pDataFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -475,7 +475,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
}
|
||||
|
||||
// last
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_LAST_FILE, fname);
|
||||
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pLastF, fname);
|
||||
pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pReader->pLastFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -483,7 +483,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
}
|
||||
|
||||
// sma
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_SMA_FILE, fname);
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pReader->pSmaFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -536,8 +536,8 @@ _err:
|
|||
|
||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pReader->pSet->fHead.offset;
|
||||
int64_t size = pReader->pSet->fHead.size - offset;
|
||||
int64_t offset = pReader->pSet->pHeadF->offset;
|
||||
int64_t size = pReader->pSet->pHeadF->size - offset;
|
||||
uint8_t *pBuf = NULL;
|
||||
int64_t n;
|
||||
uint32_t delimiter;
|
||||
|
@ -1211,17 +1211,6 @@ _err:
|
|||
}
|
||||
|
||||
// SDataFWriter ====================================================
|
||||
struct SDataFWriter {
|
||||
STsdb *pTsdb;
|
||||
SDFileSet wSet;
|
||||
TdFilePtr pHeadFD;
|
||||
TdFilePtr pDataFD;
|
||||
TdFilePtr pLastFD;
|
||||
TdFilePtr pSmaFD;
|
||||
};
|
||||
|
||||
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter) { return &pWriter->wSet; }
|
||||
|
||||
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
|
||||
int32_t code = 0;
|
||||
int32_t flag;
|
||||
|
@ -1237,12 +1226,20 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
goto _err;
|
||||
}
|
||||
pWriter->pTsdb = pTsdb;
|
||||
pWriter->wSet = *pSet;
|
||||
pSet = &pWriter->wSet;
|
||||
pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
|
||||
.fid = pSet->fid,
|
||||
.pHeadF = &pWriter->fHead,
|
||||
.pDataF = &pWriter->fData,
|
||||
.pLastF = &pWriter->fLast,
|
||||
.pSmaF = &pWriter->fSma};
|
||||
pWriter->fHead = *pSet->pHeadF;
|
||||
pWriter->fData = *pSet->pDataF;
|
||||
pWriter->fLast = *pSet->pLastF;
|
||||
pWriter->fSma = *pSet->pSmaF;
|
||||
|
||||
// head
|
||||
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_HEAD_FILE, fname);
|
||||
tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
|
||||
pWriter->pHeadFD = taosOpenFile(fname, flag);
|
||||
if (pWriter->pHeadFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -1257,28 +1254,28 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
|
||||
ASSERT(n == TSDB_FHDR_SIZE);
|
||||
|
||||
pSet->fHead.size += TSDB_FHDR_SIZE;
|
||||
pWriter->fHead.size += TSDB_FHDR_SIZE;
|
||||
|
||||
// data
|
||||
if (pSet->fData.size == 0) {
|
||||
if (pWriter->fData.size == 0) {
|
||||
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
} else {
|
||||
flag = TD_FILE_WRITE;
|
||||
}
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_DATA_FILE, fname);
|
||||
tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
|
||||
pWriter->pDataFD = taosOpenFile(fname, flag);
|
||||
if (pWriter->pDataFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
if (pSet->fData.size == 0) {
|
||||
if (pWriter->fData.size == 0) {
|
||||
n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pSet->fData.size += TSDB_FHDR_SIZE;
|
||||
pWriter->fData.size += TSDB_FHDR_SIZE;
|
||||
} else {
|
||||
n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END);
|
||||
if (n < 0) {
|
||||
|
@ -1286,29 +1283,29 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(n == pSet->fData.size);
|
||||
ASSERT(n == pWriter->fData.size);
|
||||
}
|
||||
|
||||
// last
|
||||
if (pSet->fLast.size == 0) {
|
||||
if (pWriter->fLast.size == 0) {
|
||||
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
} else {
|
||||
flag = TD_FILE_WRITE;
|
||||
}
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_LAST_FILE, fname);
|
||||
tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast, fname);
|
||||
pWriter->pLastFD = taosOpenFile(fname, flag);
|
||||
if (pWriter->pLastFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
if (pSet->fLast.size == 0) {
|
||||
if (pWriter->fLast.size == 0) {
|
||||
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pSet->fLast.size += TSDB_FHDR_SIZE;
|
||||
pWriter->fLast.size += TSDB_FHDR_SIZE;
|
||||
} else {
|
||||
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_END);
|
||||
if (n < 0) {
|
||||
|
@ -1316,29 +1313,29 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(n == pSet->fLast.size);
|
||||
ASSERT(n == pWriter->fLast.size);
|
||||
}
|
||||
|
||||
// sma
|
||||
if (pSet->fSma.size == 0) {
|
||||
if (pWriter->fSma.size == 0) {
|
||||
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
} else {
|
||||
flag = TD_FILE_WRITE;
|
||||
}
|
||||
tsdbDataFileName(pTsdb, pSet, TSDB_SMA_FILE, fname);
|
||||
tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
|
||||
pWriter->pSmaFD = taosOpenFile(fname, flag);
|
||||
if (pWriter->pSmaFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
if (pSet->fSma.size == 0) {
|
||||
if (pWriter->fSma.size == 0) {
|
||||
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pSet->fSma.size += TSDB_FHDR_SIZE;
|
||||
pWriter->fSma.size += TSDB_FHDR_SIZE;
|
||||
} else {
|
||||
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END);
|
||||
if (n < 0) {
|
||||
|
@ -1346,7 +1343,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(n == pSet->fSma.size);
|
||||
ASSERT(n == pWriter->fSma.size);
|
||||
}
|
||||
|
||||
*ppWriter = pWriter;
|
||||
|
@ -1418,22 +1415,76 @@ _err:
|
|||
|
||||
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
char hdr[TSDB_FHDR_SIZE];
|
||||
|
||||
// head ==============
|
||||
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_HEAD_FILE);
|
||||
if (code) goto _err;
|
||||
memset(hdr, 0, TSDB_FHDR_SIZE);
|
||||
tPutHeadFile(hdr, &pWriter->fHead);
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
||||
n = taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// data ==============
|
||||
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_DATA_FILE);
|
||||
if (code) goto _err;
|
||||
memset(hdr, 0, TSDB_FHDR_SIZE);
|
||||
tPutDataFile(hdr, &pWriter->fData);
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
||||
n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// last ==============
|
||||
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_LAST_FILE);
|
||||
if (code) goto _err;
|
||||
memset(hdr, 0, TSDB_FHDR_SIZE);
|
||||
tPutLastFile(hdr, &pWriter->fLast);
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
||||
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// sma ==============
|
||||
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_SMA_FILE);
|
||||
if (code) goto _err;
|
||||
memset(hdr, 0, TSDB_FHDR_SIZE);
|
||||
tPutSmaFile(hdr, &pWriter->fSma);
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
||||
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
|
@ -1444,7 +1495,7 @@ _err:
|
|||
|
||||
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
|
||||
SHeadFile *pHeadFile = &pWriter->fHead;
|
||||
uint8_t *pBuf = NULL;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
|
@ -1494,7 +1545,7 @@ _err:
|
|||
|
||||
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
|
||||
int32_t code = 0;
|
||||
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
|
||||
SHeadFile *pHeadFile = &pWriter->fHead;
|
||||
SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
|
||||
uint8_t *pBuf = NULL;
|
||||
int64_t size;
|
||||
|
@ -1831,9 +1882,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
|
|||
pSubBlock->nRow = pBlockData->nRow;
|
||||
pSubBlock->cmprAlg = cmprAlg;
|
||||
if (pBlock->last) {
|
||||
pSubBlock->offset = pWriter->wSet.fLast.size;
|
||||
pSubBlock->offset = pWriter->fLast.size;
|
||||
} else {
|
||||
pSubBlock->offset = pWriter->wSet.fData.size;
|
||||
pSubBlock->offset = pWriter->fData.size;
|
||||
}
|
||||
|
||||
// ======================= BLOCK DATA =======================
|
||||
|
@ -1881,9 +1932,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
|
|||
|
||||
pSubBlock->szBlock = pSubBlock->szBlockCol + sizeof(TSCKSUM) + nData;
|
||||
if (pBlock->last) {
|
||||
pWriter->wSet.fLast.size += pSubBlock->szBlock;
|
||||
pWriter->fLast.size += pSubBlock->szBlock;
|
||||
} else {
|
||||
pWriter->wSet.fData.size += pSubBlock->szBlock;
|
||||
pWriter->fData.size += pSubBlock->szBlock;
|
||||
}
|
||||
|
||||
// ======================= BLOCK SMA =======================
|
||||
|
@ -1896,8 +1947,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
|
|||
if (code) goto _err;
|
||||
|
||||
if (pSubBlock->nSma > 0) {
|
||||
pSubBlock->sOffset = pWriter->wSet.fSma.size;
|
||||
pWriter->wSet.fSma.size += (sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
|
||||
pSubBlock->sOffset = pWriter->fSma.size;
|
||||
pWriter->fSma.size += (sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -1924,8 +1975,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
char fNameTo[TSDB_FILENAME_LEN];
|
||||
|
||||
// head
|
||||
tsdbDataFileName(pTsdb, pSetFrom, TSDB_HEAD_FILE, fNameFrom);
|
||||
tsdbDataFileName(pTsdb, pSetTo, TSDB_HEAD_FILE, fNameTo);
|
||||
tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
|
||||
tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
|
||||
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
|
@ -1939,7 +1990,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fHead.size);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -1948,8 +1999,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
taosCloseFile(&PInFD);
|
||||
|
||||
// data
|
||||
tsdbDataFileName(pTsdb, pSetFrom, TSDB_DATA_FILE, fNameFrom);
|
||||
tsdbDataFileName(pTsdb, pSetTo, TSDB_DATA_FILE, fNameTo);
|
||||
tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
|
||||
tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
|
||||
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
|
@ -1963,7 +2014,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fData.size);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -1972,8 +2023,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
taosCloseFile(&PInFD);
|
||||
|
||||
// last
|
||||
tsdbDataFileName(pTsdb, pSetFrom, TSDB_LAST_FILE, fNameFrom);
|
||||
tsdbDataFileName(pTsdb, pSetTo, TSDB_LAST_FILE, fNameTo);
|
||||
tsdbLastFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pLastF, fNameFrom);
|
||||
tsdbLastFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pLastF, fNameTo);
|
||||
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -1986,7 +2038,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fLast.size);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pLastF->size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -1995,8 +2047,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
taosCloseFile(&PInFD);
|
||||
|
||||
// sma
|
||||
tsdbDataFileName(pTsdb, pSetFrom, TSDB_SMA_FILE, fNameFrom);
|
||||
tsdbDataFileName(pTsdb, pSetTo, TSDB_SMA_FILE, fNameTo);
|
||||
tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
|
||||
tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);
|
||||
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
|
@ -2010,7 +2062,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fSma.size);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
|
|
@ -798,7 +798,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
|||
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
|
||||
code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, &pWriter->pDataFWriter->wSet);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
|
||||
|
@ -863,22 +863,26 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
tBlockDataReset(&pWriter->bDataR);
|
||||
|
||||
// write
|
||||
SDFileSet wSet;
|
||||
SHeadFile fHead;
|
||||
SDataFile fData;
|
||||
SLastFile fLast;
|
||||
SSmaFile fSma;
|
||||
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
|
||||
|
||||
if (pSet) {
|
||||
wSet = (SDFileSet){.diskId = pSet->diskId,
|
||||
.fid = fid,
|
||||
.fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0},
|
||||
.fData = pSet->fData,
|
||||
.fLast = {.commitID = pWriter->commitID, .size = 0},
|
||||
.fSma = pSet->fSma};
|
||||
wSet.diskId = pSet->diskId;
|
||||
wSet.fid = fid;
|
||||
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
||||
fData = *pSet->pDataF;
|
||||
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
|
||||
fSma = *pSet->pSmaF;
|
||||
} else {
|
||||
wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
|
||||
.fid = fid,
|
||||
.fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0},
|
||||
.fData = {.commitID = pWriter->commitID, .size = 0},
|
||||
.fLast = {.commitID = pWriter->commitID, .size = 0},
|
||||
.fSma = {.commitID = pWriter->commitID, .size = 0}};
|
||||
wSet.diskId = (SDiskID){.level = 0, .id = 0};
|
||||
wSet.fid = fid;
|
||||
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
||||
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
|
||||
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
|
||||
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
|
||||
}
|
||||
|
||||
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
|
||||
|
|
Loading…
Reference in New Issue