more code
This commit is contained in:
parent
416d4ac6ad
commit
d630c2fdd7
|
@ -1122,6 +1122,68 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;
|
||||
|
||||
if (pWriter->dReader.pBlockIdx && pWriter->dReader.pBlockIdx->suid == pWriter->id.suid &&
|
||||
pWriter->dReader.pBlockIdx->uid == pWriter->id.uid) {
|
||||
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
|
||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
|
||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->id.uid);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
||||
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
|
||||
|
||||
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbSnapNextTableData(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
// pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
|
||||
if (pWriter->dWriter.mDataBlk.nItem) {
|
||||
SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
|
||||
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
@ -1182,31 +1244,8 @@ static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
|
|||
|
||||
ASSERT(pWriter->dWriter.pWriter);
|
||||
|
||||
// todo: end current commit table
|
||||
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
|
||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
|
||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->dReader.pBlockIdx->uid);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
|
||||
|
||||
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (pWriter->dWriter.mDataBlk.nItem) {
|
||||
SBlockIdx blockIdx = *pWriter->dReader.pBlockIdx;
|
||||
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
// copy remain table data
|
||||
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
|
||||
|
@ -1255,7 +1294,9 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
|
|||
|
||||
// End last table data write if need
|
||||
if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) {
|
||||
// TODO
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->id.suid = 0;
|
||||
pWriter->id.uid = 0;
|
||||
}
|
||||
|
@ -1267,9 +1308,8 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
|
|||
if (code) goto _err;
|
||||
|
||||
// Start new table data
|
||||
pWriter->id.suid = id.suid;
|
||||
pWriter->id.uid = id.uid;
|
||||
tMapDataReset(&pWriter->dWriter.mDataBlk);
|
||||
code = tsdbSnapWriteTableDataStart(pWriter, &id);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// Merge with .data file data
|
||||
|
@ -1527,6 +1567,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
// APIs
|
||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
|
||||
int32_t code = 0;
|
||||
STsdbSnapWriter* pWriter = NULL;
|
||||
|
|
Loading…
Reference in New Issue