refact more code

This commit is contained in:
Hongze Cheng 2023-01-29 10:21:39 +08:00
parent 792add53c2
commit e0b0ce2395
1 changed files with 86 additions and 21 deletions

View File

@ -29,12 +29,19 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData,
typedef struct STsdbDataIter2 STsdbDataIter2; typedef struct STsdbDataIter2 STsdbDataIter2;
typedef struct STsdbFilterInfo STsdbFilterInfo; typedef struct STsdbFilterInfo STsdbFilterInfo;
typedef struct {
int64_t suid;
int64_t uid;
SDelData delData;
} SDelInfo;
struct STsdbDataIter2 { struct STsdbDataIter2 {
STsdbDataIter2* next; STsdbDataIter2* next;
SRBTreeNode rbtn; SRBTreeNode rbtn;
int32_t type; int32_t type;
SRowInfo rowInfo; SRowInfo rowInfo;
SDelInfo delInfo;
union { union {
// TSDB_MEM_TABLE_DATA_ITER // TSDB_MEM_TABLE_DATA_ITER
struct { struct {
@ -391,6 +398,54 @@ _exit:
return code; return code;
} }
static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) {
SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData);
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) {
pIter->tIter.iDelData++;
continue;
}
}
}
pIter->delInfo.delData = *pDelData;
pIter->tIter.iDelData++;
goto _exit;
}
for (;;) {
if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) {
SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx);
code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->delInfo.suid = pDelIdx->suid;
pIter->delInfo.uid = pDelIdx->uid;
pIter->tIter.iDelData = 0;
pIter->tIter.iDelIdx++;
break;
} else {
pIter->delInfo = (SDelInfo){0};
goto _exit;
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0; int32_t code = 0;
@ -401,6 +456,8 @@ static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilter
return tsdbDataFileDataIterNext(pIter, pFilterInfo); return tsdbDataFileDataIterNext(pIter, pFilterInfo);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
return tsdbSttFileDataIterNext(pIter, pFilterInfo); return tsdbSttFileDataIterNext(pIter, pFilterInfo);
} else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
return tsdbTombFileDataIterNext(pIter, pFilterInfo);
} else { } else {
ASSERT(0); ASSERT(0);
return code; return code;
@ -725,23 +782,27 @@ _exit:
return code; return code;
} }
static int32_t tsdbSnapReadGetTombData(STsdbSnapReader* pReader, void* pDelInfo) { static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
int32_t code = 0; if (pReader->pTIter == NULL || (pReader->pTIter->delInfo.suid == 0 && pReader->pTIter->delInfo.uid == 0)) {
int32_t lino = 0; *ppDelInfo = NULL;
} else {
ASSERT(0); *ppDelInfo = &pReader->pTIter->delInfo;
// TODO
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code;
} }
static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, void* pDelInfo) { static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO
code = tsdbDataIterNext2(
pReader->pTIter,
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, .sver = pReader->sver, .ever = pReader->ever});
TSDB_CHECK_CODE(code, lino, _exit);
if (ppDelInfo) {
tsdbSnapReadGetTombData(pReader, ppDelInfo);
}
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -755,6 +816,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData)
STsdb* pTsdb = pReader->pTsdb; STsdb* pTsdb = pReader->pTsdb;
// open tombstone data iter if need
if (pReader->pDelFReader == NULL) { if (pReader->pDelFReader == NULL) {
if (pReader->fs.pDelFile == NULL) goto _exit; if (pReader->fs.pDelFile == NULL) goto _exit;
@ -764,15 +826,16 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData)
code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter); code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->pTIter) {
code = tsdbSnapReadNextTombData(pReader, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
struct { // loop to get tombstone data
int64_t suid; SDelInfo* pDelInfo;
int64_t uid; tsdbSnapReadGetTombData(pReader, &pDelInfo);
SDelData dData;
}* pDelInfo;
code = tsdbSnapReadGetTombData(pReader, &pDelInfo);
TSDB_CHECK_CODE(code, lino, _exit);
if (pDelInfo == NULL) goto _exit; if (pDelInfo == NULL) goto _exit;
@ -786,7 +849,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData)
} }
while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) { while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) {
if (taosArrayPush(pReader->aDelData, &pDelInfo->dData) < 0) { if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -795,6 +858,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData)
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// encode tombstone data
if (taosArrayGetSize(pReader->aDelData) > 0) { if (taosArrayGetSize(pReader->aDelData) > 0) {
code = tsdbSnapCmprTombData(pReader, ppData); code = tsdbSnapCmprTombData(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -870,9 +934,9 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
if (pReader->pDelFReader) { if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader); tsdbDelFReaderClose(&pReader->pDelFReader);
} }
taosArrayDestroy(pReader->aDelData);
// timeseries // timeseries
tBlockDataDestroy(&pReader->bData, 1);
while (pReader->iterList) { while (pReader->iterList) {
STsdbDataIter2* pIter = pReader->iterList; STsdbDataIter2* pIter = pReader->iterList;
pReader->iterList = pIter->next; pReader->iterList = pIter->next;
@ -881,6 +945,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
if (pReader->pDataFReader) { if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader); tsdbDataFReaderClose(&pReader->pDataFReader);
} }
tBlockDataDestroy(&pReader->bData, 1);
// other // other
tDestroyTSchema(pReader->skmTable.pTSchema); tDestroyTSchema(pReader->skmTable.pTSchema);