more code
This commit is contained in:
parent
fb64bcf540
commit
8afa2c5354
|
@ -36,7 +36,7 @@ typedef struct {
|
|||
struct SSttFWriter *pWriter;
|
||||
} SCommitter;
|
||||
|
||||
static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) {
|
||||
static int32_t open_committer_writer(SCommitter *pCommitter) {
|
||||
int32_t code;
|
||||
int32_t lino;
|
||||
|
||||
|
@ -82,7 +82,7 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB
|
|||
int32_t lino;
|
||||
|
||||
if (pCommitter->pWriter == NULL) {
|
||||
code = tsdbCommitOpenWriter(pCommitter);
|
||||
code = open_committer_writer(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,12 +21,14 @@ extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf,
|
|||
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);
|
||||
extern int32_t tsdbFsyncFile(STsdbFD *pFD);
|
||||
|
||||
typedef struct SFDataPtr {
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
} SFDataPtr;
|
||||
|
||||
typedef struct {
|
||||
struct {
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
} dict[4]; // 0:bloom filter, 1:SSttBlk, 2:SDelBlk, 3:STbStatisBlk
|
||||
uint8_t reserved[32];
|
||||
SFDataPtr dict[4]; // 0:bloom filter, 1:SSttBlk, 2:SDelBlk, 3:STbStatisBlk
|
||||
uint8_t reserved[32];
|
||||
} SFSttFooter;
|
||||
|
||||
struct SSttFWriter {
|
||||
|
@ -153,15 +155,38 @@ static int32_t create_stt_fwriter(const struct SSttFWriterConf *pConf, struct SS
|
|||
ppWriter[0]->config.aBuf = ppWriter[0]->aBuf;
|
||||
}
|
||||
|
||||
// time-series data block
|
||||
tBlockDataCreate(&ppWriter[0]->bData);
|
||||
ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk));
|
||||
if (ppWriter[0]->aSttBlk == NULL) {
|
||||
if ((ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// deleted data block
|
||||
if ((code = tDelBlockCreate(&ppWriter[0]->dData, pConf->maxRow))) goto _exit;
|
||||
if ((ppWriter[0]->aDelBlk = taosArrayInit(64, sizeof(SDelBlk))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// statistics data block
|
||||
if ((code = tTbStatisBlockCreate(&ppWriter[0]->sData, pConf->maxRow))) goto _exit;
|
||||
if ((ppWriter[0]->aStatisBlk = taosArrayInit(64, sizeof(STbStatisBlock))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// TODO: bloom filter
|
||||
|
||||
_exit:
|
||||
if (code && ppWriter[0]) {
|
||||
// statistics data block
|
||||
taosArrayDestroy(ppWriter[0]->aStatisBlk);
|
||||
tTbStatisBlockDestroy(&ppWriter[0]->sData);
|
||||
// deleted data block
|
||||
taosArrayDestroy(ppWriter[0]->aDelBlk);
|
||||
tDelBlockDestroy(&ppWriter[0]->dData);
|
||||
// time-series data block
|
||||
taosArrayDestroy(ppWriter[0]->aSttBlk);
|
||||
tBlockDataDestroy(&ppWriter[0]->bData);
|
||||
taosMemoryFree(ppWriter[0]);
|
||||
|
@ -172,9 +197,16 @@ _exit:
|
|||
|
||||
static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) {
|
||||
if (pWriter) {
|
||||
tDestroyTSchema(pWriter->skmTb.pTSchema);
|
||||
for (int32_t i = 0; ARRAY_SIZE(pWriter->aBuf); i++) tFree(pWriter->aBuf[i]);
|
||||
tDestroyTSchema(pWriter->skmRow.pTSchema);
|
||||
for (int32_t i = 0; i < sizeof(pWriter->aBuf) / sizeof(pWriter->aBuf[0]); i++) taosMemoryFree(pWriter->aBuf[i]);
|
||||
tDestroyTSchema(pWriter->skmTb.pTSchema);
|
||||
// statistics data block
|
||||
taosArrayDestroy(pWriter->aStatisBlk);
|
||||
tTbStatisBlockDestroy(&pWriter->sData);
|
||||
// deleted data block
|
||||
taosArrayDestroy(pWriter->aDelBlk);
|
||||
tDelBlockDestroy(&pWriter->dData);
|
||||
// time-series data block
|
||||
taosArrayDestroy(pWriter->aSttBlk);
|
||||
tBlockDataDestroy(&pWriter->bData);
|
||||
taosMemoryFree(pWriter);
|
||||
|
@ -186,7 +218,7 @@ static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) {
|
|||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO
|
||||
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
|
||||
code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -217,10 +249,7 @@ int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWrit
|
|||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
if (ppWriter[0]) {
|
||||
taosMemoryFree(ppWriter[0]);
|
||||
ppWriter[0] = NULL;
|
||||
}
|
||||
if (ppWriter[0]) destroy_stt_fwriter(ppWriter[0]);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -33,11 +33,11 @@ int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData
|
|||
struct SSttFWriterConf {
|
||||
STsdb *pTsdb;
|
||||
struct STFile file;
|
||||
SSkmInfo *pSkmTb;
|
||||
SSkmInfo *pSkmRow;
|
||||
int32_t maxRow;
|
||||
int32_t szPage;
|
||||
int8_t cmprAlg;
|
||||
SSkmInfo *pSkmTb;
|
||||
SSkmInfo *pSkmRow;
|
||||
uint8_t **aBuf;
|
||||
};
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "dev.h"
|
||||
|
||||
// SDelBlock ----------
|
||||
int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity) {
|
||||
int32_t code;
|
||||
|
||||
|
@ -53,6 +54,33 @@ int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelDat
|
|||
return 0;
|
||||
}
|
||||
|
||||
// STbStatisBlock ----------
|
||||
|
||||
int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity) {
|
||||
memset(pTbStatisBlock, 0, sizeof(*pTbStatisBlock));
|
||||
pTbStatisBlock->capacity = capacity;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) {
|
||||
if (tRealloc((uint8_t **)&pTbStatisBlock->aData[i], sizeof(int64_t) * capacity)) {
|
||||
for (i--; i >= 0; --i) tFree(pTbStatisBlock->aData[i]);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock) {
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) {
|
||||
tFree(pTbStatisBlock->aData[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock) {
|
||||
pTbStatisBlock->nRow = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// other apis ----------
|
||||
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb) {
|
||||
if (tbid->suid) {
|
||||
if (pSkmTb->suid == tbid->suid) {
|
||||
|
|
|
@ -22,18 +22,24 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
/* Exposed Handle */
|
||||
typedef struct SDelBlock SDelBlock;
|
||||
typedef struct SDelBlk SDelBlk;
|
||||
typedef struct STbStatisBlock STbStatisBlock;
|
||||
typedef struct STbStatisBlk STbStatisBlk;
|
||||
// SDelBlock ----------
|
||||
typedef struct SDelBlock SDelBlock;
|
||||
typedef struct SDelBlk SDelBlk;
|
||||
|
||||
/* Exposed APIs */
|
||||
int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity);
|
||||
int32_t tDelBlockDestroy(SDelBlock *pDelBlock);
|
||||
int32_t tDelBlockClear(SDelBlock *pDelBlock);
|
||||
int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData);
|
||||
|
||||
// STbStatisBlock ----------
|
||||
typedef struct STbStatisBlock STbStatisBlock;
|
||||
typedef struct STbStatisBlk STbStatisBlk;
|
||||
|
||||
int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity);
|
||||
int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock);
|
||||
int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock);
|
||||
|
||||
// other apis
|
||||
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);
|
||||
int32_t tsdbUpdateSkmRow(STsdb *pTsdb, const TABLEID *tbid, int32_t sver, SSkmInfo *pSkmRow);
|
||||
|
||||
|
|
Loading…
Reference in New Issue