more code

This commit is contained in:
Hongze Cheng 2023-03-24 14:24:34 +08:00
parent 06db212d66
commit 873c9705fe
5 changed files with 170 additions and 192 deletions

View File

@ -1,75 +1,83 @@
# vnode # vnode
add_library(vnode STATIC "") add_library(vnode STATIC "")
set(
VNODE_SOURCE_FILES
"src/vnd/vnodeOpen.c"
"src/vnd/vnodeBufPool.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeRetention.c"
# meta
"src/meta/metaOpen.c"
"src/meta/metaIdx.c"
"src/meta/metaTable.c"
"src/meta/metaSma.c"
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
"src/meta/metaSnapshot.c"
"src/meta/metaCache.c"
# sma
"src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c"
"src/sma/smaCommit.c"
"src/sma/smaRollup.c"
"src/sma/smaSnapshot.c"
"src/sma/smaTimeRange.c"
# tsdb
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c"
"src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbCache.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention.c"
"src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbMergeTree.c"
"src/tsdb/tsdbDataIter.c"
# tq
"src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
"src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
)
if (USE_DEV_CODE)
aux_source_directory("src/tsdb/dev" VNODE_SOURCE_DEV_FILES)
list(
APPEND
VNODE_SOURCE_FILES
${VNODE_SOURCE_DEV_FILES}
)
endif(USE_DEV_CODE)
target_sources( target_sources(
vnode vnode
PRIVATE PRIVATE
${VNODE_SOURCE_FILES}
# vnode
"src/vnd/vnodeOpen.c"
"src/vnd/vnodeBufPool.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeRetention.c"
# meta
"src/meta/metaOpen.c"
"src/meta/metaIdx.c"
"src/meta/metaTable.c"
"src/meta/metaSma.c"
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
"src/meta/metaSnapshot.c"
"src/meta/metaCache.c"
# sma
"src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c"
"src/sma/smaCommit.c"
"src/sma/smaRollup.c"
"src/sma/smaSnapshot.c"
"src/sma/smaTimeRange.c"
# tsdb
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c"
"src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbCache.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention.c"
"src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbMergeTree.c"
"src/tsdb/tsdbDataIter.c"
# # dev
"src/tsdb/dev/tsdbCommit.c"
"src/tsdb/dev/tsdbMerge.c"
"src/tsdb/dev/tsdbSttFWriter.c"
# tq
"src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
"src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
) )
IF (TD_VNODE_PLUGINS) IF (TD_VNODE_PLUGINS)

View File

@ -16,34 +16,69 @@
#include "tsdbSttFWriter.h" #include "tsdbSttFWriter.h"
#include "tsdbUtil.h" #include "tsdbUtil.h"
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
extern void tsdbCloseFile(STsdbFD **ppFD);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);
extern int32_t tsdbFsyncFile(STsdbFD *pFD);
struct SSttFWriter { struct SSttFWriter {
SSttFWriterConf conf; SSttFWriterConf config;
SBlockData bData; // time-series data
SDelBlock dData; SBlockData bData;
SArray *aSttBlk; // SArray<SSttBlk> SSttBlk sttBlk;
SArray *aDelBlk; // SArray<SDelBlk> SArray *aSttBlk; // SArray<SSttBlk>
SSkmInfo skmTb; // tombstone data
SSkmInfo skmRow; SDelBlock dData;
STsdbFD *pFd; SArray *aDelBlk; // SArray<SDelBlk>
// helper data
SSkmInfo skmTb;
SSkmInfo skmRow;
STsdbFD *pFd;
}; };
static int32_t tsdbSttFWriteTSBlock(SSttFWriter *pWriter) {
int32_t code = 0;
int32_t lino;
SBlockData *pBData = &pWriter->bData;
// compress data block
code = tCmprBlockData(pBData, pWriter->config.cmprAlg, NULL, NULL, NULL /* TODO */, NULL /* TODO */);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_NULL(taosArrayPush(pWriter->aSttBlk, &pWriter->sttBlk), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
tBlockDataClear(pBData);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
}
int32_t tsdbSttFWriterOpen(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) { int32_t tsdbSttFWriterOpen(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino;
ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter))) == NULL) {
if (ppWriter[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
ppWriter[0]->conf = pConf[0]; ppWriter[0]->config = pConf[0];
if (ppWriter[0]->conf.pSkmTb == NULL) ppWriter[0]->conf.pSkmTb = &ppWriter[0]->skmTb; if (ppWriter[0]->config.pSkmTb == NULL) ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb;
if (ppWriter[0]->conf.pSkmRow == NULL) ppWriter[0]->conf.pSkmRow = &ppWriter[0]->skmRow; if (ppWriter[0]->config.pSkmRow == NULL) ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow;
tBlockDataCreate(&ppWriter[0]->bData); tBlockDataCreate(&ppWriter[0]->bData);
// tDelBlockCreate(&ppWriter[0]->dData); // tDelBlockCreate(&ppWriter[0]->dData);
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO
code = tsdbOpenFile(NULL /* TODO */, pConf->szPage, flag, &ppWriter[0]->pFd);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -63,7 +98,41 @@ int32_t tsdbSttFWriterClose(SSttFWriter **ppWriter) {
int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
// TODO int32_t lino;
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) {
code = tsdbSttFWriteTSBlock(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
// TODO: code = tsdbUpdateTableSchema(pWriter->config.pTsdb, tbid->uid, tbid->suid, pWriter->config.pSkmTb);
TSDB_CHECK_CODE(code, lino, _exit);
TABLEID id = {.suid = tbid->suid, .uid = tbid->suid ? 0 : tbid->uid};
code = tBlockDataInit(&pWriter->bData, &id, pWriter->config.pSkmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pRow->type == TSDBROW_ROW_FMT) {
// TODO: code = tsdbUpdateRowSchema(pWriter->config.pTsdb, tbid->uid, tbid->suid, pRow->row,
// pWriter->config.pSkmRow);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->config.pSkmRow->pTSchema, tbid->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->bData.nRow >= pWriter->config.maxRow) {
code = tsdbSttFWriteTSBlock(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code; return code;
} }
@ -72,107 +141,3 @@ int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelD
// TODO // TODO
return code; return code;
} }
#if 0
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
extern void tsdbCloseFile(STsdbFD **ppFD);
struct SSttFWriter {
STsdb *pTsdb;
SSttFile file;
SBlockData bData;
SArray *aSttBlk;
STsdbFD *pFd;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
int32_t maxRow;
};
static int32_t tsdbSttFWriteTSDataBlock(SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFWriterOpen(STsdb *pTsdb, const SSttFile *pSttFile, SSttFWriter **ppWriter) {
int32_t code = 0;
int32_t lino = 0;
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
char fname[TSDB_FILENAME_LEN];
ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter));
if (ppWriter[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
ppWriter[0]->pTsdb = pTsdb;
ppWriter[0]->file = pSttFile[0];
tBlockDataCreate(&ppWriter[0]->bData);
ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk));
if (ppWriter[0]->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pSttFile->size == 0) flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
tsdbSttFileName(pTsdb, (SDiskID){0}, 0, &ppWriter[0]->file, fname); // TODO
code = tsdbOpenFile(fname, szPage, flag, &ppWriter[0]->pFd);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
if (ppWriter[0]) {
tBlockDataDestroy(&ppWriter[0]->bData);
taosArrayDestroy(ppWriter[0]->aSttBlk);
taosMemoryFree(ppWriter[0]);
ppWriter[0] = NULL;
}
}
return 0;
}
int32_t tsdbSttFWriterClose(SSttFWriter *pWriter) {
// TODO
return 0;
}
int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0;
int32_t lino = 0;
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) {
code = tsdbSttFWriteTSDataBlock(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataInit(&pWriter->bData, tbid, NULL /* TODO */, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&pWriter->bData, pRow, NULL /* TODO */, tbid->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->bData.nRow >= pWriter->maxRow) {
code = tsdbSttFWriteTSDataBlock(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return 0;
}
int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, int64_t suid, int64_t uid, int64_t version) {
int32_t code = 0;
int32_t lino = 0;
// TODO write row
return 0;
}
#endif

View File

@ -35,6 +35,8 @@ struct SSttFWriterConf {
SSkmInfo *pSkmTb; SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow; SSkmInfo *pSkmRow;
int32_t maxRow; int32_t maxRow;
int32_t szPage;
int8_t cmprAlg;
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -27,11 +27,14 @@ typedef struct SDelBlock SDelBlock;
typedef struct SDelBlk SDelBlk; typedef struct SDelBlk SDelBlk;
/* Exposed APIs */ /* Exposed APIs */
int32_t tDelBlockCreate(SDelBlock *pDelBlock);
int32_t tDelBlockDestroy(SDelBlock *pDelBlock);
int32_t tDelBlockClear(SDelBlock *pDelBlock);
int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData);
/* Exposed Structs */ /* Exposed Structs */
struct SDelBlock { struct SDelBlock {
// <suid, uid, version, skey, ekey> SColData aColData[4]; // <suid, uid, version, skey, ekey>
SColData aColData[4];
}; };
struct SDelBlk { struct SDelBlk {

View File

@ -141,7 +141,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) { int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) {
int32_t code = 0; int32_t code = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
@ -173,7 +173,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0; int32_t code = 0;
int64_t n = 0; int64_t n = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
@ -202,7 +202,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t tsdbFsyncFile(STsdbFD *pFD) {
int32_t code = 0; int32_t code = 0;
code = tsdbWriteFilePage(pFD); code = tsdbWriteFilePage(pFD);