more code
This commit is contained in:
parent
7203819888
commit
fff4a96bdf
|
@ -1105,7 +1105,9 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
// TODO: do write the row, refer line 1593
|
// write a NULL row to end current table data write
|
||||||
|
code = tsdbSnapWriteTableRow(pWriter, NULL);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (pWriter->bData.nRow > 0) {
|
if (pWriter->bData.nRow > 0) {
|
||||||
if (pWriter->bData.nRow < pWriter->minRow) {
|
if (pWriter->bData.nRow < pWriter->minRow) {
|
||||||
|
@ -1120,7 +1122,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tBlockDataReset(&pWriter->bData);
|
tBlockDataClear(&pWriter->bData);
|
||||||
} else {
|
} else {
|
||||||
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -1134,6 +1136,9 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pBlockIdx->suid = pWriter->tbid.suid;
|
||||||
|
pBlockIdx->uid = pWriter->tbid.uid;
|
||||||
|
|
||||||
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx);
|
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
@ -1247,9 +1252,14 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
ASSERT(pWriter->pDataFWriter);
|
ASSERT(pWriter->pDataFWriter);
|
||||||
|
|
||||||
code = tsdbSnapWriteTableData(pWriter, NULL /* TODO */);
|
code = tsdbSnapWriteTableData(pWriter, NULL);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (pWriter->sData.nRow) {
|
||||||
|
code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
// do file-level updates
|
// do file-level updates
|
||||||
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
|
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -1271,8 +1281,12 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: do clear sources
|
// clear sources
|
||||||
{}
|
while (pWriter->iterList) {
|
||||||
|
STsdbDataIter2* pIter = pWriter->iterList;
|
||||||
|
pWriter->iterList = pIter->next;
|
||||||
|
tsdbCloseDataIter2(pIter);
|
||||||
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -1349,7 +1363,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -1368,6 +1382,79 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// TODO: take pRow == NULL into account
|
||||||
|
|
||||||
|
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
|
||||||
|
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
} else {
|
||||||
|
for (;;) {
|
||||||
|
while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
|
||||||
|
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);
|
||||||
|
|
||||||
|
int32_t c = tsdbRowCmprFn(pRow, &row);
|
||||||
|
if (c < 0) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
goto _exit;
|
||||||
|
} else if (c > 0) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
pWriter->pDIter->dIter.iRow++;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: Here can be slow, use array instead
|
||||||
|
SDataBlk dataBlk;
|
||||||
|
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
||||||
|
|
||||||
|
int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)});
|
||||||
|
if (c > 0) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
goto _exit;
|
||||||
|
} else if (c < 0) {
|
||||||
|
if (pWriter->bData.nRow > 0) {
|
||||||
|
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
|
||||||
|
pWriter->pDIter->dIter.iDataBlk++;
|
||||||
|
} else {
|
||||||
|
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
pWriter->pDIter->dIter.iRow = 0;
|
||||||
|
pWriter->pDIter->dIter.iDataBlk++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
|
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1384,68 +1471,9 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
|
||||||
}
|
}
|
||||||
|
|
||||||
// end with a NULL row
|
// end with a NULL row
|
||||||
if (pRowInfo == NULL) goto _exit;
|
if (pRowInfo) {
|
||||||
|
|
||||||
// do write the row
|
|
||||||
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
|
|
||||||
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
|
||||||
for (;;) {
|
|
||||||
while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
|
|
||||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);
|
|
||||||
|
|
||||||
int32_t c = tsdbRowCmprFn(&pRowInfo->row, &row);
|
|
||||||
if (c < 0) {
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
goto _exit;
|
|
||||||
} else if (c > 0) {
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pWriter->pDIter->dIter.iRow++;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) {
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: Here can be slow, use array instead
|
|
||||||
SDataBlk dataBlk;
|
|
||||||
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
|
||||||
|
|
||||||
int32_t c = tDataBlkCmprFn(
|
|
||||||
&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)});
|
|
||||||
if (c > 0) {
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
goto _exit;
|
|
||||||
} else if (c < 0) {
|
|
||||||
if (pWriter->bData.nRow > 0) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
|
|
||||||
pWriter->pDIter->dIter.iDataBlk++;
|
|
||||||
} else {
|
|
||||||
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pWriter->pDIter->dIter.iRow = 0;
|
|
||||||
pWriter->pDIter->dIter.iDataBlk++;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
Loading…
Reference in New Issue