refact code for further change
This commit is contained in:
parent
f963f5bfcc
commit
7fe743d00f
|
@ -14,382 +14,136 @@
|
|||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
typedef struct {
|
||||
TdFilePtr pFD;
|
||||
int32_t szPage;
|
||||
int32_t nBuf;
|
||||
uint8_t *pBuf;
|
||||
int64_t pgno;
|
||||
} STsdbFD;
|
||||
|
||||
// SDelFWriter ====================================================
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
|
||||
int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) {
|
||||
int32_t code = 0;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
char hdr[TSDB_FHDR_SIZE] = {0};
|
||||
SDelFWriter *pDelFWriter;
|
||||
int64_t n;
|
||||
|
||||
// alloc
|
||||
pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
|
||||
if (pDelFWriter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pDelFWriter->pTsdb = pTsdb;
|
||||
pDelFWriter->fDel = *pFile;
|
||||
|
||||
tsdbDelFileName(pTsdb, pFile, fname);
|
||||
pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
|
||||
if (pDelFWriter->pWriteH == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// update header
|
||||
n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
|
||||
pDelFWriter->fDel.offset = 0;
|
||||
|
||||
*ppWriter = pDelFWriter;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
|
||||
int32_t code = 0;
|
||||
SDelFWriter *pWriter = *ppWriter;
|
||||
STsdb *pTsdb = pWriter->pTsdb;
|
||||
|
||||
// sync
|
||||
if (sync && taosFsyncFile(pWriter->pWriteH) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// close
|
||||
if (taosCloseFile(&pWriter->pWriteH) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
|
||||
tFree(pWriter->aBuf[iBuf]);
|
||||
}
|
||||
taosMemoryFree(pWriter);
|
||||
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
|
||||
// prepare
|
||||
size = sizeof(uint32_t);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
|
||||
size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData));
|
||||
}
|
||||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pWriter->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
n = 0;
|
||||
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
|
||||
n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData));
|
||||
}
|
||||
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
|
||||
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
|
||||
// write
|
||||
n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(n == size);
|
||||
|
||||
// update
|
||||
pDelIdx->offset = pWriter->fDel.size;
|
||||
pDelIdx->size = size;
|
||||
pWriter->fDel.size += size;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
SDelIdx *pDelIdx;
|
||||
|
||||
// prepare
|
||||
size = sizeof(uint32_t);
|
||||
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
|
||||
size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
|
||||
}
|
||||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pWriter->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
n = 0;
|
||||
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
|
||||
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
|
||||
n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
|
||||
}
|
||||
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
|
||||
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
|
||||
// write
|
||||
n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// update
|
||||
pWriter->fDel.offset = pWriter->fDel.size;
|
||||
pWriter->fDel.size += size;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
|
||||
int32_t code = 0;
|
||||
char hdr[TSDB_FHDR_SIZE];
|
||||
int64_t size = TSDB_FHDR_SIZE;
|
||||
int64_t n;
|
||||
|
||||
// build
|
||||
memset(hdr, 0, size);
|
||||
tPutDelFile(hdr, &pWriter->fDel);
|
||||
taosCalcChecksumAppend(0, hdr, size);
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// write
|
||||
n = taosWriteFile(pWriter->pWriteH, hdr, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// SDelFReader ====================================================
|
||||
struct SDelFReader {
|
||||
STsdb *pTsdb;
|
||||
SDelFile fDel;
|
||||
TdFilePtr pReadH;
|
||||
|
||||
uint8_t *aBuf[1];
|
||||
};
|
||||
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
SDelFReader *pDelFReader;
|
||||
int64_t n;
|
||||
|
||||
// alloc
|
||||
pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
|
||||
if (pDelFReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open impl
|
||||
pDelFReader->pTsdb = pTsdb;
|
||||
pDelFReader->fDel = *pFile;
|
||||
|
||||
tsdbDelFileName(pTsdb, pFile, fname);
|
||||
pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pDelFReader->pReadH == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(pDelFReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
*ppReader = pDelFReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
|
||||
int32_t code = 0;
|
||||
SDelFReader *pReader = *ppReader;
|
||||
|
||||
if (pReader) {
|
||||
if (taosCloseFile(&pReader->pReadH) < 0) {
|
||||
pFD->pFD = taosOpenFile(path, opt);
|
||||
if (pFD->pFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
|
||||
tFree(pReader->aBuf[iBuf]);
|
||||
|
||||
pFD->szPage = 4096;
|
||||
pFD->pgno = 0;
|
||||
pFD->nBuf = 0;
|
||||
pFD->pBuf = taosMemoryMalloc(pFD->szPage);
|
||||
if (pFD->pBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
*ppReader = NULL;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pDelIdx->offset;
|
||||
int64_t size = pDelIdx->size;
|
||||
int64_t n;
|
||||
|
||||
taosArrayClear(aDelData);
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
void tsdbCloseFile(STsdbFD *pFD) {
|
||||
taosMemoryFree(pFD->pBuf);
|
||||
taosCloseFile(&pFD->pFD);
|
||||
}
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pReader->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
int32_t tsdbSyncFile(STsdbFD *pFD) {
|
||||
int32_t code = 0;
|
||||
|
||||
// read
|
||||
n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
|
||||
if (taosFsyncFile(pFD->pFD) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t n = 0;
|
||||
while (n < nBuf) {
|
||||
int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM);
|
||||
int32_t size = TMIN(remain, nBuf - n);
|
||||
|
||||
memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size);
|
||||
n += size;
|
||||
pFD->nBuf += size;
|
||||
|
||||
if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) {
|
||||
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
|
||||
|
||||
int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// // decode
|
||||
n = 0;
|
||||
|
||||
uint32_t delimiter;
|
||||
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
|
||||
while (n < size - sizeof(TSCKSUM)) {
|
||||
SDelData delData;
|
||||
n += tGetDelData(pReader->aBuf[0] + n, &delData);
|
||||
|
||||
if (taosArrayPush(aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
pFD->nBuf = 0;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == size - sizeof(TSCKSUM));
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
||||
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
||||
int32_t code = 0;
|
||||
int32_t n;
|
||||
int64_t offset = pReader->fDel.offset;
|
||||
int64_t size = pReader->fDel.size - offset;
|
||||
|
||||
taosArrayClear(aDelIdx);
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pReader->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
|
||||
int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
} else if (n < pFD->szPage) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
|
||||
if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// decode
|
||||
n = 0;
|
||||
uint32_t delimiter;
|
||||
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
|
||||
ASSERT(delimiter == TSDB_FILE_DLMT);
|
||||
|
||||
while (n < size - sizeof(TSCKSUM)) {
|
||||
SDelIdx delIdx;
|
||||
|
||||
n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
|
||||
|
||||
if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == size - sizeof(TSCKSUM));
|
||||
pFD->pgno = pgno;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) {
|
||||
int32_t code = 0;
|
||||
|
||||
int64_t pgno = offset / pFD->szPage;
|
||||
int64_t n = 0;
|
||||
if (pFD->pgno == pgno) {
|
||||
int64_t bOff = offset % pFD->szPage;
|
||||
int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count);
|
||||
memcpy(pBuf + n, pFD->pBuf + bOff, nRead);
|
||||
n = nRead;
|
||||
}
|
||||
|
||||
while (n < count) {
|
||||
code = tsdbReadFilePage(pFD, pgno);
|
||||
if (code) goto _exit;
|
||||
|
||||
pgno++;
|
||||
|
||||
int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n);
|
||||
memcpy(pBuf + n, pFD->pBuf, nRead);
|
||||
n += nRead;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1609,135 +1363,380 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
typedef struct {
|
||||
TdFilePtr pFD;
|
||||
int32_t szPage;
|
||||
int32_t nBuf;
|
||||
uint8_t *pBuf;
|
||||
int64_t pgno;
|
||||
} STsdbFD;
|
||||
|
||||
int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) {
|
||||
// SDelFWriter ====================================================
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
char hdr[TSDB_FHDR_SIZE] = {0};
|
||||
SDelFWriter *pDelFWriter;
|
||||
int64_t n;
|
||||
|
||||
pFD->pFD = taosOpenFile(path, opt);
|
||||
if (pFD->pFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pFD->szPage = 4096;
|
||||
pFD->pgno = 0;
|
||||
pFD->nBuf = 0;
|
||||
pFD->pBuf = taosMemoryMalloc(pFD->szPage);
|
||||
if (pFD->pBuf == NULL) {
|
||||
// alloc
|
||||
pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
|
||||
if (pDelFWriter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pDelFWriter->pTsdb = pTsdb;
|
||||
pDelFWriter->fDel = *pFile;
|
||||
|
||||
tsdbDelFileName(pTsdb, pFile, fname);
|
||||
pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
|
||||
if (pDelFWriter->pWriteH == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// update header
|
||||
n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
|
||||
pDelFWriter->fDel.offset = 0;
|
||||
|
||||
*ppWriter = pDelFWriter;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
|
||||
int32_t code = 0;
|
||||
SDelFWriter *pWriter = *ppWriter;
|
||||
STsdb *pTsdb = pWriter->pTsdb;
|
||||
|
||||
// sync
|
||||
if (sync && taosFsyncFile(pWriter->pWriteH) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// close
|
||||
if (taosCloseFile(&pWriter->pWriteH) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
|
||||
tFree(pWriter->aBuf[iBuf]);
|
||||
}
|
||||
taosMemoryFree(pWriter);
|
||||
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
|
||||
// prepare
|
||||
size = sizeof(uint32_t);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
|
||||
size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData));
|
||||
}
|
||||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pWriter->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
n = 0;
|
||||
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
|
||||
n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData));
|
||||
}
|
||||
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
|
||||
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
|
||||
// write
|
||||
n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(n == size);
|
||||
|
||||
// update
|
||||
pDelIdx->offset = pWriter->fDel.size;
|
||||
pDelIdx->size = size;
|
||||
pWriter->fDel.size += size;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
SDelIdx *pDelIdx;
|
||||
|
||||
// prepare
|
||||
size = sizeof(uint32_t);
|
||||
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
|
||||
size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
|
||||
}
|
||||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pWriter->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
n = 0;
|
||||
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
|
||||
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
|
||||
n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
|
||||
}
|
||||
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
|
||||
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
|
||||
// write
|
||||
n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// update
|
||||
pWriter->fDel.offset = pWriter->fDel.size;
|
||||
pWriter->fDel.size += size;
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
|
||||
int32_t code = 0;
|
||||
char hdr[TSDB_FHDR_SIZE];
|
||||
int64_t size = TSDB_FHDR_SIZE;
|
||||
int64_t n;
|
||||
|
||||
// build
|
||||
memset(hdr, 0, size);
|
||||
tPutDelFile(hdr, &pWriter->fDel);
|
||||
taosCalcChecksumAppend(0, hdr, size);
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// write
|
||||
n = taosWriteFile(pWriter->pWriteH, hdr, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// SDelFReader ====================================================
|
||||
struct SDelFReader {
|
||||
STsdb *pTsdb;
|
||||
SDelFile fDel;
|
||||
TdFilePtr pReadH;
|
||||
|
||||
uint8_t *aBuf[1];
|
||||
};
|
||||
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
SDelFReader *pDelFReader;
|
||||
int64_t n;
|
||||
|
||||
// alloc
|
||||
pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
|
||||
if (pDelFReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open impl
|
||||
pDelFReader->pTsdb = pTsdb;
|
||||
pDelFReader->fDel = *pFile;
|
||||
|
||||
tsdbDelFileName(pTsdb, pFile, fname);
|
||||
pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pDelFReader->pReadH == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(pDelFReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
*ppReader = pDelFReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
|
||||
int32_t code = 0;
|
||||
SDelFReader *pReader = *ppReader;
|
||||
|
||||
if (pReader) {
|
||||
if (taosCloseFile(&pReader->pReadH) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
|
||||
tFree(pReader->aBuf[iBuf]);
|
||||
}
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
*ppReader = NULL;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbCloseFile(STsdbFD *pFD) {
|
||||
taosMemoryFree(pFD->pBuf);
|
||||
taosCloseFile(&pFD->pFD);
|
||||
}
|
||||
|
||||
int32_t tsdbSyncFile(STsdbFD *pFD) {
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pDelIdx->offset;
|
||||
int64_t size = pDelIdx->size;
|
||||
int64_t n;
|
||||
|
||||
if (taosFsyncFile(pFD->pFD) < 0) {
|
||||
taosArrayClear(aDelData);
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
// alloc
|
||||
code = tRealloc(&pReader->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t n = 0;
|
||||
while (n < nBuf) {
|
||||
int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM);
|
||||
int32_t size = TMIN(remain, nBuf - n);
|
||||
|
||||
memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size);
|
||||
n += size;
|
||||
pFD->nBuf += size;
|
||||
|
||||
if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) {
|
||||
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
|
||||
|
||||
int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
||||
// read
|
||||
n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pFD->nBuf = 0;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
||||
int32_t code = 0;
|
||||
|
||||
int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _exit;
|
||||
} else if (n < pFD->szPage) {
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _exit;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _exit;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pFD->pgno = pgno;
|
||||
// // decode
|
||||
n = 0;
|
||||
|
||||
_exit:
|
||||
uint32_t delimiter;
|
||||
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
|
||||
while (n < size - sizeof(TSCKSUM)) {
|
||||
SDelData delData;
|
||||
n += tGetDelData(pReader->aBuf[0] + n, &delData);
|
||||
|
||||
if (taosArrayPush(aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == size - sizeof(TSCKSUM));
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) {
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
||||
int32_t code = 0;
|
||||
int32_t n;
|
||||
int64_t offset = pReader->fDel.offset;
|
||||
int64_t size = pReader->fDel.size - offset;
|
||||
|
||||
int64_t pgno = offset / pFD->szPage;
|
||||
int64_t n = 0;
|
||||
if (pFD->pgno == pgno) {
|
||||
int64_t bOff = offset % pFD->szPage;
|
||||
int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count);
|
||||
memcpy(pBuf + n, pFD->pBuf + bOff, nRead);
|
||||
n = nRead;
|
||||
taosArrayClear(aDelIdx);
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
while (n < count) {
|
||||
code = tsdbReadFilePage(pFD, pgno);
|
||||
if (code) goto _exit;
|
||||
// alloc
|
||||
code = tRealloc(&pReader->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
pgno++;
|
||||
|
||||
int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n);
|
||||
memcpy(pBuf + n, pFD->pBuf, nRead);
|
||||
n += nRead;
|
||||
// read
|
||||
n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// decode
|
||||
n = 0;
|
||||
uint32_t delimiter;
|
||||
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
|
||||
ASSERT(delimiter == TSDB_FILE_DLMT);
|
||||
|
||||
while (n < size - sizeof(TSCKSUM)) {
|
||||
SDelIdx delIdx;
|
||||
|
||||
n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
|
||||
|
||||
if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == size - sizeof(TSCKSUM));
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
Loading…
Reference in New Issue