diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index bb2a4db20b..372161a3e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -68,6 +68,8 @@ typedef struct { SArray *aDelData; // SArray SArray *aSkyLine; // SArray TSDBKEY *aTSDBKEY; + int32_t iKey; + TSDBKEY sKey; // Reader SDataFReader *pReader; @@ -504,6 +506,42 @@ static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) { return 0; } + +static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) { + TSDBKEY tKey = TSDBROW_KEY(pRow); + + while (tKey.ts > pCompactor->sKey.ts) { + pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version; + pCompactor->iKey++; + if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) { + pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts; + } else { + pCompactor->sKey.ts = TSKEY_MAX; + } + } + + if (tKey.ts < pCompactor->sKey.ts) { + if (tKey.version > pCompactor->sKey.version) { + return false; + } else { + return true; + } + } else if (tKey.ts == pCompactor->sKey.ts) { + int64_t version; + if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) { + version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version); + } + + if (tKey.version > version) { + return false; + } else { + return true; + } + } + + return false; +} + static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; @@ -588,6 +626,9 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { TSDB_CHECK_CODE(code, lino, _exit); pCompactor->aTSDBKEY = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine); + pCompactor->iKey = 0; + pCompactor->sKey.version = 0; + pCompactor->sKey.ts = pCompactor->aTSDBKEY[0].ts; } else { pCompactor->aTSDBKEY = NULL; } @@ -596,7 +637,8 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { ASSERT(pRowInfo->uid == pCompactor->tbSkm.uid); - if (pCompactor->aTSDBKEY && 0 /* TODO: the row is deleted */) { + // check if the row is deleted + if (pCompactor->aTSDBKEY && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) { continue; } else { break;