more vnode snapshot reader

This commit is contained in:
Hongze Cheng 2022-07-13 06:26:13 +00:00
parent b691e76782
commit 2fd04241d6
4 changed files with 45 additions and 47 deletions

View File

@ -331,7 +331,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)

View File

@ -34,8 +34,8 @@ struct STsdbSnapReader {
// for del file
int8_t delDone;
SDelFReader* pDelFReader;
SArray* aDelIdx; // SArray<SDelIdx>
int32_t iDelIdx;
SArray* aDelIdx; // SArray<SDelIdx>
SArray* aDelData; // SArray<SDelData>
};
@ -117,7 +117,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
if (code) goto _err;
}
// org data (todo)
// org data
// compress data (todo)
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
@ -137,9 +138,10 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d",
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow);
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
size);
goto _exit;
}
@ -161,7 +163,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
if (pReader->pDelFReader == NULL) {
if (pDelFile == NULL) {
code = TSDB_CODE_VND_READ_END;
goto _exit;
}
@ -176,15 +177,20 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iDelIdx = 0;
}
while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
while (true) {
if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) {
tsdbDelFReaderClose(&pReader->pDelFReader);
break;
}
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
int32_t size = 0;
pReader->iDelIdx++;
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
if (code) goto _err;
int32_t size = 0;
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
@ -193,46 +199,44 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
}
}
if (size > 0) {
int64_t n = 0;
if (size == 0) continue;
size = size + sizeof(SSnapDataHdr) + sizeof(TABLEID);
code = tRealloc(ppData, size);
if (code) goto _err;
// SSnapDataHdr
SSnapDataHdr* pSnapDataHdr = (SSnapDataHdr*)(*ppData + n);
pSnapDataHdr->type = 1;
pSnapDataHdr->size = size; // TODO: size here may incorrect
n += sizeof(SSnapDataHdr);
// TABLEID
TABLEID* pId = (TABLEID*)(*ppData + n);
pId->suid = pDelIdx->suid;
pId->uid = pDelIdx->uid;
n += sizeof(*pId);
// DATA
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
n += tPutDelData(*ppData + n, pDelData);
}
}
goto _exit;
// org data
size = sizeof(TABLEID) + size;
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
code = TSDB_CODE_VND_READ_END;
tsdbDelFReaderClose(&pReader->pDelFReader);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 2;
pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]);
pId->suid = pDelIdx->suid;
pId->uid = pDelIdx->uid;
int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
if (pDelData->version < pReader->sver) continue;
if (pDelData->version > pReader->ever) continue;
n += tPutDelData((*ppData) + n, pDelData);
}
tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d",
TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size);
break;
}
_exit:
return code;
_err:
tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
@ -342,9 +346,6 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
}
_exit:
if (*ppData) {
} else {
}
return code;
_err:

View File

@ -237,8 +237,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
// }
_exit:
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->type, nData,
pHdr->index);
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData);
return code;
_err:

View File

@ -335,8 +335,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")