more code

This commit is contained in:
Hongze Cheng 2023-05-28 23:33:03 +08:00
parent 9d031f4d7b
commit 9d6a264e4d
7 changed files with 137 additions and 60 deletions

View File

@ -14,6 +14,7 @@
*/
#include "tsdbDef.h"
#include "tsdbFSet.h"
#ifndef _TSDB_DATA_FILE_RW_H
#define _TSDB_DATA_FILE_RW_H
@ -26,24 +27,19 @@ extern "C" {
typedef struct SDataFileReader SDataFileReader;
typedef struct SDataFileReaderConfig SDataFileReaderConfig;
int32_t tsdbDataFileReaderOpen(const char *fname, const SDataFileReaderConfig *config, SDataFileReader **reader);
int32_t tsdbDataFileReaderClose(SDataFileReader *reader);
// SDataFileWriter =============================================
typedef struct SDataFileWriter SDataFileWriter;
typedef struct SDataFileWriterConfig SDataFileWriterConfig;
typedef struct SDataFileWriterConfig {
STsdb *tsdb;
} SDataFileWriterConfig;
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer);
int32_t tsdbDataFileWriterClose(SDataFileWriter *writer);
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]);
int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData);
int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData);
struct SDataFileReaderConfig {
STsdb *pTsdb;
// TODO
};
struct SDataFileWriterConfig {
STsdb *pTsdb;
// TODO
};
// int32_t tsdbDataFileWriteDelData(SDataFileWriter *writer, SBlockData *bData);
#ifdef __cplusplus
}

View File

@ -34,7 +34,7 @@ typedef struct SSttSegReader SSttSegReader;
typedef TARRAY2(SSttSegReader *) TSttSegReaderArray;
// SSttFileReader
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader);
int32_t tsdbSttFReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader);
int32_t tsdbSttFReaderClose(SSttFileReader **reader);
int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray);

View File

@ -24,8 +24,9 @@ extern "C" {
// SDelBlock ----------
#define DEL_RECORD_NUM_ELEM 5
typedef union {
int64_t aData[5];
int64_t aData[DEL_RECORD_NUM_ELEM];
struct {
int64_t suid;
int64_t uid;
@ -36,7 +37,7 @@ typedef union {
} SDelRecord;
typedef union {
TARRAY2(int64_t) aData[5];
TARRAY2(int64_t) aData[DEL_RECORD_NUM_ELEM];
struct {
TARRAY2(int64_t) suid[1];
TARRAY2(int64_t) uid[1];
@ -48,11 +49,12 @@ typedef union {
typedef struct SDelBlk {
int32_t nRow;
int32_t size[DEL_RECORD_NUM_ELEM];
TABLEID minTid;
TABLEID maxTid;
int64_t minVer;
int64_t maxVer;
SFDataPtr dp;
SFDataPtr dp[1];
} SDelBlk;
#define DEL_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
@ -65,8 +67,9 @@ int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size);
int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock);
// STbStatisBlock ----------
#define STATIS_RECORD_NUM_ELEM 9
typedef union {
int64_t aData[9];
int64_t aData[STATIS_RECORD_NUM_ELEM];
struct {
int64_t suid;
int64_t uid;
@ -81,7 +84,7 @@ typedef union {
} STbStatisRecord;
typedef union {
TARRAY2(int64_t) aData[9];
TARRAY2(int64_t) aData[STATIS_RECORD_NUM_ELEM];
struct {
TARRAY2(int64_t) suid[1];
TARRAY2(int64_t) uid[1];
@ -97,7 +100,7 @@ typedef union {
typedef struct STbStatisBlk {
int32_t numRec;
int32_t size[9];
int32_t size[STATIS_RECORD_NUM_ELEM];
TABLEID minTid;
TABLEID maxTid;
int64_t minVer;

View File

@ -272,8 +272,10 @@ static int32_t commit_fset_end(SCommitter *pCommitter) {
code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, &op);
TSDB_CHECK_CODE(code, lino, _exit);
if (op.optype != TSDB_FOP_NONE) {
code = TARRAY2_APPEND(&pCommitter->opArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {

View File

@ -22,50 +22,81 @@ struct SDataFileReader {
// SDataFileWriter =============================================
struct SDataFileWriter {
SDataFileWriterConfig config[1];
struct {
bool opened;
} ctx[1];
// TODO
};
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **ppWriter) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(config->pTsdb->pVnode);
// TODO
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
writer[0] = taosMemoryCalloc(1, sizeof(SDataFileWriter));
if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY;
writer[0]->ctx->opened = false;
return 0;
}
int32_t tsdbDataFileWriterClose(SDataFileWriter *pWriter) {
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer) {
// TODO
return 0;
}
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
// TODO
return 0;
}
static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
// TODO
return 0;
}
static int32_t tsdbDataFileWriterCloseImpl(SDataFileWriter *writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = 0; // TODO: TD_VID(config->pTsdb->pVnode);
// TODO
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (!writer->ctx->opened) {
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) op[i].optype = TSDB_FOP_NONE;
} else {
if (abort) {
code = tsdbDataFileWriterCloseAbort(writer);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDataFileWriterCloseCommit(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
tsdbDataFileWriterDoClose(writer);
}
taosMemoryFree(writer);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) {
int32_t code = tsdbDataFileWriterCloseImpl(writer[0], abort, op);
if (code) {
return code;
} else {
writer[0] = NULL;
return 0;
}
}
int32_t tsdbDataFileWriteTSData(SDataFileWriter *pWriter, SBlockData *pBlockData) {
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
// TODO
return 0;
}
int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = 0; // TODO: TD_VID(config->pTsdb->pVnode);
// TODO
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
}
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (!writer->ctx->opened) {
code = tsdbDataFileWriterDoOpen(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *pWriter, SBlockData *pBlockData) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = 0; // TODO: TD_VID(config->pTsdb->pVnode);
// TODO
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));

View File

@ -202,7 +202,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
.tsdb = merger->tsdb,
// TODO
};
code = tsdbSttFReaderOpen(&config, &reader);
code = tsdbSttFReaderOpen(fobj->fname, &config, &reader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(&merger->sttReaderArr, reader);
@ -259,7 +259,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
// open data file writer
if (merger->ctx.toData) {
SDataFileWriterConfig config = {
.pTsdb = merger->tsdb,
.tsdb = merger->tsdb,
// TODO
};
code = tsdbDataFileWriterOpen(&config, &merger->dataWriter);

View File

@ -84,7 +84,7 @@ static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
return 0;
}
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader) {
int32_t tsdbSttFReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(config->tsdb->pVnode);
@ -95,8 +95,6 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **
reader[0]->config[0] = config[0];
// open file
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, config->file, fname);
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
@ -221,13 +219,60 @@ int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBloc
int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) {
int32_t code = 0;
// TODO
int32_t lino = 0;
int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode);
tDelBlockClear(dData);
code = tRealloc(&reader->reader->config->aBuf[0], delBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, delBlk->dp->offset, reader->reader->config->aBuf[0], delBlk->dp->size);
if (code) TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
for (int32_t i = 0; i < ARRAY_SIZE(dData->aData); ++i) {
code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, delBlk->size[i], TSDB_DATA_TYPE_BIGINT,
TWO_STAGE_COMP, NULL, 0, NULL); // TODO
TSDB_CHECK_CODE(code, lino, _exit);
size += delBlk->size[i];
}
ASSERT(size == delBlk->dp->size);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d, reason:%s", vid, __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) {
int32_t code = 0;
// TODO
int32_t lino = 0;
int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode);
tStatisBlockClear(sData);
code = tRealloc(&reader->reader->config->aBuf[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->aBuf[0], statisBlk->dp->size);
if (code) TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
for (int32_t i = 0; i < ARRAY_SIZE(sData->aData); ++i) {
code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
TWO_STAGE_COMP, NULL, 0, NULL); // TODO
TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i];
}
ASSERT(size == statisBlk->dp->size);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d, reason:%s", vid, __func__, lino, tstrerror(code));
}
return code;
}