make it compile
This commit is contained in:
parent
ec886154a4
commit
2f5eeb8000
|
@ -1106,6 +1106,95 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (pWriter->bData.nRow >= pWriter->maxRow) {
|
||||||
|
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX;
|
||||||
|
|
||||||
|
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
|
||||||
|
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
|
||||||
|
goto _write_row;
|
||||||
|
} else {
|
||||||
|
for (;;) {
|
||||||
|
while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
|
||||||
|
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);
|
||||||
|
|
||||||
|
int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row));
|
||||||
|
if (c < 0) {
|
||||||
|
goto _write_row;
|
||||||
|
} else if (c > 0) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, &row);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
pWriter->pDIter->dIter.iRow++;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row;
|
||||||
|
|
||||||
|
// FIXME: Here can be slow, use array instead
|
||||||
|
SDataBlk dataBlk;
|
||||||
|
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
||||||
|
|
||||||
|
int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey});
|
||||||
|
if (c > 0) {
|
||||||
|
goto _write_row;
|
||||||
|
} else if (c < 0) {
|
||||||
|
if (pWriter->bData.nRow > 0) {
|
||||||
|
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
|
||||||
|
pWriter->pDIter->dIter.iDataBlk++;
|
||||||
|
} else {
|
||||||
|
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
pWriter->pDIter->dIter.iRow = 0;
|
||||||
|
pWriter->pDIter->dIter.iDataBlk++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_write_row:
|
||||||
|
if (pRow) {
|
||||||
|
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1251,6 +1340,34 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// switch to new table if need
|
||||||
|
if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) {
|
||||||
|
if (pWriter->tbid.uid != 0) {
|
||||||
|
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// end with a NULL row
|
||||||
|
if (pRowInfo) {
|
||||||
|
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
|
static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1370,123 +1487,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pWriter->bData.nRow >= pWriter->maxRow) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX;
|
|
||||||
|
|
||||||
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
|
|
||||||
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
|
|
||||||
goto _write_row;
|
|
||||||
} else {
|
|
||||||
for (;;) {
|
|
||||||
while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
|
|
||||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);
|
|
||||||
|
|
||||||
int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row));
|
|
||||||
if (c < 0) {
|
|
||||||
goto _write_row;
|
|
||||||
} else if (c > 0) {
|
|
||||||
code = tsdbSnapWriteTableRowImpl(pWriter, &row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pWriter->pDIter->dIter.iRow++;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row;
|
|
||||||
|
|
||||||
// FIXME: Here can be slow, use array instead
|
|
||||||
SDataBlk dataBlk;
|
|
||||||
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
|
||||||
|
|
||||||
int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey});
|
|
||||||
if (c > 0) {
|
|
||||||
goto _write_row;
|
|
||||||
} else if (c < 0) {
|
|
||||||
if (pWriter->bData.nRow > 0) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
|
|
||||||
pWriter->pDIter->dIter.iDataBlk++;
|
|
||||||
} else {
|
|
||||||
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pWriter->pDIter->dIter.iRow = 0;
|
|
||||||
pWriter->pDIter->dIter.iDataBlk++;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_write_row:
|
|
||||||
if (pRow) {
|
|
||||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
// switch to new table if need
|
|
||||||
if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) {
|
|
||||||
if (pWriter->tbid.uid != 0) {
|
|
||||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
// end with a NULL row
|
|
||||||
if (pRowInfo) {
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
|
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1849,21 +1849,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||||
taosArrayDestroy(pWriter->aDelIdxR);
|
taosArrayDestroy(pWriter->aDelIdxR);
|
||||||
|
|
||||||
// SNAP_DATA_TSDB
|
// SNAP_DATA_TSDB
|
||||||
|
tBlockDataDestroy(&pWriter->sData, 1);
|
||||||
// // Writer
|
|
||||||
// tBlockDataDestroy(&pWriter->dWriter.sData, 1);
|
|
||||||
// tBlockDataDestroy(&pWriter->dWriter.bData, 1);
|
|
||||||
// taosArrayDestroy(pWriter->dWriter.aSttBlk);
|
|
||||||
// tMapDataClear(&pWriter->dWriter.mDataBlk);
|
|
||||||
// taosArrayDestroy(pWriter->dWriter.aBlockIdx);
|
|
||||||
|
|
||||||
// // Reader
|
|
||||||
// tBlockDataDestroy(&pWriter->dReader.bData, 1);
|
|
||||||
// tMapDataClear(&pWriter->dReader.mDataBlk);
|
|
||||||
// taosArrayDestroy(pWriter->dReader.aBlockIdx);
|
|
||||||
|
|
||||||
tBlockDataDestroy(&pWriter->bData, 1);
|
tBlockDataDestroy(&pWriter->bData, 1);
|
||||||
|
taosArrayDestroy(pWriter->aSttBlk);
|
||||||
|
tMapDataClear(&pWriter->mDataBlk);
|
||||||
|
taosArrayDestroy(pWriter->aBlockIdx);
|
||||||
tDestroyTSchema(pWriter->skmTable.pTSchema);
|
tDestroyTSchema(pWriter->skmTable.pTSchema);
|
||||||
|
tBlockDataDestroy(&pWriter->inData, 1);
|
||||||
|
|
||||||
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
|
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
|
||||||
tFree(pWriter->aBuf[iBuf]);
|
tFree(pWriter->aBuf[iBuf]);
|
||||||
|
|
Loading…
Reference in New Issue