refact code
This commit is contained in:
parent
37ae918114
commit
46aa6d4896
|
@ -25,7 +25,7 @@ extern "C" {
|
|||
|
||||
typedef TARRAY2(SSttBlk) TSttBlkArray;
|
||||
typedef TARRAY2(SStatisBlk) TStatisBlkArray;
|
||||
typedef TARRAY2(SDelBlk) TDelBlkArray;
|
||||
typedef TARRAY2(STombBlk) TTombBlkArray;
|
||||
|
||||
// SSttFileReader ==========================================
|
||||
typedef struct SSttFileReader SSttFileReader;
|
||||
|
@ -42,11 +42,11 @@ int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReade
|
|||
// SSttSegReader
|
||||
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray);
|
||||
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray);
|
||||
int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray);
|
||||
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delBlkArray);
|
||||
|
||||
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
|
||||
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData);
|
||||
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData);
|
||||
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData);
|
||||
|
||||
struct SSttFileReaderConfig {
|
||||
STsdb *tsdb;
|
||||
|
@ -63,7 +63,7 @@ int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter
|
|||
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray);
|
||||
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row);
|
||||
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData);
|
||||
int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record);
|
||||
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record);
|
||||
|
||||
struct SSttFileWriterConfig {
|
||||
STsdb *tsdb;
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef union {
|
|||
int64_t skey;
|
||||
int64_t ekey;
|
||||
};
|
||||
} SDelRecord;
|
||||
} STombRecord;
|
||||
|
||||
typedef union {
|
||||
TARRAY2(int64_t) dataArr[DEL_RECORD_NUM_ELEM];
|
||||
|
@ -45,9 +45,9 @@ typedef union {
|
|||
TARRAY2(int64_t) skey[1];
|
||||
TARRAY2(int64_t) ekey[1];
|
||||
};
|
||||
} SDelBlock;
|
||||
} STombBlock;
|
||||
|
||||
typedef struct SDelBlk {
|
||||
typedef struct {
|
||||
int32_t numRec;
|
||||
int32_t size[DEL_RECORD_NUM_ELEM];
|
||||
TABLEID minTid;
|
||||
|
@ -55,14 +55,14 @@ typedef struct SDelBlk {
|
|||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
SFDataPtr dp[1];
|
||||
} SDelBlk;
|
||||
} STombBlk;
|
||||
|
||||
#define DEL_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
|
||||
#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
|
||||
|
||||
int32_t tDelBlockInit(SDelBlock *delBlock);
|
||||
int32_t tDelBlockFree(SDelBlock *delBlock);
|
||||
int32_t tDelBlockClear(SDelBlock *delBlock);
|
||||
int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord);
|
||||
int32_t tTombBlockInit(STombBlock *delBlock);
|
||||
int32_t tTombBlockFree(STombBlock *delBlock);
|
||||
int32_t tTombBlockClear(STombBlock *delBlock);
|
||||
int32_t tTombBlockPut(STombBlock *delBlock, const STombRecord *delRecord);
|
||||
|
||||
// STbStatisBlock ----------
|
||||
#define STATIS_RECORD_NUM_ELEM 9
|
||||
|
|
|
@ -231,8 +231,8 @@ static int32_t tsdbCommitDelData(SCommitter2 *committer) {
|
|||
SRBTreeIter iter[1] = {tRBTreeIterCreate(committer->tsdb->imem->tbDataTree, 1)};
|
||||
|
||||
for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) {
|
||||
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||
SDelRecord record[1] = {{
|
||||
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||
STombRecord record[1] = {{
|
||||
.suid = tbData->suid,
|
||||
.uid = tbData->uid,
|
||||
}};
|
||||
|
@ -258,7 +258,7 @@ static int32_t tsdbCommitDelData(SCommitter2 *committer) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbSttFileWriteDelRecord(committer->sttWriter, record);
|
||||
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@
|
|||
typedef struct {
|
||||
int64_t prevFooter;
|
||||
SFDataPtr sttBlkPtr[1];
|
||||
SFDataPtr delBlkPtr[1];
|
||||
SFDataPtr statisBlkPtr[1];
|
||||
SFDataPtr tombBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} SSttFooter;
|
||||
|
||||
|
@ -36,12 +36,12 @@ struct SSttSegReader {
|
|||
SSttFooter footer[1];
|
||||
struct {
|
||||
bool sttBlkLoaded;
|
||||
bool delBlkLoaded;
|
||||
bool statisBlkLoaded;
|
||||
bool tombBlkLoaded;
|
||||
} ctx[1];
|
||||
TSttBlkArray sttBlkArray[1];
|
||||
TStatisBlkArray statisBlkArray[1];
|
||||
TDelBlkArray delBlkArray[1];
|
||||
TTombBlkArray tombBlkArray[1];
|
||||
};
|
||||
|
||||
// SSttFileReader
|
||||
|
@ -71,7 +71,7 @@ _exit:
|
|||
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
|
||||
if (reader[0]) {
|
||||
TARRAY2_FREE(reader[0]->sttBlkArray);
|
||||
TARRAY2_FREE(reader[0]->delBlkArray);
|
||||
TARRAY2_FREE(reader[0]->tombBlkArray);
|
||||
TARRAY2_FREE(reader[0]->statisBlkArray);
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
|
@ -165,31 +165,31 @@ int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) {
|
||||
if (!reader->ctx->delBlkLoaded) {
|
||||
if (reader->footer->delBlkPtr->size > 0) {
|
||||
ASSERT(reader->footer->delBlkPtr->size % sizeof(SDelBlk) == 0);
|
||||
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tombBlkArray) {
|
||||
if (!reader->ctx->tombBlkLoaded) {
|
||||
if (reader->footer->tombBlkPtr->size > 0) {
|
||||
ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
|
||||
|
||||
int32_t size = reader->footer->delBlkPtr->size / sizeof(SDelBlk);
|
||||
void *data = taosMemoryMalloc(reader->footer->delBlkPtr->size);
|
||||
int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
|
||||
void *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
|
||||
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t code =
|
||||
tsdbReadFile(reader->reader->fd, reader->footer->delBlkPtr->offset, data, reader->footer->delBlkPtr->size);
|
||||
tsdbReadFile(reader->reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
|
||||
if (code) {
|
||||
taosMemoryFree(data);
|
||||
return code;
|
||||
}
|
||||
|
||||
TARRAY2_INIT_EX(reader->delBlkArray, size, size, data);
|
||||
TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
|
||||
} else {
|
||||
TARRAY2_INIT(reader->delBlkArray);
|
||||
TARRAY2_INIT(reader->tombBlkArray);
|
||||
}
|
||||
|
||||
reader->ctx->delBlkLoaded = true;
|
||||
reader->ctx->tombBlkLoaded = true;
|
||||
}
|
||||
|
||||
delBlkArray[0] = reader->delBlkArray;
|
||||
tombBlkArray[0] = reader->tombBlkArray;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -243,34 +243,34 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) {
|
||||
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
tDelBlockClear(dData);
|
||||
tTombBlockClear(dData);
|
||||
|
||||
code = tRealloc(&reader->reader->config->bufArr[0], delBlk->dp->size);
|
||||
code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->reader->fd, delBlk->dp->offset, reader->reader->config->bufArr[0], delBlk->dp->size);
|
||||
code = tsdbReadFile(reader->reader->fd, tombBlk->dp->offset, reader->reader->config->bufArr[0], tombBlk->dp->size);
|
||||
if (code) TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
int64_t size = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) {
|
||||
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, delBlk->size[i], TSDB_DATA_TYPE_BIGINT,
|
||||
TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * delBlk->numRec,
|
||||
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
|
||||
TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
|
||||
&reader->reader->config->bufArr[2]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t j = 0; j < delBlk->numRec; ++j) {
|
||||
for (int32_t j = 0; j < tombBlk->numRec; ++j) {
|
||||
code = TARRAY2_APPEND(&dData->dataArr[i], ((int64_t *)(reader->reader->config->bufArr[1]))[j]);
|
||||
continue;
|
||||
}
|
||||
|
||||
size += delBlk->size[i];
|
||||
size += tombBlk->size[i];
|
||||
}
|
||||
|
||||
ASSERT(size == delBlk->dp->size);
|
||||
ASSERT(size == tombBlk->dp->size);
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
|
||||
|
@ -327,11 +327,11 @@ struct SSttFileWriter {
|
|||
// data
|
||||
TSttBlkArray sttBlkArray[1];
|
||||
TStatisBlkArray statisBlkArray[1];
|
||||
TDelBlkArray delBlkArray[1];
|
||||
TTombBlkArray tombBlkArray[1];
|
||||
SSttFooter footer[1];
|
||||
SBlockData bData[1];
|
||||
STbStatisBlock sData[1];
|
||||
SDelBlock dData[1];
|
||||
STombBlock dData[1];
|
||||
// helper data
|
||||
SSkmInfo skmTb[1];
|
||||
SSkmInfo skmRow[1];
|
||||
|
@ -444,14 +444,14 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
|
||||
if (DEL_BLOCK_SIZE(writer->dData) == 0) return 0;
|
||||
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
|
||||
if (TOMB_BLOCK_SIZE(writer->dData) == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SDelBlk delBlk[1] = {{
|
||||
.numRec = DEL_BLOCK_SIZE(writer->dData),
|
||||
STombBlk tombBlk[1] = {{
|
||||
.numRec = TOMB_BLOCK_SIZE(writer->dData),
|
||||
.minTid =
|
||||
{
|
||||
.suid = TARRAY2_FIRST(writer->dData->suid),
|
||||
|
@ -471,9 +471,9 @@ static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
|
|||
},
|
||||
}};
|
||||
|
||||
for (int32_t i = 1; i < DEL_BLOCK_SIZE(writer->dData); i++) {
|
||||
delBlk->minVer = TMIN(delBlk->minVer, TARRAY2_GET(writer->dData->version, i));
|
||||
delBlk->maxVer = TMAX(delBlk->maxVer, TARRAY2_GET(writer->dData->version, i));
|
||||
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->dData); i++) {
|
||||
tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->dData->version, i));
|
||||
tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->dData->version, i));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->dataArr); i++) {
|
||||
|
@ -486,15 +486,15 @@ static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
|
|||
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
delBlk->size[i] = size;
|
||||
delBlk->dp[0].size += size;
|
||||
tombBlk->size[i] = size;
|
||||
tombBlk->dp[0].size += size;
|
||||
writer->file->size += size;
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND_PTR(writer->delBlkArray, delBlk);
|
||||
code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tDelBlockClear(writer->dData);
|
||||
tTombBlockClear(writer->dData);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -545,18 +545,18 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
|
||||
static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
writer->footer->delBlkPtr->offset = writer->file->size;
|
||||
writer->footer->delBlkPtr->size = TARRAY2_DATA_LEN(writer->delBlkArray);
|
||||
writer->footer->tombBlkPtr->offset = writer->file->size;
|
||||
writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
|
||||
|
||||
if (writer->footer->delBlkPtr->size) {
|
||||
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray),
|
||||
writer->footer->delBlkPtr->size);
|
||||
if (writer->footer->tombBlkPtr->size) {
|
||||
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
|
||||
writer->footer->tombBlkPtr->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->file->size += writer->footer->delBlkPtr->size;
|
||||
writer->file->size += writer->footer->tombBlkPtr->size;
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -624,9 +624,9 @@ static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
|
|||
tDestroyTSchema(writer->skmRow->pTSchema);
|
||||
tDestroyTSchema(writer->skmTb->pTSchema);
|
||||
tStatisBlockFree(writer->sData);
|
||||
tDelBlockFree(writer->dData);
|
||||
tTombBlockFree(writer->dData);
|
||||
tBlockDataDestroy(writer->bData);
|
||||
TARRAY2_FREE(writer->delBlkArray);
|
||||
TARRAY2_FREE(writer->tombBlkArray);
|
||||
TARRAY2_FREE(writer->statisBlkArray);
|
||||
TARRAY2_FREE(writer->sttBlkArray);
|
||||
}
|
||||
|
@ -646,7 +646,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
|
|||
code = tsdbSttFileDoWriteStatisBlock(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSttFileDoWriteDelBlock(writer);
|
||||
code = tsdbSttFileDoWriteTombBlock(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSttFileDoWriteSttBlk(writer);
|
||||
|
@ -655,7 +655,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
|
|||
code = tsdbSttFileDoWriteStatisBlk(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSttFileDoWriteDelBlk(writer);
|
||||
code = tsdbSttFileDoWriteTombBlk(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSttFileDoWriteFooter(writer);
|
||||
|
@ -851,7 +851,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record) {
|
||||
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record) {
|
||||
int32_t code;
|
||||
int32_t lino;
|
||||
|
||||
|
@ -872,12 +872,12 @@ int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *reco
|
|||
}
|
||||
|
||||
// write SDelRecord
|
||||
code = tDelBlockPut(writer->dData, record);
|
||||
code = tTombBlockPut(writer->dData, record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// write SDelBlock if need
|
||||
if (DEL_BLOCK_SIZE(writer->dData) >= writer->config->maxRow) {
|
||||
code = tsdbSttFileDoWriteDelBlock(writer);
|
||||
if (TOMB_BLOCK_SIZE(writer->dData) >= writer->config->maxRow) {
|
||||
code = tsdbSttFileDoWriteTombBlock(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,28 +16,28 @@
|
|||
#include "dev.h"
|
||||
|
||||
// SDelBlock ----------
|
||||
int32_t tDelBlockInit(SDelBlock *delBlock) {
|
||||
int32_t tTombBlockInit(STombBlock *delBlock) {
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) {
|
||||
TARRAY2_INIT(&delBlock->dataArr[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDelBlockFree(SDelBlock *delBlock) {
|
||||
int32_t tTombBlockFree(STombBlock *delBlock) {
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) {
|
||||
TARRAY2_FREE(&delBlock->dataArr[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDelBlockClear(SDelBlock *delBlock) {
|
||||
int32_t tTombBlockClear(STombBlock *delBlock) {
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) {
|
||||
TARRAY2_CLEAR(&delBlock->dataArr[i], NULL);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) {
|
||||
int32_t tTombBlockPut(STombBlock *delBlock, const STombRecord *delRecord) {
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) {
|
||||
int32_t code = TARRAY2_APPEND(&delBlock->dataArr[i], delRecord->aData[i]);
|
||||
if (code) return code;
|
||||
|
|
Loading…
Reference in New Issue