more code
This commit is contained in:
parent
65f8c4c535
commit
8ff1eb41d1
|
@ -698,6 +698,7 @@ struct SDiskDataBuilder {
|
||||||
int32_t nBuilder;
|
int32_t nBuilder;
|
||||||
SArray *aBuilder; // SArray<SDiskColBuilder>
|
SArray *aBuilder; // SArray<SDiskColBuilder>
|
||||||
uint8_t *aBuf[2];
|
uint8_t *aBuf[2];
|
||||||
|
SDiskData dd;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
|
|
|
@ -649,6 +649,45 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (pBuilder->nRow == 0) return code;
|
||||||
|
|
||||||
|
SSttBlk sttBlk = {.suid = pBuilder->suid,
|
||||||
|
.minUid = 0, // todo
|
||||||
|
.maxUid = 0, // todo
|
||||||
|
.minKey = 0, // todo
|
||||||
|
.maxKey = 0, // todo
|
||||||
|
.minVer = 0, // todo
|
||||||
|
.maxVer = 0, // todo
|
||||||
|
.nRow = pBuilder->nRow};
|
||||||
|
|
||||||
|
// gnrt
|
||||||
|
code = tGnrtDiskData(pBuilder, &pBuilder->dd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
// write
|
||||||
|
// code = tsdbWriteDiskData(pWriter, &pBuilder->dd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
// push
|
||||||
|
if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear (todo)
|
||||||
|
// tDiskDataBuilderClear(pBuilder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1315,24 +1354,22 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
|
static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
|
SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
|
||||||
if (pBuilder->suid || pBuilder->uid) {
|
if (pBuilder->suid || pBuilder->uid) {
|
||||||
if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
|
if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
|
||||||
// code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk,
|
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk);
|
||||||
// pCommitter->cmprAlg); // todo
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
// tBlockDataReset(pBDatal);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pBuilder->suid && !pBuilder->uid) {
|
if (!pBuilder->suid && !pBuilder->uid) {
|
||||||
ASSERT(pCommitter->skmTable.suid == id.suid);
|
ASSERT(pCommitter->skmTable.suid == id.suid);
|
||||||
ASSERT(pCommitter->skmTable.uid == id.uid);
|
ASSERT(pCommitter->skmTable.uid == id.uid);
|
||||||
// code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); todo
|
code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1351,7 +1388,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
|
||||||
SBlockData *pBData = &pCommitter->dWriter.bData;
|
SBlockData *pBData = &pCommitter->dWriter.bData;
|
||||||
|
|
||||||
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
|
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
|
||||||
code = tsdbInitLastBlockIfNeed(pCommitter, id);
|
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
|
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
|
||||||
|
@ -1361,8 +1398,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
|
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
|
||||||
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, NULL /*TODO */, pCommitter->dWriter.aSttBlk,
|
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
|
||||||
pCommitter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1409,8 +1445,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
|
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
|
||||||
// code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk,
|
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
|
||||||
// pCommitter->cmprAlg); (todo)
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1515,8 +1550,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
||||||
code = tsdbMoveCommitData(pCommitter, id);
|
code = tsdbMoveCommitData(pCommitter, id);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
|
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
|
||||||
// pCommitter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
Loading…
Reference in New Issue