refact
This commit is contained in:
parent
278a88a21e
commit
5c796c8bda
|
@ -40,15 +40,6 @@ typedef enum {
|
|||
TSDB_FILE_MANIFEST
|
||||
} TSDB_FILE_T;
|
||||
|
||||
#define tsdbOpenFile(T, f, flags) tsdbOpen##T(f, flags)
|
||||
#define tsdbCloseFile(T, f) tsdbClose##T(f)
|
||||
#define tsdbSeekFile(T, f, offset, whence) tsdbSeek##T(f, offset, whence)
|
||||
#define tsdbWriteFile(T, f, buf, nbytes) tsdbWrite##T(f, buf, nbytes)
|
||||
#define tsdbUpdateFileMagic(T, f, pCksum) tsdbUpdate##T##Magic(f, pCksum)
|
||||
#define tsdbTellFile(T, f) tsdbTell##T(f)
|
||||
#define tsdbEncodeFile(T, buf, f) tsdbEncode##T(buf, f)
|
||||
#define tsdbDecodeFile(T, buf, f) tsdbDecode##T(buf, f)
|
||||
|
||||
// =============== SMFile
|
||||
typedef struct {
|
||||
int64_t size;
|
||||
|
@ -68,7 +59,7 @@ void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo);
|
|||
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
||||
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
||||
|
||||
static FORCE_INLINE int tsdbOpenSMFile(SMFile* pMFile, int flags) {
|
||||
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
|
||||
ASSERT(!TSDB_FILE_OPENED(pMFile));
|
||||
|
||||
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags);
|
||||
|
@ -80,14 +71,14 @@ static FORCE_INLINE int tsdbOpenSMFile(SMFile* pMFile, int flags) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tsdbCloseSMFile(SMFile* pMFile) {
|
||||
static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) {
|
||||
if (TSDB_FILE_OPENED(pMFile)) {
|
||||
close(pMFile->fd);
|
||||
TSDB_FILE_SET_CLOSED(pMFile);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbSeekSMFile(SMFile* pMFile, int64_t offset, int whence) {
|
||||
static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) {
|
||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
||||
|
||||
int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence);
|
||||
|
@ -99,7 +90,7 @@ static FORCE_INLINE int64_t tsdbSeekSMFile(SMFile* pMFile, int64_t offset, int w
|
|||
return loffset;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbWriteSMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
|
||||
static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
|
||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
||||
|
||||
int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte);
|
||||
|
@ -111,11 +102,11 @@ static FORCE_INLINE int64_t tsdbWriteSMFile(SMFile* pMFile, void* buf, int64_t n
|
|||
return nwrite;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tsdbUpdateSMFileMagic(SMFile* pMFile, void* pCksum) {
|
||||
static FORCE_INLINE void tsdbUpdateMFileMagic(SMFile* pMFile, void* pCksum) {
|
||||
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t*)(pCksum), sizeof(TSCKSUM));
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbTellSMFile(SMFile* pMFile) { return tsdbSeekSMFile(pMFile, 0, SEEK_CUR); }
|
||||
static FORCE_INLINE int64_t tsdbTellMFile(SMFile* pMFile) { return tsdbSeekMFile(pMFile, 0, SEEK_CUR); }
|
||||
|
||||
// =============== SDFile
|
||||
typedef struct {
|
||||
|
@ -140,7 +131,7 @@ void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile);
|
|||
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
||||
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
|
||||
|
||||
static FORCE_INLINE int tsdbOpenSDFile(SDFile *pDFile, int flags) {
|
||||
static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) {
|
||||
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
pDFile->fd = open(pDFile->f.aname, flags);
|
||||
|
@ -152,14 +143,14 @@ static FORCE_INLINE int tsdbOpenSDFile(SDFile *pDFile, int flags) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tsdbCloseSDFile(SDFile* pDFile) {
|
||||
static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
|
||||
if (TSDB_FILE_OPENED(pDFile)) {
|
||||
close(pDFile->fd);
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbSeekSDFile(SDFile *pDFile, int64_t offset, int whence) {
|
||||
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t loffset = taosLSeek(pDFile->fd, offset, whence);
|
||||
|
@ -171,7 +162,7 @@ static FORCE_INLINE int64_t tsdbSeekSDFile(SDFile *pDFile, int64_t offset, int w
|
|||
return loffset;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbWriteSDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte);
|
||||
|
@ -183,20 +174,20 @@ static FORCE_INLINE int64_t tsdbWriteSDFile(SDFile* pDFile, void* buf, int64_t n
|
|||
return nwrite;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbAppendSDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) {
|
||||
static FORCE_INLINE int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
int64_t nwrite;
|
||||
|
||||
*offset = tsdbSeekSDFile(pDFile, 0, SEEK_SET);
|
||||
*offset = tsdbSeekDFile(pDFile, 0, SEEK_SET);
|
||||
if (*offset < 0) return -1;
|
||||
|
||||
nwrite = tsdbWriteSDFile(pDFile, buf, nbyte);
|
||||
nwrite = tsdbWriteDFile(pDFile, buf, nbyte);
|
||||
if (nwrite < 0) return nwrite;
|
||||
|
||||
return nwrite;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbReadSDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t nread = taosRead(pDFile->fd, buf, nbyte);
|
||||
|
@ -208,9 +199,9 @@ static FORCE_INLINE int64_t tsdbReadSDFile(SDFile* pDFile, void* buf, int64_t nb
|
|||
return nread;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t tsdbTellSDFile(SDFile *pDFile) { return tsdbSeekSDFile(pDFile, 0, SEEK_CUR); }
|
||||
static FORCE_INLINE int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); }
|
||||
|
||||
static FORCE_INLINE void tsdbUpdateSDFileMagic(SDFile* pDFile, void* pCksm) {
|
||||
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) {
|
||||
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM));
|
||||
}
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ struct SReadH {
|
|||
|
||||
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
||||
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
|
||||
#define TSDB_READ_FSET(rh) &((rh)->rSet)
|
||||
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
|
||||
#define TSDB_READ_TABLE(ch) ((rh)->pTable)
|
||||
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
|
||||
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
|
||||
|
@ -111,7 +111,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
|||
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
||||
int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo);
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds,
|
||||
const int numOfColsIds);
|
||||
int numOfColsIds);
|
||||
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
||||
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
||||
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
||||
|
|
|
@ -18,7 +18,10 @@
|
|||
|
||||
// TODO: remove the include
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <inttypes.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include "os.h"
|
||||
#include "tlog.h"
|
||||
|
@ -27,6 +30,7 @@
|
|||
#include "tchecksum.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tdataformat.h"
|
||||
#include "tscompression.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tlist.h"
|
||||
#include "hash.h"
|
||||
|
|
|
@ -174,6 +174,7 @@ int tsdbOpenDFileSet(SDFileSet *pSet, int flags) {
|
|||
|
||||
void tsdbCloseDFileSet(SDFileSet *pSet) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
|
||||
tsdbCloseDFile(pDFile);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,12 +13,12 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tchecksum.h"
|
||||
#include "tsdbMain.h"
|
||||
#include "tsdbint.h"
|
||||
|
||||
#define TSDB_KEY_COL_OFFSET 0
|
||||
|
||||
static void tsdbResetReadH(SReadH *pReadh);
|
||||
static void tsdbResetReadTable(SReadH *pReadh);
|
||||
static void tsdbResetReadFile(SReadH *pReadh);
|
||||
static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols);
|
||||
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
|
||||
int maxPoints, char *buffer, int bufferSize);
|
||||
|
@ -27,31 +27,31 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDat
|
|||
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
|
||||
|
||||
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
|
||||
ASSERT(pReadh != NULL);
|
||||
ASSERT(pReadh != NULL && pRepo != NULL);
|
||||
|
||||
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||
|
||||
memset((void *)pReadh, 0, sizeof(*pReadh));
|
||||
pReadh->pRepo = pRepo;
|
||||
|
||||
pReadh->aBlkIdx = taosArrayInit(sizeof(SBlockIdx), 1024);
|
||||
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh), ftype));
|
||||
}
|
||||
|
||||
pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
|
||||
if (pReadh->aBlkIdx == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReadh->pDCols[0] = tdNewDataCols();
|
||||
pReadh->pDCols[0] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
||||
if (pReadh->pDCols[0] == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbDestroyReadH(pReadh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReadh->pDCols[0] = tdNewDataCols();
|
||||
if (pReadh->pDCols[0] == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbDestroyReadH(pReadh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReadh->pDCols[1] = tdNewDataCols();
|
||||
pReadh->pDCols[1] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
||||
if (pReadh->pDCols[1] == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbDestroyReadH(pReadh);
|
||||
|
@ -69,7 +69,7 @@ void tsdbDestroyReadH(SReadH *pReadh) {
|
|||
pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
|
||||
pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);
|
||||
pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
|
||||
pReadh->pBlkInfo = tdFreeDataCols(pReadh->pBlkInfo);
|
||||
pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);
|
||||
pReadh->cidx = 0;
|
||||
pReadh->pBlkIdx = NULL;
|
||||
pReadh->pTable = NULL;
|
||||
|
@ -79,20 +79,22 @@ void tsdbDestroyReadH(SReadH *pReadh) {
|
|||
}
|
||||
|
||||
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
|
||||
tsdbResetReadH(pReadh);
|
||||
ASSERT(pSet != NULL);
|
||||
tsdbResetReadFile(pReadh);
|
||||
|
||||
pReadh->rSet = *pSet;
|
||||
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh), ftype));
|
||||
}
|
||||
if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), O_RDONLY) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbCloseAndUnsetFSet(SReadH *pReadh) {
|
||||
// TODO
|
||||
}
|
||||
void tsdbCloseAndUnsetFSet(SReadH *pReadh) { tsdbResetReadFile(pReadh); }
|
||||
|
||||
int tsdbLoadBlockIdx(SReadH *pReadh) {
|
||||
SDFile * pHeadf = TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh));
|
||||
SDFile * pHeadf = TSDB_READ_HEAD_FILE(pReadh);
|
||||
SBlockIdx blkIdx;
|
||||
|
||||
ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0);
|
||||
|
@ -144,7 +146,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
|
|||
|
||||
tsize++;
|
||||
ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid <
|
||||
((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid)
|
||||
((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -153,6 +155,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
|
|||
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
|
||||
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
|
||||
pReadh->pTable = pTable;
|
||||
|
||||
if (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -163,33 +167,32 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
|
||||
size_t size = taosArrayGetSize(pReadh->aBlkIdx);
|
||||
if (size > 0) {
|
||||
while (true) {
|
||||
if (pReadh->cidx >= size) {
|
||||
pReadh->pBlockIdx = NULL;
|
||||
pReadh->pBlkIdx = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
|
||||
if (pBlkIdx->tid == TABLE_TID(pTable)) {
|
||||
if (pBlkIdx->uid == TABLE_UID(pTable)) {
|
||||
pReadh->pBlockIdx = pBlkIdx;
|
||||
pReadh->pBlkIdx = pBlkIdx;
|
||||
} else {
|
||||
pReadh->pBlockIdx = NULL;
|
||||
pReadh->pBlkIdx = NULL;
|
||||
}
|
||||
pReadh->cidx++;
|
||||
break;
|
||||
} else if (pBlkIdx->tid > TABLE_TID(pTable)) {
|
||||
pReadh->pBlockIdx = NULL;
|
||||
pReadh->pBlkIdx = NULL;
|
||||
break;
|
||||
} else {
|
||||
pReadh->cidx++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pReadh->pBlockIdx = NULL;
|
||||
pReadh->pBlkIdx = NULL;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -202,8 +205,8 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
|||
SBlockIdx *pBlkIdx = pReadh->pBlkIdx;
|
||||
|
||||
if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s to offset %u since %s",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, tstrerror(terrno));
|
||||
tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -231,28 +234,26 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
|||
}
|
||||
|
||||
if (pTarget) {
|
||||
memcpy(pTarget, (void *)pReadh->pBlkInfo, pBlkIdx->len);
|
||||
memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo) {
|
||||
int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo) {
|
||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||
|
||||
const SBlock *iBlock = pBlock;
|
||||
if (pBlock->numOfSubBlocks > 1) {
|
||||
if (pBlockInfo) {
|
||||
iBlock = POINTER_SHIFT(pBlockInfo, pBlock->offset);
|
||||
if (pBlkInfo) {
|
||||
iBlock = (SBlock *)POINTER_SHIFT(pBlkInfo, pBlock->offset);
|
||||
} else {
|
||||
iBlock = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset);
|
||||
iBlock = (SBlock *)POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset);
|
||||
}
|
||||
}
|
||||
|
||||
tdResetDataCols(pReadh->pDCols[0]);
|
||||
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0]) < 0) return -1;
|
||||
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
||||
tdResetDataCols(pReadh->pDCols[1]);
|
||||
iBlock++;
|
||||
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
|
||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1;
|
||||
|
@ -265,23 +266,21 @@ int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pB
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds,
|
||||
const int numOfColsIds) {
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo, const int16_t *colIds,
|
||||
int numOfColsIds) {
|
||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||
|
||||
const SBlock *iBlock = pBlock;
|
||||
if (pBlock->numOfSubBlocks > 1) {
|
||||
if (pBlockInfo) {
|
||||
iBlock = POINTER_SHIFT(pBlockInfo, pBlock->offset);
|
||||
if (pBlkInfo) {
|
||||
iBlock = POINTER_SHIFT(pBlkInfo, pBlock->offset);
|
||||
} else {
|
||||
iBlock = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset);
|
||||
}
|
||||
}
|
||||
|
||||
tdResetDataCols(pReadh->pDCols[0]);
|
||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds) < 0) return -1;
|
||||
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
||||
tdResetDataCols(pReadh->pDCols[1]);
|
||||
iBlock++;
|
||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
|
||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1;
|
||||
|
@ -297,21 +296,21 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo
|
|||
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
|
||||
ASSERT(pBlock->numOfSubBlocks <= 1);
|
||||
|
||||
SDFile *pFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||
|
||||
if (tsdbSeekDFile(pFile, pBlock->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %u since %s",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlock->offset, tstrerror(terrno));
|
||||
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
|
||||
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData), size)) < 0) return -1;
|
||||
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
|
||||
|
||||
int64_t nread = tsdbReadDFile(pFile, (void *)(pReadh->pBlkData), size);
|
||||
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
|
||||
if (nread < 0) {
|
||||
tsdbError("vgId:%d failed to load block statis part while read file %s sinces %s, offset:%" PRId64 " len :%" PRIzu,
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), tstrerror(terrno), pBlock->offset, size);
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, size);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -319,14 +318,14 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
|
|||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
|
||||
" read bytes: %" PRId64,
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pBlock->offset, size, nread);
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size, nread);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), size)) {
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pBlock->offset, size);
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -395,12 +394,16 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tsdbResetReadH(SReadH *pReadh) {
|
||||
static void tsdbResetReadTable(SReadH *pReadh) {
|
||||
tdResetDataCols(pReadh->pDCols[0]);
|
||||
tdResetDataCols(pReadh->pDCols[1]);
|
||||
pReadh->cidx = 0;
|
||||
pReadh->pBlkIdx = NULL;
|
||||
pReadh->pTable = NULL;
|
||||
}
|
||||
|
||||
static void tsdbResetReadFile(SReadH *pReadh) {
|
||||
tsdbResetReadTable(pReadh);
|
||||
taosArrayClear(pReadh->aBlkIdx);
|
||||
tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
|
||||
}
|
||||
|
@ -410,17 +413,18 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols
|
|||
|
||||
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_HEAD_FILE(pReadh);
|
||||
|
||||
if (tsdbMakeRoom((void **)(&(pReadh->pBuf)), pBlock->len) < 0) return -1;
|
||||
tdResetDataCols(pDataCols);
|
||||
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;
|
||||
|
||||
SBlockData *pBlockData = (SBlockData *)(pReadh->pBuf);
|
||||
SBlockData *pBlockData = (SBlockData *)TSDB_READ_BUF(pReadh);
|
||||
|
||||
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to load block data part while seek file %s to offset %u since %s",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlock->offset, tstrerror(terrno));
|
||||
tsdbError("vgId:%d failed to load block data part while seek file %s to offset %" PRId64 " since %s",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t nread = tsdbReadDFile(pDFile, pReadh->pBuf, pBlock->len);
|
||||
int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
|
||||
if (nread < 0) {
|
||||
tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, pBlock->len);
|
||||
|
@ -429,20 +433,21 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols
|
|||
|
||||
if (nread < pBlock->len) {
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64 " expected bytes:%d" PRIzu
|
||||
" read bytes: %" PRId64,
|
||||
tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64
|
||||
" expected bytes:%d read bytes: %" PRId64,
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, pBlock->len, nread);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
|
||||
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBuf), tsize)) {
|
||||
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tsize);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(tsize < pBlock->len);
|
||||
ASSERT(pBlockData->numOfCols == pBlock->numOfCols);
|
||||
|
||||
pDataCols->numOfRows = pBlock->numOfRows;
|
||||
|
@ -475,21 +480,24 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols
|
|||
if (tcolId == pDataCol->colId) {
|
||||
if (pBlock->algorithm == TWO_STAGE_COMP) {
|
||||
int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
|
||||
zsize += (sizeof(VarDataLenT) * pBlock->numOfRows);
|
||||
}
|
||||
|
||||
if (tsdbMakeRoom((void **)(&(pReadh->pCBuf)), zsize) < 0) return -1;
|
||||
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm,
|
||||
pBlock->numOfRows, pDataCols->maxPoints, pReadh->pCBuf,
|
||||
(int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
|
||||
pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
|
||||
(int32_t)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
|
||||
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
|
||||
return -1;
|
||||
}
|
||||
if (dcol != 0) ccol++;
|
||||
|
||||
if (dcol != 0) {
|
||||
ccol++;
|
||||
}
|
||||
dcol++;
|
||||
} else if (tcolId < pDataCol->colId) {
|
||||
ccol++;
|
||||
|
@ -528,19 +536,22 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
|
|||
memcpy(pDataCol->pData, content, pDataCol->len);
|
||||
}
|
||||
|
||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
|
||||
dataColSetOffset(pDataCol, numOfRows);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
|
||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||
int numOfColIds) {
|
||||
ASSERT(pBlock->numOfSubBlocks <= 1);
|
||||
ASSERT(colIds[0] == 0);
|
||||
|
||||
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||
SBlockCol blockCol = {0};
|
||||
|
||||
tdResetDataCols(pDataCols);
|
||||
|
||||
// If only load timestamp column, no need to load SBlockData part
|
||||
if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1;
|
||||
|
||||
|
@ -584,7 +595,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SData
|
|||
break;
|
||||
}
|
||||
|
||||
pBlockCol = &(pReadh->pBlockData->cols[ccol]);
|
||||
pBlockCol = &(pReadh->pBlkData->cols[ccol]);
|
||||
if (pBlockCol->colId > colId) {
|
||||
pBlockCol = NULL;
|
||||
break;
|
||||
|
@ -642,7 +653,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
|
|||
|
||||
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
|
||||
pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pFile),
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pDFile),
|
||||
pBlockCol->colId, offset);
|
||||
return -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue