more tsdb snapshot
This commit is contained in:
parent
f20f97032b
commit
255f42e07f
|
@ -84,6 +84,8 @@ typedef struct SLDataIter SLDataIter;
|
|||
#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN})
|
||||
#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX})
|
||||
|
||||
#define TABLE_SAME_SCHEMA(SUID1, UID1, SUID2, UID2) ((SUID1) ? (SUID1) == (SUID2) : (UID1) == (UID2))
|
||||
|
||||
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
|
||||
#define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \
|
||||
((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE))
|
||||
|
|
|
@ -616,11 +616,12 @@ struct STsdbSnapWriter {
|
|||
int8_t cmprAlg;
|
||||
int64_t commitID;
|
||||
uint8_t* aBuf[5];
|
||||
|
||||
// for data file
|
||||
SBlockData bData;
|
||||
|
||||
int32_t fid;
|
||||
TABLEID id;
|
||||
int32_t fid;
|
||||
TABLEID id;
|
||||
SSkmInfo skmTable;
|
||||
struct {
|
||||
SDataFReader* pReader;
|
||||
SArray* aBlockIdx;
|
||||
|
@ -639,7 +640,6 @@ struct STsdbSnapWriter {
|
|||
SBlockData bData;
|
||||
SBlockData sData;
|
||||
} dWriter;
|
||||
SSkmInfo skmTable;
|
||||
|
||||
// for del file
|
||||
SDelFReader* pDelFReader;
|
||||
|
@ -651,448 +651,28 @@ struct STsdbSnapWriter {
|
|||
};
|
||||
|
||||
// SNAP_DATA_TSDB
|
||||
#if 0
|
||||
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(pWriter->pDataFWriter);
|
||||
|
||||
if (pWriter->pBlockIdxW == NULL) goto _exit;
|
||||
|
||||
// consume remain rows
|
||||
if (pWriter->pBlockData) {
|
||||
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
|
||||
while (pWriter->iRow < pWriter->pBlockData->nRow) {
|
||||
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL,
|
||||
0); // todo
|
||||
if (code) goto _err;
|
||||
|
||||
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
|
||||
// pWriter->blockW.last = 0;
|
||||
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||
// &pWriter->blockW, pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
tDataBlkReset(&pWriter->blockW);
|
||||
tBlockDataClear(&pWriter->bDataW);
|
||||
}
|
||||
|
||||
pWriter->iRow++;
|
||||
}
|
||||
}
|
||||
|
||||
// write remain data if has
|
||||
if (pWriter->bDataW.nRow > 0) {
|
||||
// pWriter->blockW.last = 0;
|
||||
if (pWriter->bDataW.nRow < pWriter->minRow) {
|
||||
if (pWriter->iBlock > pWriter->mBlock.nItem) {
|
||||
// pWriter->blockW.last = 1;
|
||||
}
|
||||
}
|
||||
|
||||
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||
// &pWriter->blockW, pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
|
||||
|
||||
SDataBlk block;
|
||||
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk);
|
||||
|
||||
// if (block.last) {
|
||||
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
|
||||
// if (code) goto _err;
|
||||
|
||||
// tBlockReset(&block);
|
||||
// block.last = 1;
|
||||
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
|
||||
// pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
// }
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iBlock++;
|
||||
}
|
||||
|
||||
// SDataBlk
|
||||
// code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
|
||||
// if (code) goto _err;
|
||||
|
||||
// SBlockIdx
|
||||
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
tsdbInfo("vgId:%d, tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||
pWriter->pTsdb->path, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
|
||||
int32_t code = 0;
|
||||
|
||||
code = tsdbReadDataBlk(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
// SBlockData
|
||||
SDataBlk block;
|
||||
tMapDataReset(&pWriter->mBlockW);
|
||||
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
|
||||
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk);
|
||||
|
||||
// if (block.last) {
|
||||
// code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
|
||||
// if (code) goto _err;
|
||||
|
||||
// tBlockReset(&block);
|
||||
// block.last = 1;
|
||||
// code =
|
||||
// tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
|
||||
// pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
// }
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// SDataBlk
|
||||
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
|
||||
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
// SBlockIdx
|
||||
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||
pWriter->pTsdb->path, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
SBlockData* pBlockData = &pWriter->bData;
|
||||
int32_t iRow = 0;
|
||||
TSDBROW row;
|
||||
TSDBROW* pRow = &row;
|
||||
|
||||
// // correct schema
|
||||
// code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
|
||||
// if (code) goto _err;
|
||||
|
||||
// loop to merge
|
||||
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
|
||||
while (true) {
|
||||
if (pRow == NULL) break;
|
||||
|
||||
if (pWriter->pBlockData) {
|
||||
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
|
||||
|
||||
int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow));
|
||||
|
||||
ASSERT(c);
|
||||
|
||||
if (c < 0) {
|
||||
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
|
||||
// if (code) goto _err;
|
||||
|
||||
iRow++;
|
||||
if (iRow < pWriter->pBlockData->nRow) {
|
||||
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
|
||||
} else {
|
||||
pRow = NULL;
|
||||
}
|
||||
} else if (c > 0) {
|
||||
// code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow),
|
||||
// NULL); if (code) goto _err;
|
||||
|
||||
pWriter->iRow++;
|
||||
if (pWriter->iRow >= pWriter->pBlockData->nRow) {
|
||||
pWriter->pBlockData = NULL;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||
|
||||
while (true) {
|
||||
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
|
||||
|
||||
SDataBlk block;
|
||||
int32_t c;
|
||||
|
||||
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk);
|
||||
|
||||
// if (block.last) {
|
||||
// pWriter->pBlockData = &pWriter->bDataR;
|
||||
|
||||
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
|
||||
// NULL); if (code) goto _err; pWriter->iRow = 0;
|
||||
|
||||
// pWriter->iBlock++;
|
||||
// break;
|
||||
// }
|
||||
|
||||
c = tsdbKeyCmprFn(&block.maxKey, &key);
|
||||
|
||||
ASSERT(c);
|
||||
|
||||
if (c < 0) {
|
||||
if (pWriter->bDataW.nRow) {
|
||||
// pWriter->blockW.last = 0;
|
||||
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||
// &pWriter->blockW, pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
tDataBlkReset(&pWriter->blockW);
|
||||
tBlockDataClear(&pWriter->bDataW);
|
||||
}
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iBlock++;
|
||||
} else {
|
||||
c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey);
|
||||
|
||||
ASSERT(c);
|
||||
|
||||
if (c > 0) {
|
||||
pWriter->pBlockData = &pWriter->bDataR;
|
||||
// code =
|
||||
// tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
|
||||
// NULL);
|
||||
// if (code) goto _err;
|
||||
pWriter->iRow = 0;
|
||||
|
||||
pWriter->iBlock++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pWriter->pBlockData) continue;
|
||||
|
||||
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
|
||||
// if (code) goto _err;
|
||||
|
||||
iRow++;
|
||||
if (iRow < pBlockData->nRow) {
|
||||
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
|
||||
} else {
|
||||
pRow = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_check_write:
|
||||
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
|
||||
|
||||
_write_block:
|
||||
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||
// &pWriter->blockW, pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
|
||||
tDataBlkReset(&pWriter->blockW);
|
||||
tBlockDataClear(&pWriter->bDataW);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||
pWriter->pTsdb->path, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
|
||||
int32_t code = 0;
|
||||
SBlockData* pBlockData = &pWriter->bData;
|
||||
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
|
||||
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
|
||||
|
||||
// end last table write if should
|
||||
if (pWriter->pBlockIdxW) {
|
||||
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
|
||||
if (c < 0) {
|
||||
// end
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
// reset
|
||||
pWriter->pBlockIdxW = NULL;
|
||||
} else if (c > 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
// start new table data write if need
|
||||
if (pWriter->pBlockIdxW == NULL) {
|
||||
// write table data ahead
|
||||
while (true) {
|
||||
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break;
|
||||
|
||||
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
|
||||
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
|
||||
|
||||
if (c >= 0) break;
|
||||
|
||||
code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iBlockIdx++;
|
||||
}
|
||||
|
||||
// reader
|
||||
pWriter->pBlockIdx = NULL;
|
||||
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
|
||||
ASSERT(pWriter->pDataFReader);
|
||||
|
||||
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
|
||||
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
|
||||
|
||||
ASSERT(c >= 0);
|
||||
|
||||
if (c == 0) {
|
||||
pWriter->pBlockIdx = pBlockIdx;
|
||||
pWriter->iBlockIdx++;
|
||||
}
|
||||
}
|
||||
|
||||
if (pWriter->pBlockIdx) {
|
||||
code = tsdbReadDataBlk(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
tMapDataReset(&pWriter->mBlock);
|
||||
}
|
||||
pWriter->iBlock = 0;
|
||||
pWriter->pBlockData = NULL;
|
||||
pWriter->iRow = 0;
|
||||
|
||||
// writer
|
||||
pWriter->pBlockIdxW = &pWriter->blockIdxW;
|
||||
pWriter->pBlockIdxW->suid = id.suid;
|
||||
pWriter->pBlockIdxW->uid = id.uid;
|
||||
|
||||
tDataBlkReset(&pWriter->blockW);
|
||||
tBlockDataReset(&pWriter->bDataW);
|
||||
tMapDataReset(&pWriter->mBlockW);
|
||||
}
|
||||
|
||||
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid);
|
||||
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid));
|
||||
|
||||
code = tsdbSnapWriteTableDataImpl(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
_exit:
|
||||
tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||
pWriter->pTsdb->path);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
||||
pWriter->pTsdb->path, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
if (pWriter->pDataFWriter == NULL) goto _exit;
|
||||
|
||||
// finish current table
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
// move remain table
|
||||
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
|
||||
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iBlockIdx++;
|
||||
}
|
||||
|
||||
// write remain stuff
|
||||
if (taosArrayGetSize(pWriter->aBlockLW) > 0) {
|
||||
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pWriter->aBlockIdx) > 0) {
|
||||
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pWriter->pDataFReader) {
|
||||
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
|
||||
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
|
||||
|
||||
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
|
||||
pWriter->dReader.iBlockIdx++;
|
||||
ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);
|
||||
|
||||
if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
|
||||
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
|
||||
|
||||
code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
|
||||
if (code) goto _exit;
|
||||
|
||||
pWriter->dReader.iDataBlk = -1;
|
||||
tBlockDataReset(&pWriter->dReader.bData);
|
||||
pWriter->dReader.iRow = 0;
|
||||
pWriter->dReader.iBlockIdx++;
|
||||
} else {
|
||||
pWriter->dReader.pBlockIdx = NULL;
|
||||
tMapDataReset(&pWriter->dReader.mDataBlk);
|
||||
}
|
||||
pWriter->dReader.iDataBlk = 0; // point to the next one
|
||||
tBlockDataReset(&pWriter->dReader.bData);
|
||||
pWriter->dReader.iRow = 0;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
@ -1134,10 +714,6 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
|||
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
|
||||
if (code) goto _err;
|
||||
|
||||
// Reader (todo)
|
||||
ASSERT(pWriter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0);
|
||||
|
||||
// Writer
|
||||
tMapDataReset(&pWriter->dWriter.mDataBlk);
|
||||
code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema);
|
||||
if (code) goto _err;
|
||||
|
@ -1145,6 +721,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1153,22 +730,28 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
|||
|
||||
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;
|
||||
|
||||
if (pWriter->dReader.pBlockIdx && pWriter->dReader.pBlockIdx->suid == pWriter->id.suid &&
|
||||
pWriter->dReader.pBlockIdx->uid == pWriter->id.uid) {
|
||||
int32_t c = 1;
|
||||
if (pWriter->dReader.pBlockIdx) {
|
||||
c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id);
|
||||
ASSERT(c >= 0);
|
||||
}
|
||||
|
||||
if (c == 0) {
|
||||
SBlockData* pBData = &pWriter->dWriter.bData;
|
||||
|
||||
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
|
||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
|
||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->id.uid);
|
||||
|
||||
code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
pWriter->cmprAlg);
|
||||
if (pBData->nRow >= pWriter->maxRow) {
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
pWriter->cmprAlg);
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
||||
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
|
||||
|
@ -1183,14 +766,9 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
// pWriter->cmprAlg);
|
||||
// if (code) goto _err;
|
||||
|
||||
if (pWriter->dWriter.mDataBlk.nItem) {
|
||||
SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
|
||||
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1213,11 +791,11 @@ static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
|
|||
|
||||
ASSERT(pWriter->dWriter.pWriter == NULL);
|
||||
|
||||
// open new
|
||||
pWriter->fid = fid;
|
||||
pWriter->id = (TABLEID){0};
|
||||
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
|
||||
|
||||
// open reader
|
||||
// Reader
|
||||
if (pSet) {
|
||||
code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet);
|
||||
if (code) goto _err;
|
||||
|
@ -1225,10 +803,14 @@ static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
|
|||
code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
// TODO
|
||||
ASSERT(pWriter->dReader.pReader == NULL);
|
||||
taosArrayClear(pWriter->dReader.aBlockIdx);
|
||||
}
|
||||
pWriter->dReader.iBlockIdx = 0; // point to the next one
|
||||
code = tsdbSnapNextTableData(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
// open writer
|
||||
// Writer
|
||||
SHeadFile fHead = {.commitID = pWriter->commitID};
|
||||
SDataFile fData = {.commitID = pWriter->commitID};
|
||||
SSmaFile fSma = {.commitID = pWriter->commitID};
|
||||
|
@ -1250,11 +832,14 @@ static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
|
|||
wSet.nSttF = 1;
|
||||
}
|
||||
wSet.aSttF[wSet.nSttF - 1] = &fStt;
|
||||
|
||||
code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet);
|
||||
if (code) goto _err;
|
||||
taosArrayClear(pWriter->dWriter.aBlockIdx);
|
||||
tMapDataReset(&pWriter->dWriter.mDataBlk);
|
||||
taosArrayClear(pWriter->dWriter.aSttBlk);
|
||||
tBlockDataReset(&pWriter->dWriter.bData);
|
||||
tBlockDataReset(&pWriter->dWriter.sData);
|
||||
|
||||
return code;
|
||||
|
||||
|
@ -1307,21 +892,24 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
|
||||
static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
|
||||
int32_t code = 0;
|
||||
|
||||
SBlockData* pBlockData = &pWriter->bData;
|
||||
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
|
||||
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
|
||||
SBlockData* pBData = &pWriter->bData;
|
||||
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
|
||||
TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
|
||||
TSDBKEY key = TSDBROW_KEY(&row);
|
||||
|
||||
if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
|
||||
_merge_block:
|
||||
// merge with data block in row
|
||||
*done = 0;
|
||||
while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow ||
|
||||
pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) {
|
||||
// Merge row by row
|
||||
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
|
||||
TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
|
||||
TSDBKEY tKey = TSDBROW_KEY(&trow);
|
||||
|
||||
ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid);
|
||||
|
||||
int32_t c = tsdbKeyCmprFn(&key, &tKey);
|
||||
if (c < 0) {
|
||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
|
||||
|
@ -1345,19 +933,21 @@ static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iR
|
|||
}
|
||||
}
|
||||
|
||||
// merge with dataBlk in whole
|
||||
// Merge row by block
|
||||
SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
|
||||
for (pWriter->dReader.iBlockIdx++; pWriter->dReader.iBlockIdx < pWriter->dReader.mDataBlk.nItem;
|
||||
pWriter->dReader.iBlockIdx++) {
|
||||
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iBlockIdx, &dataBlk, tGetDataBlk);
|
||||
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
|
||||
|
||||
int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
|
||||
|
||||
if (c < 0) {
|
||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
||||
pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
|
||||
if (code) goto _err;
|
||||
} else if (c < 0) {
|
||||
} else if (c > 0) {
|
||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -1372,35 +962,53 @@ static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iR
|
|||
} else {
|
||||
code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
|
||||
if (code) goto _err;
|
||||
pWriter->dReader.iRow = 0;
|
||||
|
||||
goto _merge_block;
|
||||
pWriter->dReader.iDataBlk++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbSnapNextTableData(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
|
||||
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
SBlockData* pBlockData = &pWriter->bData;
|
||||
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
|
||||
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
|
||||
TABLEID id = {.suid = pWriter->bData.suid,
|
||||
.uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
|
||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow);
|
||||
SBlockData* pBData = &pWriter->dWriter.sData;
|
||||
|
||||
code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid);
|
||||
if (pBData->suid || pBData->uid) {
|
||||
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
|
||||
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
||||
pBData->suid = 0;
|
||||
pBData->uid = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBData->suid == 0 && pBData->uid == 0) {
|
||||
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pWriter->dWriter.sData.nRow >= pWriter->maxRow) {
|
||||
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk,
|
||||
pWriter->cmprAlg);
|
||||
if (pBData->nRow >= pWriter->maxRow) {
|
||||
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
@ -1418,7 +1026,7 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
|
|||
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
|
||||
|
||||
// End last table data write if need
|
||||
if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) {
|
||||
if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) {
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
@ -1431,18 +1039,22 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
|
|||
|
||||
// Merge with .data file data
|
||||
int8_t done = 0;
|
||||
code = tsdbSnapWriteWriteToDataFile(pWriter, iRow, &done);
|
||||
if (code) goto _err;
|
||||
if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
|
||||
code = tsdbSnapWriteToDataFile(pWriter, iRow, &done);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// Append to the .stt data block (todo: check if need to set/reload sst block)
|
||||
if (!done) {
|
||||
code = tsdbSnapWriteWriteToSttFile(pWriter, iRow);
|
||||
code = tsdbSnapWriteToSttFile(pWriter, iRow);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1456,6 +1068,8 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf);
|
||||
if (code) goto _err;
|
||||
|
||||
ASSERT(pBlockData->nRow > 0);
|
||||
|
||||
// Loop to handle each row
|
||||
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
||||
TSKEY ts = pBlockData->aTSKEY[iRow];
|
||||
|
@ -1655,39 +1269,36 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
|||
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||
pWriter->commitID = pTsdb->pVnode->state.commitID;
|
||||
|
||||
// for data file
|
||||
// SNAP_DATA_TSDB
|
||||
code = tBlockDataCreate(&pWriter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
#if 0
|
||||
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pWriter->aBlockIdx == NULL) {
|
||||
pWriter->fid = INT32_MIN;
|
||||
pWriter->id = (TABLEID){0};
|
||||
// Reader
|
||||
pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pWriter->dReader.aBlockIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
code = tBlockDataCreate(&pWriter->bDataR);
|
||||
code = tBlockDataCreate(&pWriter->dReader.bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (pWriter->aSstBlk == NULL) {
|
||||
// Writer
|
||||
pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pWriter->dWriter.aBlockIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pWriter->aBlockIdxW == NULL) {
|
||||
pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (pWriter->dWriter.aSttBlk == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
code = tBlockDataCreate(&pWriter->bDataW);
|
||||
code = tBlockDataCreate(&pWriter->dWriter.bData);
|
||||
if (code) goto _err;
|
||||
code = tBlockDataCreate(&pWriter->dWriter.sData);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (pWriter->aBlockLW == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
// SNAP_DATA_DEL
|
||||
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
|
||||
|
@ -1721,14 +1332,17 @@ _err:
|
|||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
STsdbSnapWriter* pWriter = *ppWriter;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
if (rollback) {
|
||||
ASSERT(0);
|
||||
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
|
||||
// if (code) goto _err;
|
||||
} else {
|
||||
// code = tsdbSnapWriteDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbSnapWriteDelEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
@ -1736,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
|||
code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _err;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _err;
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
}
|
||||
|
||||
// SNAP_DATA_DEL
|
||||
taosArrayDestroy(pWriter->aDelIdxW);
|
||||
taosArrayDestroy(pWriter->aDelData);
|
||||
taosArrayDestroy(pWriter->aDelIdxR);
|
||||
|
||||
// SNAP_DATA_TSDB
|
||||
|
||||
// Writer
|
||||
tBlockDataDestroy(&pWriter->dWriter.sData, 1);
|
||||
tBlockDataDestroy(&pWriter->dWriter.bData, 1);
|
||||
taosArrayDestroy(pWriter->dWriter.aSttBlk);
|
||||
tMapDataClear(&pWriter->dWriter.mDataBlk);
|
||||
taosArrayDestroy(pWriter->dWriter.aBlockIdx);
|
||||
|
||||
// Reader
|
||||
tBlockDataDestroy(&pWriter->dReader.bData, 1);
|
||||
tMapDataClear(&pWriter->dReader.mDataBlk);
|
||||
taosArrayDestroy(pWriter->dReader.aBlockIdx);
|
||||
|
||||
tBlockDataDestroy(&pWriter->bData, 1);
|
||||
tTSchemaDestroy(pWriter->skmTable.pTSchema);
|
||||
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
|
||||
tFree(pWriter->aBuf[iBuf]);
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
|
@ -1769,7 +1413,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
|||
goto _exit;
|
||||
} else {
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
// code = tsdbSnapWriteDataEnd(pWriter);
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue