This commit is contained in:
Hongze Cheng 2022-06-04 07:38:27 +00:00
parent b6e79b2693
commit c7c47788f3
2 changed files with 24 additions and 20 deletions

View File

@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp *pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
@ -112,7 +112,7 @@ int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb); int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);

View File

@ -86,7 +86,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbBegin(STsdb *pTsdb) { int tsdbBegin(STsdb *pTsdb) {
@ -101,33 +101,36 @@ int tsdbBegin(STsdb *pTsdb) {
return 0; return 0;
} }
int tsdbCommit(STsdb *pRepo) { int32_t tsdbCommit(STsdb *pTsdb) {
int32_t code = 0;
SCommitH commith = {0}; SCommitH commith = {0};
SDFileSet *pSet = NULL; SDFileSet *pSet = NULL;
int fid; int fid;
// if (pRepo->imem == NULL) return 0; ASSERT(pTsdb->imem == NULL && pTsdb->mem);
pRepo->imem = pRepo->mem; pTsdb->imem = pTsdb->mem;
pRepo->mem = NULL; pTsdb->mem = NULL;
tsdbStartCommit(pRepo); // start commit
// Resource initialization tsdbStartCommit(pTsdb);
if (tsdbInitCommitH(&commith, pRepo) < 0) { if (tsdbInitCommitH(&commith, pTsdb) < 0) {
return -1; return -1;
} }
#if 0
// Skip expired memory data and expired FSET // Skip expired memory data and expired FSET
tsdbSeekCommitIter(&commith, commith.rtn.minKey); tsdbSeekCommitIter(&commith, commith.rtn.minKey);
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) { while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
if (pSet->fid < commith.rtn.minFid) { if (pSet->fid < commith.rtn.minFid) {
tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else { } else {
break; break;
} }
} }
#endif
// Loop to commit to each file // commit
fid = tsdbNextCommitFid(&(commith)); fid = tsdbNextCommitFid(&(commith));
while (true) { while (true) {
// Loop over both on disk and memory // Loop over both on disk and memory
@ -136,7 +139,7 @@ int tsdbCommit(STsdb *pRepo) {
if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
// Only has existing FSET but no memory data to commit in this // Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention // existing FSET, only check if file in correct retention
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) { if (tsdbApplyRtnOnFSet(pTsdb, pSet, &(commith.rtn)) < 0) {
tsdbDestroyCommitH(&commith); tsdbDestroyCommitH(&commith);
return -1; return -1;
} }
@ -167,10 +170,11 @@ int tsdbCommit(STsdb *pRepo) {
} }
} }
// end commit
tsdbDestroyCommitH(&commith); tsdbDestroyCommitH(&commith);
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS);
return 0; return code;
} }
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
@ -717,8 +721,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// extern int32_t tsTsdbMetaCompactRatio; // extern int32_t tsTsdbMetaCompactRatio;
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
SBlockIdx *pIdx) { SBlockIdx *pIdx) {
size_t nSupBlocks; size_t nSupBlocks;
size_t nSubBlocks; size_t nSubBlocks;
uint32_t tlen; uint32_t tlen;
@ -778,7 +782,7 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
return 0; return 0;
} }
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
SBlockIdx *pBlkIdx; SBlockIdx *pBlkIdx;
size_t nidx = taosArrayGetSize(pIdxA); size_t nidx = taosArrayGetSize(pIdxA);
int tlen = 0, size; int tlen = 0, size;
@ -1012,8 +1016,8 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
* @param ppExBuf * @param ppExBuf
* @return int * @return int
*/ */
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData = NULL; SBlockData *pBlockData = NULL;
SAggrBlkData *pAggrBlkData = NULL; SAggrBlkData *pAggrBlkData = NULL;