more code

This commit is contained in:
Hongze Cheng 2023-02-07 22:05:36 +08:00
parent c7267c7c9f
commit 9b338ae095
1 changed files with 50 additions and 35 deletions

View File

@ -725,7 +725,8 @@ typedef struct {
SArray *aSkyLine; // SArray<TSDBKEY> SArray *aSkyLine; // SArray<TSDBKEY>
int32_t iDelIdx; int32_t iDelIdx;
int32_t iSkyLine; int32_t iSkyLine;
int8_t onGoing; TSDBKEY *pDKey;
TSDBKEY dKey;
// Reader // Reader
SDataFReader *pReader; SDataFReader *pReader;
@ -751,8 +752,7 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI
// tombstone // tombstone
for (;;) { for (;;) {
if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) { if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) {
if (pCompactor->aSkyLine) taosArrayClear(pCompactor->aSkyLine); pCompactor->pDKey = NULL;
pCompactor->iSkyLine = 0;
break; break;
} }
@ -771,10 +771,18 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->iSkyLine = 0; pCompactor->iSkyLine = 0;
if (pCompactor->iSkyLine < taosArrayGetSize(pCompactor->aSkyLine)) {
TSDBKEY *pKey = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine);
pCompactor->dKey.version = 0;
pCompactor->dKey.ts = pKey->ts;
pCompactor->pDKey = &pCompactor->dKey;
} else {
pCompactor->pDKey = NULL;
}
break; break;
} else { } else {
if (pCompactor->aSkyLine) taosArrayClear(pCompactor->aSkyLine); pCompactor->pDKey = NULL;
pCompactor->iSkyLine = 0;
break; break;
} }
} }
@ -859,38 +867,45 @@ _exit:
} }
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) { static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
// TSDBKEY tKey = TSDBROW_KEY(pRow); TSDBKEY tKey = TSDBROW_KEY(pRow);
// while (tKey.ts > pCompactor->sKey.ts) { if (tKey.ts > pCompactor->pDKey->ts) {
// pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version; while (tKey.ts > pCompactor->pDKey->ts) {
// pCompactor->iKey++; TSDBKEY *pKey = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine);
// if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
// pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts;
// } else {
// pCompactor->sKey.ts = TSKEY_MAX;
// }
// }
// if (tKey.ts < pCompactor->sKey.ts) { pCompactor->pDKey->version = pKey->version;
// if (tKey.version > pCompactor->sKey.version) { pCompactor->iSkyLine++;
// return false; if (pCompactor->iSkyLine < taosArrayGetSize(pCompactor->aSkyLine)) {
// } else { TSDBKEY *pKey = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine);
// return true;
// }
// } else if (tKey.ts == pCompactor->sKey.ts) {
// int64_t version;
// if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
// version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version);
// } else {
// version = pCompactor->sKey.version;
// }
// if (tKey.version > version) { pCompactor->dKey.ts = pKey->ts;
// return false; } else {
// } else { pCompactor->pDKey = NULL;
// return true; return false;
// } }
// } }
}
if (tKey.ts < pCompactor->pDKey->ts) {
if (tKey.version > pCompactor->pDKey->version) {
return false;
} else {
return true;
}
} else if (tKey.ts == pCompactor->pDKey->ts) {
int64_t version;
if (pCompactor->iSkyLine < taosArrayGetSize(pCompactor->aSkyLine)) {
version = TMAX(pCompactor->pDKey->version, pCompactor->aTSDBKEY[pCompactor->iKey].version);
} else {
version = pCompactor->pDKey->version;
}
if (tKey.version > version) {
return false;
} else {
return true;
}
}
return false; return false;
} }
@ -919,7 +934,7 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p
} }
// check if row is deleted // check if row is deleted
if (pCompactor->onGoing && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit; if (pCompactor->pDKey && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit;
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);