make it compile
This commit is contained in:
parent
98bc07bb2a
commit
de97c9253a
|
@ -439,26 +439,6 @@ struct STsdbSnapWriter {
|
||||||
SArray* aDelIdxW;
|
SArray* aDelIdxW;
|
||||||
};
|
};
|
||||||
|
|
||||||
#if 0
|
|
||||||
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t iRow = 0; // todo
|
|
||||||
int32_t nRow = 0; // todo
|
|
||||||
SBlockData* pBlockData = NULL; // todo
|
|
||||||
|
|
||||||
while (iRow < nRow) {
|
|
||||||
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d, tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
|
|
||||||
pWriter->pTsdb->path, tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -470,13 +450,14 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
if (pWriter->pBlockData) {
|
if (pWriter->pBlockData) {
|
||||||
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
|
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
|
||||||
while (pWriter->iRow < pWriter->pBlockData->nRow) {
|
while (pWriter->iRow < pWriter->pBlockData->nRow) {
|
||||||
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
|
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL,
|
||||||
|
0); // todo
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
|
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
|
||||||
pWriter->blockW.last = 0;
|
// pWriter->blockW.last = 0;
|
||||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||||
&pWriter->blockW, pWriter->cmprAlg);
|
// &pWriter->blockW, pWriter->cmprAlg);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
||||||
|
@ -492,16 +473,16 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
// write remain data if has
|
// write remain data if has
|
||||||
if (pWriter->bDataW.nRow > 0) {
|
if (pWriter->bDataW.nRow > 0) {
|
||||||
pWriter->blockW.last = 0;
|
// pWriter->blockW.last = 0;
|
||||||
if (pWriter->bDataW.nRow < pWriter->minRow) {
|
if (pWriter->bDataW.nRow < pWriter->minRow) {
|
||||||
if (pWriter->iBlock > pWriter->mBlock.nItem) {
|
if (pWriter->iBlock > pWriter->mBlock.nItem) {
|
||||||
pWriter->blockW.last = 1;
|
// pWriter->blockW.last = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||||
&pWriter->blockW, pWriter->cmprAlg);
|
// &pWriter->blockW, pWriter->cmprAlg);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -513,16 +494,16 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
SBlock block;
|
SBlock block;
|
||||||
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
|
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
|
||||||
|
|
||||||
if (block.last) {
|
// if (block.last) {
|
||||||
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
|
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
tBlockReset(&block);
|
// tBlockReset(&block);
|
||||||
block.last = 1;
|
// block.last = 1;
|
||||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
|
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
|
||||||
pWriter->cmprAlg);
|
// pWriter->cmprAlg);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
}
|
// }
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
|
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -531,8 +512,8 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SBlock
|
// SBlock
|
||||||
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
|
// code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
// SBlockIdx
|
// SBlockIdx
|
||||||
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
|
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
|
||||||
|
@ -562,16 +543,17 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
|
||||||
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
|
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
|
||||||
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
|
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
|
||||||
|
|
||||||
if (block.last) {
|
// if (block.last) {
|
||||||
code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
|
// code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
tBlockReset(&block);
|
// tBlockReset(&block);
|
||||||
block.last = 1;
|
// block.last = 1;
|
||||||
code =
|
// code =
|
||||||
tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg);
|
// tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
|
||||||
if (code) goto _err;
|
// pWriter->cmprAlg);
|
||||||
}
|
// if (code) goto _err;
|
||||||
|
// }
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
|
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -579,7 +561,7 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
|
||||||
|
|
||||||
// SBlock
|
// SBlock
|
||||||
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
|
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
|
||||||
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx);
|
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// SBlockIdx
|
// SBlockIdx
|
||||||
|
@ -604,9 +586,9 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
TSDBROW row;
|
TSDBROW row;
|
||||||
TSDBROW* pRow = &row;
|
TSDBROW* pRow = &row;
|
||||||
|
|
||||||
// correct schema
|
// // correct schema
|
||||||
code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
|
// code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
// loop to merge
|
// loop to merge
|
||||||
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
|
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
|
||||||
|
@ -621,8 +603,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
ASSERT(c);
|
ASSERT(c);
|
||||||
|
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
|
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
iRow++;
|
iRow++;
|
||||||
if (iRow < pWriter->pBlockData->nRow) {
|
if (iRow < pWriter->pBlockData->nRow) {
|
||||||
|
@ -631,8 +613,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
pRow = NULL;
|
pRow = NULL;
|
||||||
}
|
}
|
||||||
} else if (c > 0) {
|
} else if (c > 0) {
|
||||||
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
|
// code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow),
|
||||||
if (code) goto _err;
|
// NULL); if (code) goto _err;
|
||||||
|
|
||||||
pWriter->iRow++;
|
pWriter->iRow++;
|
||||||
if (pWriter->iRow >= pWriter->pBlockData->nRow) {
|
if (pWriter->iRow >= pWriter->pBlockData->nRow) {
|
||||||
|
@ -650,16 +632,15 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
|
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
|
||||||
|
|
||||||
if (block.last) {
|
// if (block.last) {
|
||||||
pWriter->pBlockData = &pWriter->bDataR;
|
// pWriter->pBlockData = &pWriter->bDataR;
|
||||||
|
|
||||||
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
|
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
|
||||||
if (code) goto _err;
|
// NULL); if (code) goto _err; pWriter->iRow = 0;
|
||||||
pWriter->iRow = 0;
|
|
||||||
|
|
||||||
pWriter->iBlock++;
|
// pWriter->iBlock++;
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
|
|
||||||
c = tsdbKeyCmprFn(&block.maxKey, &key);
|
c = tsdbKeyCmprFn(&block.maxKey, &key);
|
||||||
|
|
||||||
|
@ -667,10 +648,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
if (pWriter->bDataW.nRow) {
|
if (pWriter->bDataW.nRow) {
|
||||||
pWriter->blockW.last = 0;
|
// pWriter->blockW.last = 0;
|
||||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||||
&pWriter->blockW, pWriter->cmprAlg);
|
// &pWriter->blockW, pWriter->cmprAlg);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -690,9 +671,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
pWriter->pBlockData = &pWriter->bDataR;
|
pWriter->pBlockData = &pWriter->bDataR;
|
||||||
code =
|
// code =
|
||||||
tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
|
// tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
|
||||||
if (code) goto _err;
|
// NULL);
|
||||||
|
// if (code) goto _err;
|
||||||
pWriter->iRow = 0;
|
pWriter->iRow = 0;
|
||||||
|
|
||||||
pWriter->iBlock++;
|
pWriter->iBlock++;
|
||||||
|
@ -703,8 +685,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
if (pWriter->pBlockData) continue;
|
if (pWriter->pBlockData) continue;
|
||||||
|
|
||||||
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
|
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
iRow++;
|
iRow++;
|
||||||
if (iRow < pBlockData->nRow) {
|
if (iRow < pBlockData->nRow) {
|
||||||
|
@ -718,9 +700,9 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
||||||
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
|
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
|
||||||
|
|
||||||
_write_block:
|
_write_block:
|
||||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||||
&pWriter->blockW, pWriter->cmprAlg);
|
// &pWriter->blockW, pWriter->cmprAlg);
|
||||||
if (code) goto _err;
|
// if (code) goto _err;
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -876,8 +858,8 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
SBlockData* pBlockData = &pWriter->bData;
|
SBlockData* pBlockData = &pWriter->bData;
|
||||||
n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
|
// n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
|
||||||
ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
|
// ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
|
||||||
|
|
||||||
// open file
|
// open file
|
||||||
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
|
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
|
||||||
|
@ -922,16 +904,16 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
if (pSet) {
|
if (pSet) {
|
||||||
wSet.diskId = pSet->diskId;
|
wSet.diskId = pSet->diskId;
|
||||||
wSet.fid = fid;
|
wSet.fid = fid;
|
||||||
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0};
|
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .offset = 0};
|
||||||
fData = *pSet->pDataF;
|
fData = *pSet->pDataF;
|
||||||
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
|
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
|
||||||
fSma = *pSet->pSmaF;
|
fSma = *pSet->pSmaF;
|
||||||
} else {
|
} else {
|
||||||
wSet.diskId = (SDiskID){.level = 0, .id = 0};
|
wSet.diskId = (SDiskID){.level = 0, .id = 0};
|
||||||
wSet.fid = fid;
|
wSet.fid = fid;
|
||||||
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0};
|
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
||||||
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
|
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
|
||||||
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
|
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
|
||||||
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
|
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -956,7 +938,6 @@ _err:
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1206,7 +1187,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||||
|
|
||||||
#if 0
|
|
||||||
// ts data
|
// ts data
|
||||||
if (pHdr->type == SNAP_DATA_TSDB) {
|
if (pHdr->type == SNAP_DATA_TSDB) {
|
||||||
code = tsdbSnapWriteData(pWriter, pData, nData);
|
code = tsdbSnapWriteData(pWriter, pData, nData);
|
||||||
|
@ -1219,7 +1199,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
// del data
|
// del data
|
||||||
if (pHdr->type == SNAP_DATA_DEL) {
|
if (pHdr->type == SNAP_DATA_DEL) {
|
||||||
|
|
Loading…
Reference in New Issue