more code
This commit is contained in:
parent
de97c9253a
commit
94b97c4da4
|
@ -407,6 +407,7 @@ struct STsdbSnapWriter {
|
|||
int8_t cmprAlg;
|
||||
int64_t commitID;
|
||||
|
||||
uint8_t* aBuf[5];
|
||||
// for data file
|
||||
SBlockData bData;
|
||||
|
||||
|
@ -420,6 +421,7 @@ struct STsdbSnapWriter {
|
|||
SBlockData* pBlockData;
|
||||
int32_t iRow;
|
||||
SBlockData bDataR;
|
||||
SArray* aBlockL; // SArray<SBlockL>
|
||||
|
||||
SDataFWriter* pDataFWriter;
|
||||
SBlockIdx* pBlockIdxW; // NULL when no committing table
|
||||
|
@ -429,6 +431,7 @@ struct STsdbSnapWriter {
|
|||
|
||||
SMapData mBlockW; // SMapData<SBlock>
|
||||
SArray* aBlockIdxW; // SArray<SBlockIdx>
|
||||
SArray* aBlockLW; // SArray<SBlockL>
|
||||
|
||||
// for del file
|
||||
SDelFReader* pDelFReader;
|
||||
|
@ -816,9 +819,11 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
|||
|
||||
if (pWriter->pDataFWriter == NULL) goto _exit;
|
||||
|
||||
// finish current table
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
// move remain table
|
||||
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
|
||||
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
|
||||
if (code) goto _err;
|
||||
|
@ -826,8 +831,16 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
|||
pWriter->iBlockIdx++;
|
||||
}
|
||||
|
||||
// write remain stuff
|
||||
if (taosArrayGetSize(pWriter->aBlockLW) > 0) {
|
||||
code = tsdbWriteBlockL(pWriter->pDataFWriter, pWriter->aBlockIdxW);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pWriter->aBlockIdx) > 0) {
|
||||
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
|
||||
if (code) goto _err;
|
||||
|
@ -853,17 +866,20 @@ _err:
|
|||
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
|
||||
int64_t n;
|
||||
|
||||
// decode
|
||||
SBlockData* pBlockData = &pWriter->bData;
|
||||
// n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
|
||||
// ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
|
||||
code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData,
|
||||
pWriter->aBuf);
|
||||
if (code) goto _err;
|
||||
|
||||
// open file
|
||||
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
|
||||
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
|
||||
TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]};
|
||||
TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1],
|
||||
.ts = pBlockData->aTSKEY[pBlockData->nRow - 1]};
|
||||
|
||||
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
|
||||
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
|
||||
|
@ -882,9 +898,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
|
||||
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadBlockL(pWriter->pDataFReader, pWriter->aBlockL);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
ASSERT(pWriter->pDataFReader == NULL);
|
||||
taosArrayClear(pWriter->aBlockIdx);
|
||||
taosArrayClear(pWriter->aBlockL);
|
||||
}
|
||||
pWriter->iBlockIdx = 0;
|
||||
pWriter->pBlockIdx = NULL;
|
||||
|
@ -921,6 +941,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
if (code) goto _err;
|
||||
|
||||
taosArrayClear(pWriter->aBlockIdxW);
|
||||
taosArrayClear(pWriter->aBlockLW);
|
||||
tMapDataReset(&pWriter->mBlockW);
|
||||
pWriter->pBlockIdxW = NULL;
|
||||
tBlockDataReset(&pWriter->bDataW);
|
||||
|
@ -1110,6 +1131,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
|||
code = tBlockDataCreate(&pWriter->bDataR);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->aBlockL = taosArrayInit(0, sizeof(SBlockL));
|
||||
if (pWriter->aBlockL == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pWriter->aBlockIdxW == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1118,6 +1145,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
|||
code = tBlockDataCreate(&pWriter->bDataW);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->aBlockLW = taosArrayInit(0, sizeof(SBlockL));
|
||||
if (pWriter->aBlockLW == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// for del file
|
||||
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
|
||||
if (pWriter->aDelIdxR == NULL) {
|
||||
|
@ -1148,7 +1181,6 @@ _err:
|
|||
|
||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
STsdbSnapWriter* pWriter = *ppWriter;
|
||||
|
||||
if (rollback) {
|
||||
|
@ -1169,6 +1201,10 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
|
||||
tFree(pWriter->aBuf[iBuf]);
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
|
@ -1179,7 +1215,6 @@ _err:
|
|||
pWriter->pTsdb->path, tstrerror(code));
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
#endif
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue