more code

This commit is contained in:
Hongze Cheng 2022-09-08 09:32:51 +08:00
parent 79f71be7e0
commit dda66c6eaf
1 changed files with 60 additions and 17 deletions

View File

@ -1307,7 +1307,7 @@ _err:
return code; return code;
} }
static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
int32_t code = 0; int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData; SBlockData* pBlockData = &pWriter->bData;
@ -1315,19 +1315,6 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
TSDBKEY key = TSDBROW_KEY(&row); TSDBKEY key = TSDBROW_KEY(&row);
// End last table data write if need
if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) {
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
}
// Start new table data write if need
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) {
code = tsdbSnapWriteTableDataStart(pWriter, &id);
if (code) goto _err;
}
// Merge with .data file data
if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
_merge_block: _merge_block:
// merge with data block in row // merge with data block in row
@ -1347,10 +1334,15 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
} }
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
// TODO: commit to the block code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
} }
if (c < 0) goto _exit; if (c < 0) {
*done = 1;
goto _exit;
}
} }
// merge with dataBlk in whole // merge with dataBlk in whole
@ -1374,6 +1366,9 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
pWriter->cmprAlg); pWriter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
*done = 1;
goto _exit;
} else { } else {
code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
if (code) goto _err; if (code) goto _err;
@ -1386,7 +1381,20 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
if (code) goto _err; if (code) goto _err;
} }
// Append to the .stt data block (todo: check if need to set/reload sst block) _exit:
return code;
_err:
return code;
}
static int32_t tsdbSnapWriteWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid); code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid);
if (code) goto _err; if (code) goto _err;
@ -1403,6 +1411,41 @@ _err:
return code; return code;
} }
static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
// End last table data write if need
if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) {
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
}
// Start new table data write if need
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) {
code = tsdbSnapWriteTableDataStart(pWriter, &id);
if (code) goto _err;
}
// Merge with .data file data
int8_t done = 0;
code = tsdbSnapWriteWriteToDataFile(pWriter, iRow, &done);
if (code) goto _err;
// Append to the .stt data block (todo: check if need to set/reload sst block)
if (!done) {
code = tsdbSnapWriteWriteToSttFile(pWriter, iRow);
if (code) goto _err;
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;