more code

This commit is contained in:
Hongze Cheng 2023-03-30 17:05:58 +08:00
parent 8afa2c5354
commit 88907b1996
1 changed files with 60 additions and 4 deletions

View File

@ -104,6 +104,21 @@ _exit:
return code; return code;
} }
static int32_t write_statistics_block(struct SSttFWriter *pWriter) {
int32_t code = 0;
tTbStatisBlockClear(&pWriter->sData);
// TODO
return code;
}
static int32_t write_delete_block(struct SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t write_stt_blk(struct SSttFWriter *pWriter) { static int32_t write_stt_blk(struct SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -217,14 +232,19 @@ static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) {
static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) { static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd); code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(pWriter->pFd, 0, hdr, sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
if (pWriter->pFd) tsdbCloseFile(&pWriter->pFd);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
@ -248,8 +268,8 @@ int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWrit
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code));
if (ppWriter[0]) destroy_stt_fwriter(ppWriter[0]); if (ppWriter[0]) destroy_stt_fwriter(ppWriter[0]);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
@ -287,12 +307,28 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
TSDBKEY key = TSDBROW_KEY(pRow);
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) { if (pWriter->bData.nRow > 0) {
code = write_timeseries_block(pWriter); code = write_timeseries_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (pWriter->sData.nRow >= pWriter->config.maxRow) {
code = write_statistics_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid
pWriter->sData.aData[1][pWriter->sData.nRow] = tbid->uid; // uid
pWriter->sData.aData[2][pWriter->sData.nRow] = key.ts; // skey
pWriter->sData.aData[3][pWriter->sData.nRow] = key.version; // sver
pWriter->sData.aData[4][pWriter->sData.nRow] = key.ts; // ekey
pWriter->sData.aData[5][pWriter->sData.nRow] = key.version; // ever
pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count
pWriter->sData.nRow++;
code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb); code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -314,6 +350,17 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) {
pWriter->sData.aData[4][pWriter->sData.nRow - 1] = key.ts; // ekey
pWriter->sData.aData[5][pWriter->sData.nRow - 1] = key.version; // ever
pWriter->sData.aData[6][pWriter->sData.nRow - 1]++; // count
} else if (key.ts == pWriter->sData.aData[4][pWriter->sData.nRow - 1]) {
pWriter->sData.aData[4][pWriter->sData.nRow - 1] = key.ts; // ekey
pWriter->sData.aData[5][pWriter->sData.nRow - 1] = key.version; // ever
} else {
ASSERTS(0, "timestamp should be in ascending order");
}
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
@ -323,7 +370,16 @@ _exit:
} }
int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData) { int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData) {
int32_t code = 0; pWriter->dData.aData[0][pWriter->dData.nRow] = tbid->suid; // suid
// TODO pWriter->dData.aData[1][pWriter->dData.nRow] = tbid->uid; // uid
return code; pWriter->dData.aData[2][pWriter->dData.nRow] = pDelData->version; // version
pWriter->dData.aData[3][pWriter->dData.nRow] = pDelData->sKey; // skey
pWriter->dData.aData[4][pWriter->dData.nRow] = pDelData->eKey; // ekey
pWriter->dData.nRow++;
if (pWriter->dData.nRow >= pWriter->config.maxRow) {
return write_delete_block(pWriter);
} else {
return 0;
}
} }