more code
This commit is contained in:
parent
2e827b50e2
commit
7de0fc7dae
|
@ -32,7 +32,6 @@ typedef struct {
|
|||
int8_t type;
|
||||
union {
|
||||
struct {
|
||||
SArray *aTbDataP;
|
||||
int32_t iTbDataP;
|
||||
STbDataIter iter;
|
||||
}; // memory data iter
|
||||
|
@ -75,6 +74,7 @@ typedef struct {
|
|||
struct {
|
||||
SDataIter *pIter;
|
||||
SRBTree rbt;
|
||||
SDataIter aDataIter[TSDB_MAX_LAST_FILE + 1];
|
||||
};
|
||||
struct {
|
||||
SDataFWriter *pWriter;
|
||||
|
@ -102,6 +102,26 @@ static int32_t tsdbCommitData(SCommitter *pCommitter);
|
|||
static int32_t tsdbCommitDel(SCommitter *pCommitter);
|
||||
static int32_t tsdbCommitCache(SCommitter *pCommitter);
|
||||
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
|
||||
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);
|
||||
|
||||
static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
|
||||
SRowInfo *pInfo1 = (SRowInfo *)p1;
|
||||
SRowInfo *pInfo2 = (SRowInfo *)p2;
|
||||
|
||||
if (pInfo1->suid < pInfo2->suid) {
|
||||
return -1;
|
||||
} else if (pInfo1->suid > pInfo2->suid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (pInfo1->uid < pInfo2->uid) {
|
||||
return -1;
|
||||
} else if (pInfo1->uid > pInfo2->uid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
|
||||
}
|
||||
|
||||
int32_t tsdbBegin(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
|
@ -376,6 +396,49 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
|
||||
tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
|
||||
pCommitter->pIter = NULL;
|
||||
|
||||
int8_t iIter = 0;
|
||||
// memory
|
||||
SDataIter *pIter = &pCommitter->aDataIter[iIter];
|
||||
pIter->type = 0;
|
||||
pIter->iTbDataP = 0;
|
||||
for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
|
||||
TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
|
||||
tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
|
||||
TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
|
||||
|
||||
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
|
||||
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
|
||||
continue;
|
||||
}
|
||||
|
||||
pIter->r.suid = pTbData->suid;
|
||||
pIter->r.uid = pTbData->uid;
|
||||
pIter->r.row = *pRow;
|
||||
break;
|
||||
}
|
||||
|
||||
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
|
||||
|
||||
// disk
|
||||
if (0) {
|
||||
}
|
||||
|
||||
code = tsdbNextCommitRow(pCommitter);
|
||||
if (code) goto _err;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
|
@ -464,6 +527,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
tBlockDataReset(&pCommitter->dWriter.bData);
|
||||
tBlockDataReset(&pCommitter->dWriter.bDatal);
|
||||
|
||||
// open iter
|
||||
code = tsdbOpenCommitIter(pCommitter);
|
||||
if (code) goto _err;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
|
@ -1325,24 +1392,6 @@ _err:
|
|||
}
|
||||
|
||||
// ================================================================================
|
||||
static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
|
||||
SRowInfo *pInfo1 = (SRowInfo *)p1;
|
||||
SRowInfo *pInfo2 = (SRowInfo *)p2;
|
||||
|
||||
if (pInfo1->suid < pInfo2->suid) {
|
||||
return -1;
|
||||
} else if (pInfo1->suid > pInfo2->suid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (pInfo1->uid < pInfo2->uid) {
|
||||
return -1;
|
||||
} else if (pInfo1->uid > pInfo2->uid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
|
||||
}
|
||||
|
||||
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
|
||||
return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
|
||||
|
@ -1370,8 +1419,8 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
|
|||
}
|
||||
|
||||
pIter->iTbDataP++;
|
||||
if (pIter->iTbDataP < taosArrayGetSize(pIter->aTbDataP)) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pIter->aTbDataP, pIter->iTbDataP);
|
||||
if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
|
||||
TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
|
||||
tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
|
||||
pRow = tsdbTbDataIterGet(&pIter->iter);
|
||||
|
|
Loading…
Reference in New Issue