more code
This commit is contained in:
parent
e39a08f763
commit
5bce5c6935
|
@ -281,6 +281,8 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
||||||
// tsdbRead.c ==============================================================================================
|
// tsdbRead.c ==============================================================================================
|
||||||
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap);
|
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap);
|
||||||
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
|
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
|
||||||
|
// tsdbMerge.c ==============================================================================================
|
||||||
|
int32_t tsdbMerge(STsdb *pTsdb);
|
||||||
|
|
||||||
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
|
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
|
||||||
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
|
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
|
int8_t toMerge;
|
||||||
/* commit data */
|
/* commit data */
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
int32_t minutes;
|
int32_t minutes;
|
||||||
|
@ -394,6 +395,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
SDFileSet wSet = {0};
|
SDFileSet wSet = {0};
|
||||||
if (pRSet) {
|
if (pRSet) {
|
||||||
ASSERT(pRSet->nLastF < pCommitter->maxLast);
|
ASSERT(pRSet->nLastF < pCommitter->maxLast);
|
||||||
|
|
||||||
fHead = (SHeadFile){.commitID = pCommitter->commitID};
|
fHead = (SHeadFile){.commitID = pCommitter->commitID};
|
||||||
fData = *pRSet->pDataF;
|
fData = *pRSet->pDataF;
|
||||||
fSma = *pRSet->pSmaF;
|
fSma = *pRSet->pSmaF;
|
||||||
|
@ -409,6 +411,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
}
|
}
|
||||||
wSet.nLastF = pRSet->nLastF + 1;
|
wSet.nLastF = pRSet->nLastF + 1;
|
||||||
wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo
|
wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo
|
||||||
|
|
||||||
|
if (wSet.nLastF == pCommitter->maxLast) {
|
||||||
|
pCommitter->toMerge = 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
fHead = (SHeadFile){.commitID = pCommitter->commitID};
|
fHead = (SHeadFile){.commitID = pCommitter->commitID};
|
||||||
fData = (SDataFile){.commitID = pCommitter->commitID};
|
fData = (SDataFile){.commitID = pCommitter->commitID};
|
||||||
|
@ -1277,6 +1283,11 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||||
tsdbFSDestroy(&pCommitter->fs);
|
tsdbFSDestroy(&pCommitter->fs);
|
||||||
taosArrayDestroy(pCommitter->aTbDataP);
|
taosArrayDestroy(pCommitter->aTbDataP);
|
||||||
|
|
||||||
|
if (pCommitter->toMerge) {
|
||||||
|
code = tsdbMerge(pTsdb);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
|
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
|
|
@ -17,11 +17,36 @@
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
|
int8_t maxLast;
|
||||||
STsdbFS fs;
|
STsdbFS fs;
|
||||||
|
struct {
|
||||||
|
SDataFReader *pReader;
|
||||||
|
} dReader;
|
||||||
|
struct {
|
||||||
|
SDataFWriter *pWriter;
|
||||||
|
} dWriter;
|
||||||
} STsdbMerger;
|
} STsdbMerger;
|
||||||
|
|
||||||
int32_t tsdbMerge(STsdb *pTsdb) {
|
int32_t tsdbMerge(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
STsdbMerger merger = {0};
|
||||||
|
STsdbMerger *pMerger = &merger;
|
||||||
|
|
||||||
|
pMerger->pTsdb = pTsdb;
|
||||||
|
pMerger->maxLast = TSDB_DEFAULT_LAST_FILE;
|
||||||
|
code = tsdbFSCopy(pTsdb, &pMerger->fs);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
for (int32_t iSet = 0; iSet < taosArrayGetSize(pMerger->fs.aDFileSet); iSet++) {
|
||||||
|
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pMerger->fs.aDFileSet, iSet);
|
||||||
|
if (pSet->nLastF < pMerger->maxLast) continue;
|
||||||
|
|
||||||
|
// do merge the file
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
Loading…
Reference in New Issue