more code
This commit is contained in:
parent
b8578e242e
commit
a902d412eb
|
@ -1030,6 +1030,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
|||
if (pWriter->pDIter) {
|
||||
STsdbDataIter2* pIter = pWriter->pDIter;
|
||||
|
||||
// assert last table data end
|
||||
ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow);
|
||||
ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem);
|
||||
|
||||
|
@ -1095,8 +1096,8 @@ _exit:
|
|||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid,
|
||||
pId->uid);
|
||||
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
|
||||
pWriter->tbid.suid, pWriter->tbid.uid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1388,24 +1389,21 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// TODO: take pRow == NULL into account
|
||||
TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX;
|
||||
|
||||
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
|
||||
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
goto _write_row;
|
||||
} else {
|
||||
for (;;) {
|
||||
while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
|
||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);
|
||||
|
||||
int32_t c = tsdbRowCmprFn(pRow, &row);
|
||||
int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row));
|
||||
if (c < 0) {
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
goto _exit;
|
||||
goto _write_row;
|
||||
} else if (c > 0) {
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, &row);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pWriter->pDIter->dIter.iRow++;
|
||||
|
@ -1415,21 +1413,15 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
|||
}
|
||||
|
||||
for (;;) {
|
||||
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) {
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
goto _exit;
|
||||
}
|
||||
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row;
|
||||
|
||||
// FIXME: Here can be slow, use array instead
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
||||
|
||||
int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)});
|
||||
int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey});
|
||||
if (c > 0) {
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
goto _exit;
|
||||
goto _write_row;
|
||||
} else if (c < 0) {
|
||||
if (pWriter->bData.nRow > 0) {
|
||||
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||
|
@ -1450,6 +1442,12 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
|||
}
|
||||
}
|
||||
|
||||
_write_row:
|
||||
if (pRow) {
|
||||
code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
|
@ -1510,7 +1508,7 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHd
|
|||
SRowInfo* pRowInfo;
|
||||
code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; ++iRow) {
|
||||
for (int32_t iRow = 0; iRow < pWriter->inData.nRow; ++iRow) {
|
||||
SRowInfo rInfo = {.suid = pWriter->inData.suid,
|
||||
.uid = pWriter->inData.uid ? pWriter->inData.uid : pWriter->inData.aUid[iRow],
|
||||
.row = tsdbRowFromBlockData(&pWriter->inData, iRow)};
|
||||
|
|
|
@ -757,7 +757,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
|||
|
||||
pTColVal->value.nData = pColVal->value.nData;
|
||||
if (pTColVal->value.nData) {
|
||||
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
|
||||
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
|
||||
}
|
||||
pTColVal->flag = 0;
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue