more code
This commit is contained in:
parent
90b32809c5
commit
3b9e03ba17
|
@ -1000,10 +1000,7 @@ _err:
|
||||||
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
|
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// .data
|
while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
|
||||||
while (true) {
|
|
||||||
if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break;
|
|
||||||
|
|
||||||
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
|
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
|
||||||
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
|
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -1349,6 +1346,11 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
|
||||||
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
|
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
|
||||||
|
// TODO
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) {
|
static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -1437,6 +1439,66 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;
|
||||||
|
|
||||||
|
ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
|
||||||
|
if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
|
||||||
|
int32_t iBlock = 0;
|
||||||
|
SBlock block;
|
||||||
|
SBlock *pBlock = █
|
||||||
|
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
|
||||||
|
|
||||||
|
ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);
|
||||||
|
|
||||||
|
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
|
||||||
|
while (pBlock && pRowInfo) {
|
||||||
|
SBlock tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
|
||||||
|
int32_t c = tBlockCmprFn(pBlock, &tBlock);
|
||||||
|
|
||||||
|
if (c < 0) {
|
||||||
|
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
iBlock++;
|
||||||
|
if (iBlock < pCommitter->dReader.mBlock.nItem) {
|
||||||
|
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
|
||||||
|
} else {
|
||||||
|
pBlock = NULL;
|
||||||
|
}
|
||||||
|
} else if (c > 0) {
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (pBlock) {
|
||||||
|
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
iBlock++;
|
||||||
|
if (iBlock < pCommitter->dReader.mBlock.nItem) {
|
||||||
|
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
|
||||||
|
} else {
|
||||||
|
pBlock = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitTableData2(SCommitter *pCommitter, TABLEID id) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -1447,42 +1509,45 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
if (pRowInfo == NULL) {
|
if (pRowInfo == NULL) {
|
||||||
// end the commit (todo)
|
/* end current table data commit (todo) */
|
||||||
|
|
||||||
|
/* end remain table data commit*/
|
||||||
|
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX});
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
if (pCommitter->dWriter.bDatal.nRow > 0) {
|
||||||
|
code = tsdbCommitLastBlock(pCommitter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) {
|
if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) {
|
||||||
// table changed, end current table commit (todo)
|
/* end current table data commit (todo) */
|
||||||
|
|
||||||
// prepare the new
|
/* start new table data commit */
|
||||||
id.suid = pRowInfo->suid;
|
id.suid = pRowInfo->suid;
|
||||||
id.uid = pRowInfo->uid;
|
id.uid = pRowInfo->uid;
|
||||||
}
|
// reader
|
||||||
|
code = tsdbMoveCommitData(pCommitter, id);
|
||||||
SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;
|
if (code) goto _err;
|
||||||
if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
|
// writer
|
||||||
while (true) {
|
tMapDataReset(&pCommitter->dWriter.mBlock);
|
||||||
/* code */
|
// other
|
||||||
}
|
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid);
|
||||||
}
|
if (code) goto _err;
|
||||||
|
code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmRow.pTSchema);
|
||||||
if (pRowInfo->row.type == 0) {
|
|
||||||
code = tsdbCommitterUpdateRowSchema(pCommitter, pRowInfo->suid, pRowInfo->uid, TSDBROW_SVERSION(&pRowInfo->row));
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, pCommitter->skmRow.pTSchema, pRowInfo->uid);
|
/* merge with data in .data file */
|
||||||
|
code = tsdbMergeTableData(pCommitter, id);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) {
|
/* handle remain table data */
|
||||||
if (1 /*toLastOnly*/) {
|
code = tsdbCommitTableData2(pCommitter, id);
|
||||||
code = tsdbCommitLastBlock(pCommitter);
|
if (code) goto _err;
|
||||||
if (code) goto _err;
|
|
||||||
} else {
|
|
||||||
code = tsdbCommitDataBlock(pCommitter, NULL);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -1491,373 +1556,3 @@ _err:
|
||||||
tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// // ================================================================================
|
|
||||||
// typedef struct {
|
|
||||||
// SRowInfo rowInfo;
|
|
||||||
// SDataFReader *pReader;
|
|
||||||
// int32_t iLast;
|
|
||||||
// SArray *aBlockL; // SArray<SBlockL>
|
|
||||||
// int32_t iBlockL;
|
|
||||||
// SBlockData bData;
|
|
||||||
// int32_t iRow;
|
|
||||||
// } SLDataIter;
|
|
||||||
|
|
||||||
// typedef struct {
|
|
||||||
// SRBTreeNode *pNode;
|
|
||||||
// SRBTree rbt;
|
|
||||||
// } SDataMerger;
|
|
||||||
|
|
||||||
// static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) {
|
|
||||||
// pMerger->pNode = NULL;
|
|
||||||
// tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn);
|
|
||||||
// for (int32_t iNode = 0; iNode < taosArrayGetSize(aNodeP); iNode++) {
|
|
||||||
// SRBTreeNode *pNode = (SRBTreeNode *)taosArrayGetP(aNodeP, iNode);
|
|
||||||
|
|
||||||
// pNode = tRBTreePut(&pMerger->rbt, pNode);
|
|
||||||
// ASSERT(pNode);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
|
|
||||||
// if (pMerger->pNode) {
|
|
||||||
// // next current iter
|
|
||||||
// SLDataIter *pIter = (SLDataIter *)pMerger->pNode->payload;
|
|
||||||
|
|
||||||
// pIter->iRow++;
|
|
||||||
// if (pIter->iRow < pIter->bData.nRow) {
|
|
||||||
// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
|
||||||
// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
|
||||||
// } else {
|
|
||||||
// pIter->iBlockL++;
|
|
||||||
// if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) {
|
|
||||||
// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL);
|
|
||||||
// code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData);
|
|
||||||
// if (code) goto _exit;
|
|
||||||
|
|
||||||
// pIter->iRow = 0;
|
|
||||||
// pIter->rowInfo.suid = pIter->bData.suid;
|
|
||||||
// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
|
|
||||||
// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0);
|
|
||||||
// } else {
|
|
||||||
// pMerger->pNode = NULL;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// SRBTreeNode *pMinNode = tRBTreeMin(&pMerger->rbt);
|
|
||||||
// if (pMerger->pNode && pMinNode) {
|
|
||||||
// int32_t c = tRowInfoCmprFn(pMerger->pNode->payload, pMinNode->payload);
|
|
||||||
// if (c > 0) {
|
|
||||||
// pMerger->pNode = tRBTreePut(&pMerger->rbt, pMerger->pNode);
|
|
||||||
// ASSERT(pMerger->pNode);
|
|
||||||
// pMerger->pNode = NULL;
|
|
||||||
// } else {
|
|
||||||
// ASSERT(c);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (pMerger->pNode == NULL) {
|
|
||||||
// pMerger->pNode = tRBTreeMin(&pMerger->rbt);
|
|
||||||
// if (pMerger->pNode) {
|
|
||||||
// tRBTreeDrop(&pMerger->rbt, pMerger->pNode);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (pMerger->pNode) {
|
|
||||||
// *ppInfo = &((SLDataIter *)pMerger->pNode->payload)[0].rowInfo;
|
|
||||||
// } else {
|
|
||||||
// *ppInfo = NULL;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// _exit:
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// typedef struct {
|
|
||||||
// STsdb *pTsdb;
|
|
||||||
// int8_t maxLast;
|
|
||||||
// int32_t minRow;
|
|
||||||
// int32_t maxRow;
|
|
||||||
// int8_t cmprAlg;
|
|
||||||
// int64_t commitID;
|
|
||||||
// STsdbFS fs;
|
|
||||||
// struct {
|
|
||||||
// SDataFReader *pReader;
|
|
||||||
// SArray *aBlockIdx;
|
|
||||||
// SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE];
|
|
||||||
// SDataMerger merger;
|
|
||||||
// } dReader;
|
|
||||||
// struct {
|
|
||||||
// SDataFWriter *pWriter;
|
|
||||||
// SArray *aBlockIdx;
|
|
||||||
// SArray *aBlockL;
|
|
||||||
// SBlockData bData;
|
|
||||||
// SBlockData bDatal;
|
|
||||||
// } dWriter;
|
|
||||||
// } STsdbMerger;
|
|
||||||
|
|
||||||
// static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
// STsdb *pTsdb = pMerger->pTsdb;
|
|
||||||
|
|
||||||
// // reader
|
|
||||||
// code = tsdbDataFReaderOpen(&pMerger->dReader.pReader, pTsdb, pSet);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// pMerger->dReader.merger.pNode = NULL;
|
|
||||||
// tRBTreeCreate(&pMerger->dReader.merger.rbt, tRowInfoCmprFn);
|
|
||||||
// for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) {
|
|
||||||
// SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter));
|
|
||||||
// if (pNode == NULL) {
|
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// SLDataIter *pIter = (SLDataIter *)pNode->payload;
|
|
||||||
// pIter->pReader = pMerger->dReader.pReader;
|
|
||||||
// pIter->iLast = iLast;
|
|
||||||
|
|
||||||
// pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL));
|
|
||||||
// if (pIter->aBlockL == NULL) {
|
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
// code = tBlockDataCreate(&pIter->bData);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// if (taosArrayGetSize(pIter->aBlockL) == 0) continue;
|
|
||||||
// pIter->iBlockL = 0;
|
|
||||||
|
|
||||||
// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0);
|
|
||||||
// code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// pIter->iRow = 0;
|
|
||||||
// pIter->rowInfo.suid = pIter->bData.suid;
|
|
||||||
// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
|
|
||||||
// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0);
|
|
||||||
|
|
||||||
// pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode);
|
|
||||||
// ASSERT(pNode);
|
|
||||||
|
|
||||||
// pMerger->dReader.aLDataiter[iLast] = pIter;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // writer
|
|
||||||
// SHeadFile fHead = {.commitID = pMerger->commitID};
|
|
||||||
// SDataFile fData = *pSet->pDataF;
|
|
||||||
// SSmaFile fSma = *pSet->pSmaF;
|
|
||||||
// SLastFile fLast = {.commitID = pMerger->commitID};
|
|
||||||
// SDFileSet wSet = {.diskId = pSet->diskId,
|
|
||||||
// .fid = pSet->fid,
|
|
||||||
// .nLastF = 1,
|
|
||||||
// .pHeadF = &fHead,
|
|
||||||
// .pDataF = &fData,
|
|
||||||
// .pSmaF = &fSma,
|
|
||||||
// .aLastF[0] = &fLast};
|
|
||||||
// code = tsdbDataFWriterOpen(&pMerger->dWriter.pWriter, pTsdb, &wSet);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// return code;
|
|
||||||
|
|
||||||
// _err:
|
|
||||||
// tsdbError("vgId:%d tsdb merge file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t tsdbMergeFileDataEnd(STsdbMerger *pMerger) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
// STsdb *pTsdb = pMerger->pTsdb;
|
|
||||||
|
|
||||||
// // write aBlockIdx
|
|
||||||
// code = tsdbWriteBlockIdx(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockIdx);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// // write aBlockL
|
|
||||||
// code = tsdbWriteBlockL(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockL);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// // update file header
|
|
||||||
// code = tsdbUpdateDFileSetHeader(pMerger->dWriter.pWriter);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// // upsert SDFileSet
|
|
||||||
// code = tsdbFSUpsertFSet(&pMerger->fs, &pMerger->dWriter.pWriter->wSet);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// // close and sync
|
|
||||||
// code = tsdbDataFWriterClose(&pMerger->dWriter.pWriter, 1);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// if (pMerger->dReader.pReader) {
|
|
||||||
// code = tsdbDataFReaderClose(&pMerger->dReader.pReader);
|
|
||||||
// if (code) goto _err;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return code;
|
|
||||||
|
|
||||||
// _err:
|
|
||||||
// tsdbError("vgId:%d tsdb merge file data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
// STsdb *pTsdb = pMerger->pTsdb;
|
|
||||||
|
|
||||||
// // start
|
|
||||||
// code = tsdbMergeFileDataStart(pMerger, pSet);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// // impl
|
|
||||||
// SRowInfo *pInfo;
|
|
||||||
// TABLEID id = {0};
|
|
||||||
// while (true) {
|
|
||||||
// code = tDataMergeNext(&pMerger->dReader.merger, &pInfo);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// if (pInfo == NULL) {
|
|
||||||
// if (pMerger->dWriter.bData.nRow > 0) {
|
|
||||||
// // TODO
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (pMerger->dWriter.bDatal.nRow > 0) {
|
|
||||||
// // TODO
|
|
||||||
// }
|
|
||||||
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (id.suid != pInfo->suid || id.uid != pInfo->uid) {
|
|
||||||
// while (true) {
|
|
||||||
// // move commit the head data
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // prepare to commit next
|
|
||||||
// }
|
|
||||||
|
|
||||||
// code = tBlockDataAppendRow(&pMerger->dWriter.bData, &pInfo->row, NULL, pInfo->uid);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// if (pMerger->dWriter.bData.nRow >= pMerger->maxRow * 4 / 5) {
|
|
||||||
// // code = tsdbCommitDataBlock();
|
|
||||||
// if (code) goto _err;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // end
|
|
||||||
// code = tsdbMergeFileDataEnd(pMerger);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// return code;
|
|
||||||
|
|
||||||
// _err:
|
|
||||||
// tsdbError("vgId:%d tsdb merge file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
|
|
||||||
// pMerger->pTsdb = pTsdb;
|
|
||||||
// pMerger->maxLast = TSDB_DEFAULT_LAST_FILE;
|
|
||||||
// pMerger->commitID = ++pTsdb->pVnode->state.commitID;
|
|
||||||
// code = tsdbFSCopy(pTsdb, &pMerger->fs);
|
|
||||||
// if (code) goto _exit;
|
|
||||||
|
|
||||||
// // reader
|
|
||||||
// pMerger->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
// if (pMerger->dReader.aBlockIdx == NULL) {
|
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
// // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) {
|
|
||||||
// // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL));
|
|
||||||
// // if (pMerger->dReader.aBlockL[iLast] == NULL) {
|
|
||||||
// // code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// // goto _exit;
|
|
||||||
// // }
|
|
||||||
// // }
|
|
||||||
|
|
||||||
// // writer
|
|
||||||
// pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
// if (pMerger->dWriter.aBlockIdx == NULL) {
|
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
// pMerger->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL));
|
|
||||||
// if (pMerger->dWriter.aBlockL == NULL) {
|
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// _exit:
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t tsdbEndMerge(STsdbMerger *pMerger) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
// STsdb *pTsdb = pMerger->pTsdb;
|
|
||||||
|
|
||||||
// code = tsdbFSCommit1(pTsdb, &pMerger->fs);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// taosThreadRwlockWrlock(&pTsdb->rwLock);
|
|
||||||
// code = tsdbFSCommit2(pTsdb, &pMerger->fs);
|
|
||||||
// if (code) {
|
|
||||||
// taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
// taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
|
|
||||||
// // writer
|
|
||||||
// taosArrayDestroy(pMerger->dWriter.aBlockL);
|
|
||||||
// taosArrayDestroy(pMerger->dWriter.aBlockIdx);
|
|
||||||
|
|
||||||
// // reader
|
|
||||||
// // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) {
|
|
||||||
// // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]);
|
|
||||||
// // }
|
|
||||||
// taosArrayDestroy(pMerger->dReader.aBlockIdx);
|
|
||||||
// tsdbFSDestroy(&pMerger->fs);
|
|
||||||
|
|
||||||
// return code;
|
|
||||||
|
|
||||||
// _err:
|
|
||||||
// tsdbError("vgId:%d, tsdb end merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// int32_t tsdbMerge(STsdb *pTsdb) {
|
|
||||||
// int32_t code = 0;
|
|
||||||
// STsdbMerger merger = {0};
|
|
||||||
|
|
||||||
// code = tsdbStartMerge(&merger, pTsdb);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// for (int32_t iSet = 0; iSet < taosArrayGetSize(merger.fs.aDFileSet); iSet++) {
|
|
||||||
// SDFileSet *pSet = (SDFileSet *)taosArrayGet(merger.fs.aDFileSet, iSet);
|
|
||||||
// if (pSet->nLastF < merger.maxLast) continue;
|
|
||||||
|
|
||||||
// code = tsdbMergeFileData(&merger, pSet);
|
|
||||||
// if (code) goto _err;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// code = tsdbEndMerge(&merger);
|
|
||||||
// if (code) goto _err;
|
|
||||||
|
|
||||||
// return code;
|
|
||||||
|
|
||||||
// _err:
|
|
||||||
// tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
// return code;
|
|
||||||
// }
|
|
Loading…
Reference in New Issue