more work

This commit is contained in:
Hongze Cheng 2022-06-10 09:49:01 +00:00
parent 17efc22b03
commit 2ff00f23a5
5 changed files with 198 additions and 46 deletions

View File

@ -57,7 +57,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
// tsdbFile.c ============================================================================================== // tsdbFile.c ==============================================================================================
typedef struct STsdbTombstoneFile STsdbTombstoneFile; typedef struct SDelFile SDelFile;
typedef struct STsdbCacheFile STsdbCacheFile; typedef struct STsdbCacheFile STsdbCacheFile;
typedef struct STsdbIndexFile STsdbIndexFile; typedef struct STsdbIndexFile STsdbIndexFile;
typedef struct STsdbDataFile STsdbDataFile; typedef struct STsdbDataFile STsdbDataFile;
@ -74,7 +74,25 @@ int32_t tsdbFSClose(STsdbFS *pFS);
int32_t tsdbFSStart(STsdbFS *pFS); int32_t tsdbFSStart(STsdbFS *pFS);
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
// tsdbReaderWritter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
// SDelFWriter
typedef struct SDelFWriter SDelFWriter;
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile);
int32_t tsdbDelFWriterClose(SDelFWriter *pWriter);
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf);
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf);
// SDelFReader
typedef struct SDelFReader SDelFReader;
int32_t tsdbDelFReaderOpen(SDelFReader *pReader, SDelFile *pFile);
int32_t tsdbDelFReaderClose(SDelFReader *pReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf);
// tsdbCommit.c ============================================================================================== // tsdbCommit.c ==============================================================================================
@ -169,8 +187,6 @@ typedef struct SReadH SReadH;
typedef struct SDFileSetReader SDFileSetReader; typedef struct SDFileSetReader SDFileSetReader;
typedef struct SDFileSetWriter SDFileSetWriter; typedef struct SDFileSetWriter SDFileSetWriter;
typedef struct STombstoneFileWriter STombstoneFileWriter;
typedef struct STombstoneFileReader STombstoneFileReader;
// SDFileSetWriter // SDFileSetWriter
int32_t tsdbDFileSetWriterOpen(SDFileSetWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDFileSetWriterOpen(SDFileSetWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet);
@ -186,13 +202,9 @@ int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray);
int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo); int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo);
int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis); int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis);
// STombstoneFileWriter // SDelFWriter
int32_t tsdbTomstoneFileWriterOpen(STombstoneFileWriter *pWriter, STsdb *pTsdb);
int32_t tsdbTomstoneFileWriterClose(STombstoneFileWriter *pWriter);
// STombstoneFileReader // SDelFReader
int32_t tsdbTomstoneFileReaderOpen(STombstoneFileReader *pReader, STsdb *pTsdb);
int32_t tsdbTomstoneFileReaderClose(STombstoneFileReader *pReader);
// tsdbUtil.c ============================================================================================== // tsdbUtil.c ==============================================================================================
int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
@ -303,7 +315,7 @@ struct SMemTable {
TSDBKEY maxKey; TSDBKEY maxKey;
int64_t nRow; int64_t nRow;
int64_t nDelOp; int64_t nDelOp;
SArray *aTbData; // SArray<STbData> SArray *aTbData; // SArray<STbData*>
}; };
// struct STsdbFSMeta { // struct STsdbFSMeta {
@ -727,6 +739,12 @@ struct TABLEID {
tb_uid_t uid; tb_uid_t uid;
}; };
struct STbDataIter {
STbData *pTbData;
int8_t backward;
SMemSkipListNode *pNode;
};
struct SDelOp { struct SDelOp {
int64_t version; int64_t version;
TSKEY sKey; // included TSKEY sKey; // included
@ -735,17 +753,39 @@ struct SDelOp {
}; };
typedef struct { typedef struct {
tb_uid_t suid;
tb_uid_t uid;
int64_t version; int64_t version;
TSKEY sKey; TSKEY sKey;
TSKEY eKey; TSKEY eKey;
} SDelInfo; } SDelDataItem;
struct STbDataIter { struct SDelData {
STbData *pTbData; uint32_t delimiter;
int8_t backward; tb_uid_t suid;
SMemSkipListNode *pNode; tb_uid_t uid;
int8_t flags;
int64_t nItem;
uint8_t *pOffset;
uint32_t nData;
uint8_t *pData;
};
typedef struct {
tb_uid_t suid;
tb_uid_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t maxVersion;
int64_t minVersion;
int64_t offset;
int64_t size;
} SDelIdxItem;
struct SDelIdx {
uint32_t delimiter;
int8_t flags;
int64_t nItem;
uint8_t *pOffset;
uint8_t *pDelIdxItem;
}; };
#endif #endif

View File

@ -19,6 +19,9 @@ typedef struct SCommitter SCommitter;
struct SCommitter { struct SCommitter {
STsdb *pTsdb; STsdb *pTsdb;
uint8_t *pBuf1;
uint8_t *pBuf2;
/* commit data */
int32_t minutes; int32_t minutes;
int8_t precision; int8_t precision;
TSKEY nextCommitKey; TSKEY nextCommitKey;
@ -33,6 +36,14 @@ struct SCommitter {
// commit table data // commit table data
STbData *pTbData; STbData *pTbData;
SBlockIdx *pBlockIdx; SBlockIdx *pBlockIdx;
/* commit del */
SDelFReader *pTombstoneReader;
SDelFWriter *pTombstoneWritter;
SDelIdx delIdxO;
SDelIdx delIdxN;
SDelData delDataO;
SDelData delDataN;
/* commit cache */
}; };
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
@ -160,13 +171,62 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
// TODO STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
int32_t iDelIdx = 0;
int32_t nDelIdx; // TODO
int32_t c;
STbData *pTbData = NULL;
SDelIdx *pDelIdx = NULL;
while (iTbData < nTbData || iDelIdx < nDelIdx) {
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
} else {
pTbData = NULL;
}
if (iDelIdx < nDelIdx) {
// pDelIdx = ; // TODO
} else {
pDelIdx = NULL;
}
if (pTbData && pDelIdx) {
c = tTABLEIDCmprFn(pTbData, pDelIdx);
if (c == 0) {
iTbData++;
iDelIdx++;
} else if (c < 0) {
iTbData++;
pDelIdx = NULL;
} else {
iDelIdx++;
pTbData = NULL;
}
} else {
if (pTbData) {
iTbData++;
} else {
iDelIdx++;
}
}
// TODO: commit with the pTbData and pDelIdx
}
return code; return code;
} }
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
// TODO
ASSERT(pCommitter->delIdxN.nItem > 0);
return code;
_err:
return code; return code;
} }

View File

@ -20,7 +20,7 @@ struct STsdbFS {
TdThreadRwlock lock; TdThreadRwlock lock;
int64_t minVersion; int64_t minVersion;
int64_t maxVersion; int64_t maxVersion;
STsdbTombstoneFile *pTombstoneF; SDelFile *pTombstoneF;
STsdbCacheFile *pCacheF; STsdbCacheFile *pCacheF;
SArray *pArray; SArray *pArray;
}; };

View File

@ -18,11 +18,13 @@
static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""}; static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""};
// .tombstone // .tombstone
struct STsdbTombstoneFile { struct SDelFile {
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
int64_t minVersion; int64_t minVersion;
int64_t maxVersion; int64_t maxVersion;
int64_t size;
int64_t offset;
}; };
struct STsdbIndexFile { struct STsdbIndexFile {

View File

@ -73,12 +73,62 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta
return code; return code;
} }
// STombstoneFileWriter ==================================================== // SDelFWriter ====================================================
struct STombstoneFileWriter { struct SDelFWriter {
STsdb *pTsdb; SDelFile *pFile;
TdFilePtr pWriteH;
}; };
// STombstoneFileReader ==================================================== int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile) {
struct STombstoneFileReader { int32_t code = 0;
STsdb *pTsdb; // TODO
return code;
}
int32_t tsdbDelFWriterClose(SDelFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
return code;
}
// SDelFReader ====================================================
struct SDelFReader {
SDelFile *pFile;
TdFilePtr pReadH;
}; };
int32_t tsdbDelFReaderOpen(SDelFReader *pReader, SDelFile *pFile) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbDelFReaderClose(SDelFReader *pReader) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
return code;
}