more code

This commit is contained in:
Hongze Cheng 2023-04-11 15:01:42 +08:00
parent a72633d715
commit 2f51c609f4
2 changed files with 164 additions and 74 deletions

View File

@ -249,14 +249,27 @@ static int32_t end_commit_file_set(SCommitter *pCommitter) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = tsdbSttFWriterClose( //
&pCommitter->pWriter, //
0, //
pFileOp), //
lino, //
_exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); tsdbError( //
"vgId:%d failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
lino, //
tstrerror(code));
} else { } else {
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid); tsdbDebug( //
"vgId:%d %s done, fid:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid);
} }
return code; return code;
} }
@ -284,7 +297,11 @@ static int32_t commit_next_file_set(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code)); tstrerror(code));
} }
return code; return code;

View File

@ -60,8 +60,8 @@ static int32_t write_timeseries_block(struct SSttFWriter *pWriter) {
} }
pSttBlk->suid = pBData->suid; pSttBlk->suid = pBData->suid;
pSttBlk->minUid = pBData->aUid[0]; pSttBlk->minUid = pBData->uid ? pBData->uid : pBData->aUid[0];
pSttBlk->maxUid = pBData->aUid[pBData->nRow - 1]; pSttBlk->maxUid = pBData->uid ? pBData->uid : pBData->aUid[pBData->nRow - 1];
pSttBlk->minKey = pSttBlk->maxKey = pBData->aTSKEY[0]; pSttBlk->minKey = pSttBlk->maxKey = pBData->aTSKEY[0];
pSttBlk->minVer = pSttBlk->maxVer = pBData->aVersion[0]; pSttBlk->minVer = pSttBlk->maxVer = pBData->aVersion[0];
pSttBlk->nRow = pBData->nRow; pSttBlk->nRow = pBData->nRow;
@ -72,7 +72,6 @@ static int32_t write_timeseries_block(struct SSttFWriter *pWriter) {
if (pSttBlk->maxVer < pBData->aVersion[iRow]) pSttBlk->maxVer = pBData->aVersion[iRow]; if (pSttBlk->maxVer < pBData->aVersion[iRow]) pSttBlk->maxVer = pBData->aVersion[iRow];
} }
// compress data block
TSDB_CHECK_CODE( // TSDB_CHECK_CODE( //
code = tCmprBlockData( // code = tCmprBlockData( //
pBData, // pBData, //
@ -178,6 +177,8 @@ static int32_t write_delete_block(struct SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
ASSERTS(0, "TODO: Not implemented yet");
SDelBlk *pDelBlk = taosArrayReserve(pWriter->aDelBlk, 1); SDelBlk *pDelBlk = taosArrayReserve(pWriter->aDelBlk, 1);
if (pDelBlk == NULL) { if (pDelBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -195,16 +196,16 @@ static int32_t write_delete_block(struct SSttFWriter *pWriter) {
if (pDelBlk->maxVer < pWriter->sData.aData[2][iRow]) pDelBlk->maxVer = pWriter->sData.aData[2][iRow]; if (pDelBlk->maxVer < pWriter->sData.aData[2][iRow]) pDelBlk->maxVer = pWriter->sData.aData[2][iRow];
} }
pDelBlk->dp.offset = pWriter->config.file.size; pDelBlk->dp.offset = pWriter->tFile.size;
pDelBlk->dp.size = 0; // TODO pDelBlk->dp.size = 0; // TODO
int64_t tsize = sizeof(int64_t) * pWriter->dData.nRow; int64_t tsize = sizeof(int64_t) * pWriter->dData.nRow;
for (int32_t i = 0; i < ARRAY_SIZE(pWriter->dData.aData); i++) { for (int32_t i = 0; i < ARRAY_SIZE(pWriter->dData.aData); i++) {
code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, (const uint8_t *)pWriter->dData.aData[i], tsize); code = tsdbWriteFile(pWriter->pFd, pWriter->tFile.size, (const uint8_t *)pWriter->dData.aData[i], tsize);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pDelBlk->dp.size += tsize; pDelBlk->dp.size += tsize;
pWriter->config.file.size += tsize; pWriter->tFile.size += tsize;
} }
tDelBlockDestroy(&pWriter->dData); tDelBlockDestroy(&pWriter->dData);
@ -223,20 +224,29 @@ static int32_t write_stt_blk(struct SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
pWriter->footer.dict[1].offset = pWriter->config.file.size; pWriter->footer.dict[1].offset = pWriter->tFile.size;
pWriter->footer.dict[1].size = sizeof(SSttBlk) * taosArrayGetSize(pWriter->aSttBlk); pWriter->footer.dict[1].size = sizeof(SSttBlk) * taosArrayGetSize(pWriter->aSttBlk);
if (pWriter->footer.dict[1].size) { if (pWriter->footer.dict[1].size) {
code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, TARRAY_DATA(pWriter->aSttBlk), TSDB_CHECK_CODE( //
pWriter->footer.dict[1].size); code = tsdbWriteFile( //
TSDB_CHECK_CODE(code, lino, _exit); pWriter->pFd, //
pWriter->tFile.size, //
TARRAY_DATA(pWriter->aSttBlk), //
pWriter->footer.dict[1].size), //
lino, //
_exit);
pWriter->config.file.size += pWriter->footer.dict[1].size; pWriter->tFile.size += pWriter->footer.dict[1].size;
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code)); tstrerror(code));
} }
return code; return code;
@ -246,20 +256,29 @@ static int32_t write_statistics_blk(struct SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
pWriter->footer.dict[2].offset = pWriter->config.file.size; pWriter->footer.dict[2].offset = pWriter->tFile.size;
pWriter->footer.dict[2].size = sizeof(STbStatisBlock) * taosArrayGetSize(pWriter->aStatisBlk); pWriter->footer.dict[2].size = sizeof(STbStatisBlock) * taosArrayGetSize(pWriter->aStatisBlk);
if (pWriter->footer.dict[2].size) { if (pWriter->footer.dict[2].size) {
code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, TARRAY_DATA(pWriter->aStatisBlk), TSDB_CHECK_CODE( //
pWriter->footer.dict[2].size); code = tsdbWriteFile( //
TSDB_CHECK_CODE(code, lino, _exit); pWriter->pFd, //
pWriter->tFile.size, //
TARRAY_DATA(pWriter->aStatisBlk), //
pWriter->footer.dict[2].size), //
lino, //
_exit);
pWriter->config.file.size += pWriter->footer.dict[2].size; pWriter->tFile.size += pWriter->footer.dict[2].size;
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code)); tstrerror(code));
} }
return code; return code;
@ -269,29 +288,41 @@ static int32_t write_del_blk(struct SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
pWriter->footer.dict[3].offset = pWriter->config.file.size; pWriter->footer.dict[3].offset = pWriter->tFile.size;
pWriter->footer.dict[3].size = sizeof(SDelBlk) * taosArrayGetSize(pWriter->aDelBlk); pWriter->footer.dict[3].size = sizeof(SDelBlk) * taosArrayGetSize(pWriter->aDelBlk);
if (pWriter->footer.dict[3].size) { if (pWriter->footer.dict[3].size) {
code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, TARRAY_DATA(pWriter->aDelBlk), TSDB_CHECK_CODE( //
pWriter->footer.dict[3].size); code = tsdbWriteFile( //
TSDB_CHECK_CODE(code, lino, _exit); pWriter->pFd, //
pWriter->tFile.size, //
TARRAY_DATA(pWriter->aDelBlk), //
pWriter->footer.dict[3].size), //
lino, //
_exit);
pWriter->config.file.size += pWriter->footer.dict[3].size; pWriter->tFile.size += pWriter->footer.dict[3].size;
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code)); tstrerror(code));
} }
return code; return code;
} }
static int32_t write_file_footer(struct SSttFWriter *pWriter) { static int32_t write_file_footer(struct SSttFWriter *pWriter) {
int32_t code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, (const uint8_t *)&pWriter->footer, int32_t code = tsdbWriteFile( //
pWriter->pFd, //
pWriter->tFile.size, //
(const uint8_t *)&pWriter->footer, //
sizeof(pWriter->footer)); sizeof(pWriter->footer));
pWriter->config.file.size += sizeof(pWriter->footer); pWriter->tFile.size += sizeof(pWriter->footer);
return code; return code;
} }
@ -350,18 +381,19 @@ _exit:
static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) { static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) {
if (pWriter) { if (pWriter) {
for (int32_t i = 0; ARRAY_SIZE(pWriter->aBuf); i++) tFree(pWriter->aBuf[i]); for (int32_t i = 0; i < ARRAY_SIZE(pWriter->aBuf); i++) {
tFree(pWriter->aBuf[i]);
}
tDestroyTSchema(pWriter->skmRow.pTSchema); tDestroyTSchema(pWriter->skmRow.pTSchema);
tDestroyTSchema(pWriter->skmTb.pTSchema); tDestroyTSchema(pWriter->skmTb.pTSchema);
// statistics data block
taosArrayDestroy(pWriter->aStatisBlk);
tTbStatisBlockDestroy(&pWriter->sData); tTbStatisBlockDestroy(&pWriter->sData);
// deleted data block
taosArrayDestroy(pWriter->aDelBlk);
tDelBlockDestroy(&pWriter->dData); tDelBlockDestroy(&pWriter->dData);
// time-series data block
taosArrayDestroy(pWriter->aSttBlk);
tBlockDataDestroy(&pWriter->bData); tBlockDataDestroy(&pWriter->bData);
taosArrayDestroy(pWriter->aStatisBlk);
taosArrayDestroy(pWriter->aDelBlk);
taosArrayDestroy(pWriter->aSttBlk);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
} }
return 0; return 0;
@ -453,45 +485,86 @@ int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
if (!abort) {
if (ppWriter[0]->bData.nRow > 0) { if (ppWriter[0]->bData.nRow > 0) {
code = write_timeseries_block(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_timeseries_block(ppWriter[0]), //
lino, //
_exit);
} }
if (ppWriter[0]->sData.nRow > 0) { if (ppWriter[0]->sData.nRow > 0) {
code = write_statistics_block(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_statistics_block(ppWriter[0]), //
lino, //
_exit);
} }
if (ppWriter[0]->dData.nRow > 0) { if (ppWriter[0]->dData.nRow > 0) {
code = write_delete_block(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_delete_block(ppWriter[0]), //
lino, //
_exit);
} }
code = write_stt_blk(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_stt_blk(ppWriter[0]), //
lino, //
_exit);
code = write_statistics_blk(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_statistics_blk(ppWriter[0]), //
lino, //
_exit);
code = write_del_blk(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_del_blk(ppWriter[0]), //
lino, //
_exit);
code = write_file_footer(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_file_footer(ppWriter[0]), //
lino, //
_exit);
code = write_file_header(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = write_file_header(ppWriter[0]), //
lino, //
_exit);
code = close_stt_fwriter(ppWriter[0]); TSDB_CHECK_CODE( //
TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFsyncFile(ppWriter[0]->pFd), //
lino, //
_exit);
if (op) {
op->oState = ppWriter[0]->config.file;
op->nState = ppWriter[0]->tFile;
if (op->oState.size == 0) {
op->op = TSDB_FOP_CREATE;
} else {
op->op = TSDB_FOP_EXTEND;
}
}
}
TSDB_CHECK_CODE( //
code = close_stt_fwriter(ppWriter[0]), //
lino, //
_exit);
destroy_stt_fwriter(ppWriter[0]); destroy_stt_fwriter(ppWriter[0]);
ppWriter[0] = NULL; ppWriter[0] = NULL;
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vgId, __func__, lino, tstrerror(code)); tsdbError( //
"vgId:%d %s failed at line %d since %s", //
vgId, //
__func__, //
lino, //
tstrerror(code));
} else {
} }
return code; return code;
} }