refact: alloc just enough memory for snap data in tsdbSnapRAWReadData

This commit is contained in:
Benguang Zhao 2023-11-24 16:13:53 +08:00
parent 6c419423de
commit a504007ade
2 changed files with 49 additions and 40 deletions

View File

@ -60,27 +60,21 @@ int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader) {
return 0; return 0;
} }
int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *bHdr) { int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *pBlock) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
bHdr->file.type = reader->config->file.type; pBlock->file.type = reader->config->file.type;
bHdr->file.fid = reader->config->file.fid; pBlock->file.fid = reader->config->file.fid;
bHdr->file.cid = reader->config->file.cid; pBlock->file.cid = reader->config->file.cid;
bHdr->file.size = reader->config->file.size; pBlock->file.size = reader->config->file.size;
bHdr->file.minVer = reader->config->file.minVer; pBlock->file.minVer = reader->config->file.minVer;
bHdr->file.maxVer = reader->config->file.maxVer; pBlock->file.maxVer = reader->config->file.maxVer;
bHdr->file.stt->level = reader->config->file.stt->level; pBlock->file.stt->level = reader->config->file.stt->level;
int64_t size = TMIN(bHdr->dataLength, reader->config->file.size - reader->ctx->offset); code = tsdbReadFile(reader->fd, pBlock->offset, pBlock->data, pBlock->dataLength);
ASSERT(size > 0);
bHdr->dataLength = 0;
bHdr->offset = reader->ctx->offset;
code = tsdbReadFile(reader->fd, bHdr->offset, bHdr->data, size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
bHdr->dataLength = size;
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);

View File

@ -183,52 +183,54 @@ static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
return 0; return 0;
} }
static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, STsdbDataRAWBlockHeader* bHdr) { static int64_t tsdbSnapRAWReadPeek(SDataFileRAWReader* reader) {
int64_t size = TMIN(reader->config->file.size - reader->ctx->offset, TSDB_SNAP_RAW_PAYLOAD_SIZE);
return size;
}
static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, SSnapDataHdr** ppData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
ppData[0] = NULL;
ASSERT(reader->dataIter->offset <= reader->dataIter->size); ASSERT(reader->dataIter->offset <= reader->dataIter->size);
ASSERT(reader->dataIter->idx <= reader->dataIter->count); ASSERT(reader->dataIter->idx <= reader->dataIter->count);
// dataReader
if (reader->dataIter->offset == reader->dataIter->size && reader->dataIter->idx < reader->dataIter->count) { if (reader->dataIter->offset == reader->dataIter->size && reader->dataIter->idx < reader->dataIter->count) {
reader->dataIter->idx++; reader->dataIter->idx++;
} }
if (reader->dataIter->idx == reader->dataIter->count) { if (reader->dataIter->idx == reader->dataIter->count) {
return 0; return 0;
} }
int8_t type = reader->type;
SDataFileRAWReader* dataReader = TARRAY2_GET(reader->dataReaderArr, reader->dataIter->idx); SDataFileRAWReader* dataReader = TARRAY2_GET(reader->dataReaderArr, reader->dataIter->idx);
code = tsdbDataFileRAWReadBlockData(dataReader, bHdr);
TSDB_CHECK_CODE(code, lino, _exit);
reader->dataIter->offset += bHdr->dataLength; // prepare
_exit: int64_t dataLength = tsdbSnapRAWReadPeek(dataReader);
if (code) { ASSERT(dataLength > 0);
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapRAWReadData(STsdbSnapRAWReader* reader, SSnapDataHdr** data) { void* pBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + sizeof(STsdbDataRAWBlockHeader) + dataLength);
int32_t code = 0;
int32_t lino = 0;
void* pBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + sizeof(STsdbDataRAWBlockHeader) + TSDB_SNAP_RAW_PAYLOAD_SIZE);
if (pBuf == NULL) { if (pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
SSnapDataHdr* pHdr = pBuf; SSnapDataHdr* pHdr = pBuf;
pHdr->type = reader->type; pHdr->type = type;
STsdbDataRAWBlockHeader* pData = (void*)pHdr->data; pHdr->size = sizeof(STsdbDataRAWBlockHeader) + dataLength;
pData->dataLength = TSDB_SNAP_RAW_PAYLOAD_SIZE;
code = tsdbSnapRAWReadNext(reader, pData); STsdbDataRAWBlockHeader* pBlock = (void*)pHdr->data;
pBlock->offset = dataReader->ctx->offset;
pBlock->dataLength = dataLength;
// read
code = tsdbDataFileRAWReadBlockData(dataReader, pBlock);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pData->dataLength > 0 && pData->dataLength <= TSDB_SNAP_RAW_PAYLOAD_SIZE); // finish
pHdr->size = sizeof(STsdbDataRAWBlockHeader) + pData->dataLength; reader->dataIter->offset += pBlock->dataLength;
ppData[0] = pBuf;
ASSERT(reader->dataIter->offset <= reader->dataIter->size);
_exit: _exit:
if (code) { if (code) {
@ -236,7 +238,20 @@ _exit:
pBuf = NULL; pBuf = NULL;
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
} }
data[0] = pBuf; return code;
}
static int32_t tsdbSnapRAWReadData(STsdbSnapRAWReader* reader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbSnapRAWReadNext(reader, (SSnapDataHdr**)ppData);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code; return code;
} }
@ -288,7 +303,7 @@ int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* reader, uint8_t** data) {
} }
if (!reader->ctx->isDataDone) { if (!reader->ctx->isDataDone) {
code = tsdbSnapRAWReadData(reader, (SSnapDataHdr**)data); code = tsdbSnapRAWReadData(reader, data);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (data[0]) { if (data[0]) {
goto _exit; goto _exit;