more code
This commit is contained in:
parent
3aa1f8ca90
commit
6f947af8cc
|
@ -68,6 +68,8 @@ typedef struct {
|
||||||
SArray *aDelData; // SArray<SDelData>
|
SArray *aDelData; // SArray<SDelData>
|
||||||
SArray *aSkyLine; // SArray<TSDBKEY>
|
SArray *aSkyLine; // SArray<TSDBKEY>
|
||||||
TSDBKEY *aTSDBKEY;
|
TSDBKEY *aTSDBKEY;
|
||||||
|
int32_t iKey;
|
||||||
|
TSDBKEY sKey;
|
||||||
|
|
||||||
// Reader
|
// Reader
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
|
@ -504,6 +506,42 @@ static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
|
||||||
|
TSDBKEY tKey = TSDBROW_KEY(pRow);
|
||||||
|
|
||||||
|
while (tKey.ts > pCompactor->sKey.ts) {
|
||||||
|
pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version;
|
||||||
|
pCompactor->iKey++;
|
||||||
|
if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
|
||||||
|
pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts;
|
||||||
|
} else {
|
||||||
|
pCompactor->sKey.ts = TSKEY_MAX;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tKey.ts < pCompactor->sKey.ts) {
|
||||||
|
if (tKey.version > pCompactor->sKey.version) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else if (tKey.ts == pCompactor->sKey.ts) {
|
||||||
|
int64_t version;
|
||||||
|
if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
|
||||||
|
version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tKey.version > version) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -588,6 +626,9 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pCompactor->aTSDBKEY = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine);
|
pCompactor->aTSDBKEY = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine);
|
||||||
|
pCompactor->iKey = 0;
|
||||||
|
pCompactor->sKey.version = 0;
|
||||||
|
pCompactor->sKey.ts = pCompactor->aTSDBKEY[0].ts;
|
||||||
} else {
|
} else {
|
||||||
pCompactor->aTSDBKEY = NULL;
|
pCompactor->aTSDBKEY = NULL;
|
||||||
}
|
}
|
||||||
|
@ -596,7 +637,8 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
||||||
|
|
||||||
ASSERT(pRowInfo->uid == pCompactor->tbSkm.uid);
|
ASSERT(pRowInfo->uid == pCompactor->tbSkm.uid);
|
||||||
|
|
||||||
if (pCompactor->aTSDBKEY && 0 /* TODO: the row is deleted */) {
|
// check if the row is deleted
|
||||||
|
if (pCompactor->aTSDBKEY && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue