more code

This commit is contained in:
Hongze Cheng 2022-08-26 10:44:02 +08:00
parent 4a1447b666
commit 039e4a0505
1 changed files with 64 additions and 13 deletions

View File

@ -22,11 +22,13 @@ typedef struct {
} SRowInfo; } SRowInfo;
typedef struct { typedef struct {
SRowInfo rowInfo; SRowInfo rowInfo;
SArray *aBlockL; // SArray<SBlockL> SDataFReader *pReader;
int32_t iBlockL; int32_t iLast;
SBlockData bData; SArray *aBlockL; // SArray<SBlockL>
int32_t iRow; int32_t iBlockL;
SBlockData bData;
int32_t iRow;
} SLDataIter; } SLDataIter;
typedef struct { typedef struct {
@ -64,6 +66,9 @@ static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) {
} }
} }
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL,
SBlockData *pBlockData); // todo
static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) {
int32_t code = 0; int32_t code = 0;
@ -78,7 +83,8 @@ static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) {
} else { } else {
pIter->iBlockL++; pIter->iBlockL++;
if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) {
// code = tsdbReadLastBlock(NULL, (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL), &pIter->bData); SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL);
code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData);
if (code) goto _exit; if (code) goto _exit;
pIter->iRow = 0; pIter->iRow = 0;
@ -129,7 +135,7 @@ typedef struct {
struct { struct {
SDataFReader *pReader; SDataFReader *pReader;
SArray *aBlockIdx; SArray *aBlockIdx;
SLDataIter aLDataiter[TSDB_MAX_LAST_FILE]; SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE];
SDataMerger merger; SDataMerger merger;
} dReader; } dReader;
struct { struct {
@ -141,9 +147,6 @@ typedef struct {
} dWriter; } dWriter;
} STsdbMerger; } STsdbMerger;
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL,
SBlockData *pBlockData); // todo
static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
int32_t code = 0; int32_t code = 0;
STsdb *pTsdb = pMerger->pTsdb; STsdb *pTsdb = pMerger->pTsdb;
@ -165,6 +168,8 @@ static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
} }
SLDataIter *pIter = (SLDataIter *)pNode->payload; SLDataIter *pIter = (SLDataIter *)pNode->payload;
pIter->pReader = pMerger->dReader.pReader;
pIter->iLast = iLast;
pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pIter->aBlockL == NULL) { if (pIter->aBlockL == NULL) {
@ -191,6 +196,8 @@ static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode);
ASSERT(pNode); ASSERT(pNode);
pMerger->dReader.aLDataiter[iLast] = pIter;
} }
// writer // writer
@ -251,6 +258,48 @@ _err:
return code; return code;
} }
typedef struct {
int64_t suid;
int64_t uid;
TSKEY ts;
int64_t version;
} SRInfo;
static int32_t tRInfoCmprFn(const void *p1, const void *p2) {
SRInfo *pInfo1 = (SRInfo *)p1;
SRInfo *pInfo2 = (SRInfo *)p2;
// suid
if (pInfo1->suid < pInfo2->suid) {
return -1;
} else if (pInfo1->suid > pInfo2->suid) {
return 1;
}
// uid
if (pInfo1->uid < pInfo2->uid) {
return -1;
} else if (pInfo1->uid > pInfo2->uid) {
return 1;
}
// ts
if (pInfo1->ts < pInfo2->ts) {
return -1;
} else if (pInfo1->ts > pInfo2->ts) {
return 1;
}
// version
if (pInfo1->version < pInfo2->version) {
return -1;
} else if (pInfo1->version > pInfo2->version) {
return 1;
}
return 0;
}
static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) {
int32_t code = 0; int32_t code = 0;
STsdb *pTsdb = pMerger->pTsdb; STsdb *pTsdb = pMerger->pTsdb;
@ -260,9 +309,9 @@ static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) {
if (code) goto _err; if (code) goto _err;
// impl // impl
SRowInfo rInfo = {.suid = INT64_MIN};
SRowInfo *pInfo; SRowInfo *pInfo;
int64_t nRow = 0; int64_t nRow = 0;
SRInfo rInfo = {.suid = INT64_MIN};
while (true) { while (true) {
code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); code = tDataMergeNext(&pMerger->dReader.merger, &pInfo);
if (code) goto _err; if (code) goto _err;
@ -270,8 +319,10 @@ static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) {
if (pInfo == NULL) break; if (pInfo == NULL) break;
nRow++; nRow++;
ASSERT(tRowInfoCmprFn(pInfo, &rInfo) > 0); SRInfo rInfoT = {
rInfo = *pInfo; .suid = pInfo->suid, .uid = pInfo->uid, .ts = TSDBROW_TS(&pInfo->row), .version = TSDBROW_VERSION(&pInfo->row)};
ASSERT(tRInfoCmprFn(&rInfoT, &rInfo) > 0);
rInfo = rInfoT;
} }
// end // end