more code
This commit is contained in:
parent
2f5eeb8000
commit
8d56306517
|
@ -1049,7 +1049,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
||||||
|
|
||||||
SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
|
SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
|
||||||
if (pNewBlockIdx == NULL) {
|
if (pNewBlockIdx == NULL) {
|
||||||
code == TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1091,9 +1091,11 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid};
|
if (pId) {
|
||||||
code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0);
|
TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid};
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -1206,8 +1208,8 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
if (pWriter->bData.nRow > 0) {
|
if (pWriter->bData.nRow > 0) {
|
||||||
if (pWriter->bData.nRow < pWriter->minRow) {
|
if (pWriter->bData.nRow < pWriter->minRow) {
|
||||||
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) {
|
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) {
|
||||||
code = tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow),
|
code =
|
||||||
pWriter->skmTable.pTSchema, pWriter->tbid.uid);
|
tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow), NULL, pWriter->tbid.uid);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (pWriter->sData.nRow >= pWriter->maxRow) {
|
if (pWriter->sData.nRow >= pWriter->maxRow) {
|
||||||
|
@ -1374,13 +1376,17 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
ASSERT(pWriter->pDataFWriter);
|
ASSERT(pWriter->pDataFWriter);
|
||||||
|
|
||||||
code = tsdbSnapWriteTableData(pWriter, NULL);
|
// consume remain data and end with a NULL table row
|
||||||
|
SRowInfo* pRowInfo;
|
||||||
|
code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
for (;;) {
|
||||||
|
code = tsdbSnapWriteTableData(pWriter, pRowInfo);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// TODO: ??
|
if (pRowInfo == NULL) break;
|
||||||
|
|
||||||
if (pWriter->sData.nRow) {
|
code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
|
||||||
code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1717,88 +1723,33 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// SNAP_DATA_TSDB
|
// SNAP_DATA_TSDB
|
||||||
#if 1
|
|
||||||
pWriter->fid = INT32_MIN;
|
|
||||||
|
|
||||||
code = tBlockDataCreate(&pWriter->inData);
|
code = tBlockDataCreate(&pWriter->inData);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tBlockDataCreate(&pWriter->bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
#else
|
|
||||||
code = tBlockDataCreate(&pWriter->bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pWriter->fid = INT32_MIN;
|
pWriter->fid = INT32_MIN;
|
||||||
pWriter->id = (TABLEID){0};
|
|
||||||
// Reader
|
code = tBlockDataCreate(&pWriter->bData);
|
||||||
pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
if (pWriter->dReader.aBlockIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
code = tBlockDataCreate(&pWriter->dReader.bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// Writer
|
code = tBlockDataCreate(&pWriter->sData);
|
||||||
pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
if (pWriter->dWriter.aBlockIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
|
||||||
if (pWriter->dWriter.aSttBlk == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
code = tBlockDataCreate(&pWriter->dWriter.bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
code = tBlockDataCreate(&pWriter->dWriter.sData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// SNAP_DATA_DEL
|
// SNAP_DATA_DEL
|
||||||
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
|
|
||||||
if (pWriter->aDelIdxR == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
|
|
||||||
if (pWriter->aDelData == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
|
|
||||||
if (pWriter->aDelIdxW == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
*ppWriter = NULL;
|
|
||||||
|
|
||||||
if (pWriter) {
|
if (pWriter) {
|
||||||
#if 0
|
tBlockDataDestroy(&pWriter->sData, 1);
|
||||||
if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW);
|
|
||||||
if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData);
|
|
||||||
if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR);
|
|
||||||
tBlockDataDestroy(&pWriter->dWriter.sData, 1);
|
|
||||||
tBlockDataDestroy(&pWriter->dWriter.bData, 1);
|
|
||||||
if (pWriter->dWriter.aSttBlk) taosArrayDestroy(pWriter->dWriter.aSttBlk);
|
|
||||||
if (pWriter->dWriter.aBlockIdx) taosArrayDestroy(pWriter->dWriter.aBlockIdx);
|
|
||||||
tBlockDataDestroy(&pWriter->dReader.bData, 1);
|
|
||||||
if (pWriter->dReader.aBlockIdx) taosArrayDestroy(pWriter->dReader.aBlockIdx);
|
|
||||||
tBlockDataDestroy(&pWriter->bData, 1);
|
tBlockDataDestroy(&pWriter->bData, 1);
|
||||||
|
tBlockDataDestroy(&pWriter->inData, 1);
|
||||||
tsdbFSDestroy(&pWriter->fs);
|
tsdbFSDestroy(&pWriter->fs);
|
||||||
taosMemoryFree(pWriter);
|
pWriter = NULL;
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__);
|
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
|
||||||
*ppWriter = pWriter;
|
|
||||||
}
|
}
|
||||||
|
*ppWriter = pWriter;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue