more code

This commit is contained in:
Hongze Cheng 2023-06-06 16:17:29 +08:00
parent 96d29bc93b
commit 89a9e01375
4 changed files with 399 additions and 221 deletions

View File

@ -64,6 +64,8 @@ int32_t tTombBlockFree(STombBlock *delBlock);
int32_t tTombBlockClear(STombBlock *delBlock);
int32_t tTombBlockPut(STombBlock *delBlock, const STombRecord *delRecord);
int32_t tTombRecordCmpr(const STombRecord *r1, const STombRecord *r2);
// STbStatisBlock ----------
#define STATIS_RECORD_NUM_ELEM 9
typedef union {

View File

@ -17,9 +17,13 @@
typedef struct {
SFDataPtr blockIdxPtr[1];
SFDataPtr tombBlkPtr[1]; // TODO: keep footer here
SFDataPtr rsrvd[2];
} SDataFooter;
} SHeadFooter;
typedef struct {
SFDataPtr tombBlkPtr[1];
SFDataPtr rsrvd[2];
} STombFooter;
// SDataFileReader =============================================
struct SDataFileReader {
@ -28,7 +32,8 @@ struct SDataFileReader {
uint8_t *bufArr[5];
struct {
bool footerLoaded;
bool headFooterLoaded;
bool tombFooterLoaded;
bool blockIdxLoaded;
bool tombBlkLoaded;
TABLEID tbid[1];
@ -36,15 +41,16 @@ struct SDataFileReader {
STsdbFD *fd[TSDB_FTYPE_MAX];
SDataFooter footer[1];
SHeadFooter headFooter[1];
STombFooter tombFooter[1];
TBlockIdxArray blockIdxArray[1];
TDataBlkArray dataBlkArray[1];
TTombBlkArray tombBlkArray[1];
};
static int32_t tsdbDataFileReadFooter(SDataFileReader *reader) {
if (!reader->config->files[TSDB_FTYPE_HEAD].exist //
|| reader->ctx->footerLoaded) {
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
if (reader->fd[TSDB_FTYPE_HEAD] == NULL //
|| reader->ctx->headFooterLoaded) {
return 0;
}
@ -52,10 +58,32 @@ static int32_t tsdbDataFileReadFooter(SDataFileReader *reader) {
int32_t lino = 0;
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->config->files[TSDB_FTYPE_HEAD].file.size - sizeof(SDataFooter),
(uint8_t *)reader->footer, sizeof(SDataFooter));
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->config->files[TSDB_FTYPE_HEAD].file.size - sizeof(SHeadFooter),
(uint8_t *)reader->headFooter, sizeof(SHeadFooter));
TSDB_CHECK_CODE(code, lino, _exit);
reader->ctx->footerLoaded = true;
reader->ctx->headFooterLoaded = true;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
if (reader->fd[TSDB_FTYPE_TOMB] == NULL //
|| reader->ctx->tombFooterLoaded) {
return 0;
}
int32_t code = 0;
int32_t lino = 0;
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->config->files[TSDB_FTYPE_TOMB].file.size - sizeof(STombFooter),
(uint8_t *)reader->tombFooter, sizeof(STombFooter));
TSDB_CHECK_CODE(code, lino, _exit);
reader->ctx->tombFooterLoaded = true;
_exit:
if (code) {
@ -81,21 +109,21 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
if (fname) {
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (fname[i] == NULL) continue;
if (fname[i]) {
code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (!config->files[i].exist) continue;
if (config->files[i].exist) {
char fname1[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
_exit:
if (code) {
@ -127,22 +155,21 @@ int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray *
int32_t code = 0;
int32_t lino = 0;
code = tsdbDataFileReadFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
if (!reader->ctx->blockIdxLoaded) {
code = tsdbDataFileReadHeadFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
TARRAY2_CLEAR(reader->blockIdxArray, NULL);
if (reader->config->files[TSDB_FTYPE_HEAD].exist //
&& reader->footer->blockIdxPtr->size) {
code = tRealloc(&reader->config->bufArr[0], reader->footer->blockIdxPtr->size);
if (reader->fd[TSDB_FTYPE_HEAD] //
&& reader->headFooter->blockIdxPtr->size) {
code = tRealloc(&reader->config->bufArr[0], reader->headFooter->blockIdxPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->footer->blockIdxPtr->offset, reader->config->bufArr[0],
reader->footer->blockIdxPtr->size);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->blockIdxPtr->offset,
reader->config->bufArr[0], reader->headFooter->blockIdxPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = reader->footer->blockIdxPtr->size / sizeof(SBlockIdx);
int32_t size = reader->headFooter->blockIdxPtr->size / sizeof(SBlockIdx);
for (int32_t i = 0; i < size; ++i) {
code = TARRAY2_APPEND_PTR(reader->blockIdxArray, ((SBlockIdx *)reader->config->bufArr[0]) + i);
TSDB_CHECK_CODE(code, lino, _exit);
@ -163,7 +190,7 @@ _exit:
int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx,
const TDataBlkArray **dataBlkArray) {
ASSERT(reader->ctx->footerLoaded);
ASSERT(reader->ctx->headFooterLoaded);
if (reader->ctx->tbid->suid == blockIdx->suid && reader->ctx->tbid->uid == blockIdx->uid) {
dataBlkArray[0] = reader->dataBlkArray;
@ -224,23 +251,23 @@ int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **t
int32_t code = 0;
int32_t lino = 0;
if (!reader->ctx->tombBlkLoaded) {
if (!reader->ctx->footerLoaded) {
code = tsdbDataFileReadFooter(reader);
code = tsdbDataFileReadTombFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->fd[TSDB_FTYPE_TOMB] && !reader->ctx->tombBlkLoaded) {
code = tsdbDataFileReadTombFooter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
}
TARRAY2_CLEAR(reader->tombBlkArray, NULL);
if (reader->footer->tombBlkPtr->size) {
code = tRealloc(&reader->config->bufArr[0], reader->footer->tombBlkPtr->size);
if (reader->tombFooter->tombBlkPtr->size) {
code = tRealloc(&reader->config->bufArr[0], reader->tombFooter->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->footer->tombBlkPtr->offset, reader->config->bufArr[0],
reader->footer->tombBlkPtr->size);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset,
reader->config->bufArr[0], reader->tombFooter->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
for (int32_t i = 0; i < size; ++i) {
code = TARRAY2_APPEND_PTR(reader->tombBlkArray, ((STombBlk *)reader->config->bufArr[0]) + i);
TSDB_CHECK_CODE(code, lino, _exit);
@ -301,6 +328,7 @@ struct SDataFileWriter {
struct {
bool opened;
SDataFileReader *reader;
// for ts data
const TBlockIdxArray *blockIdxArray;
int32_t blockIdxArrayIdx;
bool tbHasOldData;
@ -309,12 +337,19 @@ struct SDataFileWriter {
SBlockData bData[1];
int32_t iRow;
TABLEID tbid[1];
// for tomb data
bool hasOldTomb;
const TTombBlkArray *tombBlkArray;
int32_t tombBlkArrayIdx;
STombBlock tData[1];
int32_t iRowTomb;
} ctx[1];
STFile files[TSDB_FTYPE_MAX];
STsdbFD *fd[TSDB_FTYPE_MAX];
SDataFooter footer[1];
SHeadFooter headFooter[1];
STombFooter tombFooter[1];
TBlockIdxArray blockIdxArray[1];
TDataBlkArray dataBlkArray[1];
TTombBlkArray tombBlkArray[1];
@ -327,14 +362,14 @@ static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
writer->footer->blockIdxPtr->offset = writer->files[TSDB_FTYPE_HEAD].size;
writer->footer->blockIdxPtr->size = TARRAY2_DATA_LEN(writer->blockIdxArray);
writer->headFooter->blockIdxPtr->offset = writer->files[TSDB_FTYPE_HEAD].size;
writer->headFooter->blockIdxPtr->size = TARRAY2_DATA_LEN(writer->blockIdxArray);
if (writer->footer->blockIdxPtr->size) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->footer->blockIdxPtr->offset,
(void *)TARRAY2_DATA(writer->blockIdxArray), writer->footer->blockIdxPtr->size);
if (writer->headFooter->blockIdxPtr->size) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->headFooter->blockIdxPtr->offset,
(void *)TARRAY2_DATA(writer->blockIdxArray), writer->headFooter->blockIdxPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_HEAD].size += writer->footer->blockIdxPtr->size;
writer->files[TSDB_FTYPE_HEAD].size += writer->headFooter->blockIdxPtr->size;
}
_exit:
@ -350,7 +385,24 @@ static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
}
static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
// TODO
if (writer->ctx->reader) {
tsdbDataFileReaderClose(&writer->ctx->reader);
}
tTombBlockFree(writer->tData);
tStatisBlockFree(writer->sData);
tBlockDataDestroy(writer->bData);
TARRAY2_FREE(writer->tombBlkArray);
TARRAY2_FREE(writer->dataBlkArray);
TARRAY2_FREE(writer->blockIdxArray);
tTombBlockFree(writer->ctx->tData);
for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
tFree(writer->bufArr[i]);
}
tDestroyTSchema(writer->skmRow->pTSchema);
tDestroyTSchema(writer->skmTb->pTSchema);
return 0;
}
@ -372,10 +424,6 @@ static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
code = tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray);
TSDB_CHECK_CODE(code, lino, _exit);
break;
}
}
@ -399,9 +447,11 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
code = tsdbDataFileWriterDoOpenReader(writer);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t ftype;
// .head
writer->files[TSDB_FTYPE_HEAD] = (STFile){
.type = TSDB_FTYPE_HEAD,
ftype = TSDB_FTYPE_HEAD;
writer->files[ftype] = (STFile){
.type = ftype,
.did = writer->config->did,
.fid = writer->config->fid,
.cid = writer->config->cid,
@ -409,38 +459,48 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
};
// .data
if (writer->config->files[TSDB_FTYPE_DATA].exist) {
writer->files[TSDB_FTYPE_DATA] = writer->config->files[TSDB_FTYPE_DATA].file;
ftype = TSDB_FTYPE_DATA;
if (writer->config->files[ftype].exist) {
writer->files[ftype] = writer->config->files[ftype].file;
} else {
writer->files[TSDB_FTYPE_DATA] = writer->files[TSDB_FTYPE_HEAD];
writer->files[TSDB_FTYPE_DATA].type = TSDB_FTYPE_DATA;
writer->files[ftype] = (STFile){
.type = ftype,
.did = writer->config->did,
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
};
}
// .sma
if (writer->config->files[TSDB_FTYPE_SMA].exist) {
writer->files[TSDB_FTYPE_SMA] = writer->config->files[TSDB_FTYPE_SMA].file;
ftype = TSDB_FTYPE_SMA;
if (writer->config->files[ftype].exist) {
writer->files[ftype] = writer->config->files[ftype].file;
} else {
writer->files[TSDB_FTYPE_SMA] = writer->files[TSDB_FTYPE_HEAD];
writer->files[TSDB_FTYPE_SMA].type = TSDB_FTYPE_SMA;
writer->files[ftype] = (STFile){
.type = ftype,
.did = writer->config->did,
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
};
}
// .tomb (todo)
writer->files[TSDB_FTYPE_TOMB] = (STFile){
.type = TSDB_FTYPE_TOMB,
// .tomb
ftype = TSDB_FTYPE_TOMB;
writer->files[ftype] = (STFile){
.type = ftype,
.did = writer->config->did,
.fid = writer->config->fid,
.cid = writer->config->cid,
.size = 0,
};
// TODO: init footer
// writer->footer->blockIdxPtr->offset = 0;
writer->ctx->opened = true;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
} else {
writer->ctx->opened = true;
}
return code;
}
@ -548,16 +608,18 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA
int32_t code = 0;
int32_t lino = 0;
SBlockIdx blockIdx[1];
blockIdx->suid = writer->ctx->tbid->suid;
blockIdx->uid = writer->ctx->tbid->uid;
blockIdx->offset = writer->files[TSDB_FTYPE_HEAD].size;
blockIdx->size = TARRAY2_DATA_LEN(dataBlkArray);
int32_t ftype = TSDB_FTYPE_HEAD;
SBlockIdx blockIdx[1] = {{
.suid = writer->ctx->tbid->suid,
.uid = writer->ctx->tbid->uid,
.offset = writer->files[ftype].size,
.size = TARRAY2_DATA_LEN(dataBlkArray),
}};
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], blockIdx->offset, (const uint8_t *)TARRAY2_DATA(dataBlkArray),
blockIdx->size);
code =
tsdbWriteFile(writer->fd[ftype], blockIdx->offset, (const uint8_t *)TARRAY2_DATA(dataBlkArray), blockIdx->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_HEAD].size += blockIdx->size;
writer->files[ftype].size += blockIdx->size;
code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
@ -628,6 +690,7 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) {
const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx);
TSDBKEY key = TSDBROW_KEY(row);
SDataBlk dataBlk1[1] = {{
.minKey = key,
@ -770,14 +833,14 @@ _exit:
return code;
}
static int32_t tsdbDataFileWriteFooter(SDataFileWriter *writer) {
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size,
(const uint8_t *)writer->footer, sizeof(SDataFooter));
(const uint8_t *)writer->headFooter, sizeof(SHeadFooter));
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_HEAD].size += sizeof(SDataFooter);
writer->files[TSDB_FTYPE_HEAD].size += sizeof(SHeadFooter);
_exit:
if (code) {
@ -847,19 +910,99 @@ _exit:
}
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
if (writer->fd[TSDB_FTYPE_TOMB] == NULL) return 0;
int32_t code = 0;
int32_t lino = 0;
writer->footer->tombBlkPtr->offset = writer->files[TSDB_FTYPE_TOMB].size;
writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
int32_t ftype = TSDB_FTYPE_TOMB;
writer->tombFooter->tombBlkPtr->offset = writer->files[ftype].size;
writer->tombFooter->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
if (writer->footer->tombBlkPtr->size) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->footer->tombBlkPtr->offset,
(const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->footer->tombBlkPtr->size);
if (writer->tombFooter->tombBlkPtr->size) {
code = tsdbWriteFile(writer->fd[ftype], writer->tombFooter->tombBlkPtr->offset,
(const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->tombFooter->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[ftype].size += writer->tombFooter->tombBlkPtr->size;
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size,
(const uint8_t *)writer->tombFooter, sizeof(STombFooter));
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_TOMB].size += sizeof(STombFooter);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
int32_t code = 0;
int32_t lino = 0;
while (writer->ctx->hasOldTomb) {
for (; writer->ctx->iRowTomb < TOMB_BLOCK_SIZE(writer->ctx->tData); writer->ctx->iRowTomb++) {
STombRecord record1[1] = {{
.suid = TARRAY2_GET(writer->ctx->tData->suid, writer->ctx->iRowTomb),
.uid = TARRAY2_GET(writer->ctx->tData->uid, writer->ctx->iRowTomb),
.version = TARRAY2_GET(writer->ctx->tData->version, writer->ctx->iRowTomb),
.skey = TARRAY2_GET(writer->ctx->tData->skey, writer->ctx->iRowTomb),
.ekey = TARRAY2_GET(writer->ctx->tData->ekey, writer->ctx->iRowTomb),
}};
int32_t c = tTombRecordCmpr(record, record1);
if (c < 0) {
break;
} else if (c > 0) {
code = tTombBlockPut(writer->tData, record1);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) {
code = tsdbDataFileDoWriteTombBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
ASSERT(0);
}
}
if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
writer->ctx->hasOldTomb = false;
break;
}
for (; writer->ctx->tombBlkArrayIdx < TARRAY2_SIZE(writer->ctx->tombBlkArray); ++writer->ctx->tombBlkArrayIdx) {
const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);
code = tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tData);
TSDB_CHECK_CODE(code, lino, _exit);
writer->ctx->iRowTomb = 0;
writer->ctx->tombBlkArrayIdx++;
break;
}
}
_write:
if (record->suid == INT64_MAX) goto _exit;
code = tTombBlockPut(writer->tData, record);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) {
code = tsdbDataFileDoWriteTombBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_TOMB].size += writer->footer->tombBlkPtr->size;
}
_exit:
@ -872,7 +1015,15 @@ _exit:
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
int32_t code = 0;
int32_t lino = 0;
TABLEID tbid[1] = {{INT64_MAX, INT64_MAX}};
int32_t ftype;
STFileOp op;
if (writer->fd[TSDB_FTYPE_HEAD]) {
TABLEID tbid[1] = {{
.suid = INT64_MAX,
.uid = INT64_MAX,
}};
code = tsdbDataFileWriteTableDataEnd(writer);
TSDB_CHECK_CODE(code, lino, _exit);
@ -883,18 +1034,9 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
code = tsdbDataFileWriteBlockIdx(writer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileDoWriteTombBlock(writer);
code = tsdbDataFileWriteHeadFooter(writer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileDoWriteTombBlk(writer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileWriteFooter(writer);
TSDB_CHECK_CODE(code, lino, _exit);
STFileOp op;
int32_t ftype;
// .head
ftype = TSDB_FTYPE_HEAD;
if (writer->config->files[ftype].exist) {
@ -903,7 +1045,6 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.fid = writer->config->fid,
.of = writer->config->files[ftype].file,
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -917,14 +1058,12 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
// .data
ftype = TSDB_FTYPE_DATA;
if (writer->fd[ftype]) {
if (!writer->config->files[ftype].exist) {
op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
@ -934,22 +1073,18 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.of = writer->config->files[ftype].file,
.nf = writer->files[ftype],
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// .sma
ftype = TSDB_FTYPE_SMA;
if (writer->fd[ftype]) {
if (!writer->config->files[ftype].exist) {
op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
@ -959,21 +1094,56 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
.of = writer->config->files[ftype].file,
.nf = writer->files[ftype],
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// .tomb (TODO)
ftype = TSDB_FTYPE_TOMB;
if (writer->fd[TSDB_FTYPE_TOMB]) {
STombRecord record[1] = {{
.suid = INT64_MAX,
.uid = INT64_MAX,
.version = INT64_MAX,
}};
code = tsdbDataFileDoWriteTombRecord(writer, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileDoWriteTombBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileDoWriteTombBlk(writer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileWriteTombFooter(writer);
TSDB_CHECK_CODE(code, lino, _exit);
ftype = TSDB_FTYPE_SMA;
if (writer->config->files[ftype].exist) {
op = (STFileOp){
.optype = TSDB_FOP_REMOVE,
.fid = writer->config->fid,
.of = writer->config->files[ftype].file,
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = writer->config->fid,
.nf = writer->files[ftype],
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (!writer->fd[i]) continue;
if (writer->fd[i]) {
code = tsdbFsyncFile(writer->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseFile(&writer->fd[i]);
}
}
_exit:
if (code) {
@ -999,7 +1169,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
}
tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[i]);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->files[ftype].size == 0) {
@ -1012,6 +1182,11 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
}
}
if (writer->ctx->reader) {
code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
@ -1060,8 +1235,7 @@ int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE(code, lino, _exit);
}
// open FD
if (!writer->fd[TSDB_FTYPE_DATA]) {
if (writer->fd[TSDB_FTYPE_HEAD] == NULL) {
code = tsdbDataFileWriterOpenDataFD(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1144,30 +1318,33 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
int32_t lino = 0;
char fname[TSDB_FILENAME_LEN];
int32_t ftypes[2] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_TOMB};
int32_t ftype = TSDB_FTYPE_TOMB;
for (int32_t i = 0; i < ARRAY_SIZE(ftypes); ++i) {
int32_t ftype = ftypes[i];
ASSERT(writer->files[ftype].size == 0);
if (writer->fd[ftype]) continue;
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->files[ftype].size == 0) {
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[ftype].size += TSDB_FHDR_SIZE;
code = tsdbWriteFile(writer->fd[i], 0, hdr, TSDB_FHDR_SIZE);
if (writer->ctx->reader) {
code = tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[i].size += TSDB_FHDR_SIZE;
}
if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
writer->ctx->hasOldTomb = true;
}
// Open iter (TODO)
writer->ctx->tombBlkArrayIdx = 0;
tTombBlockClear(writer->ctx->tData);
writer->ctx->iRowTomb = 0;
}
_exit:
if (code) {
@ -1180,26 +1357,19 @@ int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *
int32_t code = 0;
int32_t lino = 0;
#if 1
if (!writer->ctx->opened) {
code = tsdbDataFileWriterDoOpen(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!writer->fd[TSDB_FTYPE_TOMB]) {
if (writer->fd[TSDB_FTYPE_TOMB] == NULL) {
code = tsdbDataFileWriterOpenTombFD(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
code = tTombBlockPut(writer->tData, record);
code = tsdbDataFileDoWriteTombRecord(writer, record);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) {
code = tsdbDataFileDoWriteTombBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);

View File

@ -50,7 +50,6 @@ static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SStt
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(reader->config->tsdb->pVnode);
segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY;
@ -61,7 +60,7 @@ static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SStt
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
taosMemoryFree(segReader[0]);
segReader[0] = NULL;
}
@ -82,7 +81,6 @@ static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(config->tsdb->pVnode);
reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
@ -112,7 +110,7 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
tsdbSttFileReaderClose(reader);
}
return code;
@ -720,7 +718,6 @@ int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode);
if (writer[0]->ctx->opened) {
if (abort) {
@ -737,7 +734,7 @@ int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArr
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code);
}
return code;
}
@ -745,7 +742,6 @@ _exit:
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (!writer->ctx->opened) {
code = tsdbSttFWriterDoOpen(writer);
@ -833,7 +829,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}

View File

@ -45,6 +45,16 @@ int32_t tTombBlockPut(STombBlock *delBlock, const STombRecord *delRecord) {
return 0;
}
int32_t tTombRecordCmpr(const STombRecord *r1, const STombRecord *r2) {
if (r1->suid < r2->suid) return -1;
if (r1->suid > r2->suid) return 1;
if (r1->uid < r2->uid) return -1;
if (r1->uid > r2->uid) return 1;
if (r1->version < r2->version) return -1;
if (r1->version > r2->version) return 1;
return 0;
}
// STbStatisBlock ----------
int32_t tStatisBlockInit(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) {