refact more
This commit is contained in:
parent
99d223f912
commit
5dcfd2a642
|
@ -289,23 +289,31 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
|
pCols->maxPoints = maxRows;
|
||||||
if (pCols->cols == NULL) {
|
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, strerror(errno));
|
if (maxCols > 0) {
|
||||||
tdFreeDataCols(pCols);
|
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
|
||||||
return NULL;
|
if (pCols->cols == NULL) {
|
||||||
|
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
|
||||||
|
strerror(errno));
|
||||||
|
tdFreeDataCols(pCols);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCols->maxCols = maxCols;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxRowSize = maxRowSize;
|
pCols->maxRowSize = maxRowSize;
|
||||||
pCols->maxCols = maxCols;
|
|
||||||
pCols->maxPoints = maxRows;
|
|
||||||
pCols->bufSize = maxRowSize * maxRows;
|
pCols->bufSize = maxRowSize * maxRows;
|
||||||
|
|
||||||
pCols->buf = malloc(pCols->bufSize);
|
if (pCols->bufSize > 0) {
|
||||||
if (pCols->buf == NULL) {
|
pCols->buf = malloc(pCols->bufSize);
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, strerror(errno));
|
if (pCols->buf == NULL) {
|
||||||
tdFreeDataCols(pCols);
|
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
|
||||||
return NULL;
|
strerror(errno));
|
||||||
|
tdFreeDataCols(pCols);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pCols;
|
return pCols;
|
||||||
|
|
|
@ -70,7 +70,7 @@ typedef struct {
|
||||||
|
|
||||||
STsdbFS *tsdbNewFS(int keep, int days);
|
STsdbFS *tsdbNewFS(int keep, int days);
|
||||||
void * tsdbFreeFS(STsdbFS *pfs);
|
void * tsdbFreeFS(STsdbFS *pfs);
|
||||||
int tdbOpenFS(STsdbFS *pFs, int keep, int days);
|
int tsdbOpenFS(STsdbFS *pFs, int keep, int days);
|
||||||
void tsdbCloseFS(STsdbFS *pFs);
|
void tsdbCloseFS(STsdbFS *pFs);
|
||||||
int tsdbStartTxn(STsdbFS *pfs);
|
int tsdbStartTxn(STsdbFS *pfs);
|
||||||
int tsdbEndTxn(STsdbFS *pfs);
|
int tsdbEndTxn(STsdbFS *pfs);
|
||||||
|
|
|
@ -287,6 +287,12 @@ typedef struct {
|
||||||
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
||||||
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
|
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
|
||||||
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
|
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
|
||||||
|
#define TSDB_FSET_SET_CLOSED(s) \
|
||||||
|
do { \
|
||||||
|
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
||||||
|
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
|
||||||
|
} \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver);
|
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver);
|
||||||
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
||||||
|
|
|
@ -16,9 +16,6 @@
|
||||||
#ifndef _TD_TSDB_READ_IMPL_H_
|
#ifndef _TD_TSDB_READ_IMPL_H_
|
||||||
#define _TD_TSDB_READ_IMPL_H_
|
#define _TD_TSDB_READ_IMPL_H_
|
||||||
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tdataformat.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -78,16 +75,16 @@ typedef struct {
|
||||||
|
|
||||||
struct SReadH {
|
struct SReadH {
|
||||||
STsdbRepo * pRepo;
|
STsdbRepo * pRepo;
|
||||||
SDFileSet rSet; // File set
|
SDFileSet rSet; // FSET to read
|
||||||
SArray * aBlkIdx;
|
SArray * aBlkIdx; // SBlockIdx array
|
||||||
STable * pTable; // Table info
|
STable * pTable; // table to read
|
||||||
SBlockIdx * pBlkIdx;
|
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
|
||||||
int cidx;
|
int cidx;
|
||||||
SBlockInfo *pBlkInfo;
|
SBlockInfo *pBlkInfo;
|
||||||
SBlockData *pBlkData; // Block info
|
SBlockData *pBlkData; // Block info
|
||||||
SDataCols * pDCols[2];
|
SDataCols * pDCols[2];
|
||||||
void * pBuf;
|
void * pBuf; // buffer
|
||||||
void * pCBuf;
|
void * pCBuf; // compression buffer
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
||||||
|
|
|
@ -724,8 +724,8 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
||||||
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
|
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
|
||||||
|
|
||||||
// no data block in this file, try next file
|
// no data block in this file, try next file
|
||||||
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
|
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
|
||||||
continue; // no data blocks in the file belongs to pCheckInfo->pTable
|
continue; // no data blocks in the file belongs to pCheckInfo->pTable
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->compSize < (int32_t)compIndex->len) {
|
if (pCheckInfo->compSize < (int32_t)compIndex->len) {
|
||||||
|
|
|
@ -34,9 +34,7 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
|
||||||
memset((void *)pReadh, 0, sizeof(*pReadh));
|
memset((void *)pReadh, 0, sizeof(*pReadh));
|
||||||
pReadh->pRepo = pRepo;
|
pReadh->pRepo = pRepo;
|
||||||
|
|
||||||
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) {
|
TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));
|
||||||
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh), ftype));
|
|
||||||
}
|
|
||||||
|
|
||||||
pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
|
pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
|
||||||
if (pReadh->aBlkIdx == NULL) {
|
if (pReadh->aBlkIdx == NULL) {
|
||||||
|
@ -83,9 +81,7 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
|
||||||
tsdbResetReadFile(pReadh);
|
tsdbResetReadFile(pReadh);
|
||||||
|
|
||||||
pReadh->rSet = *pSet;
|
pReadh->rSet = *pSet;
|
||||||
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) {
|
TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));
|
||||||
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh), ftype));
|
|
||||||
}
|
|
||||||
if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), O_RDONLY) < 0) return -1;
|
if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), O_RDONLY) < 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -233,6 +229,8 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);
|
||||||
|
|
||||||
if (pTarget) {
|
if (pTarget) {
|
||||||
memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
|
memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
|
||||||
}
|
}
|
||||||
|
@ -410,7 +408,7 @@ static void tsdbResetReadFile(SReadH *pReadh) {
|
||||||
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) {
|
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) {
|
||||||
ASSERT(pBlock->numOfSubBlocks >= 0 && pBlock->numOfSubBlocks <= 1);
|
ASSERT(pBlock->numOfSubBlocks >= 0 && pBlock->numOfSubBlocks <= 1);
|
||||||
|
|
||||||
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_HEAD_FILE(pReadh);
|
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||||
|
|
||||||
tdResetDataCols(pDataCols);
|
tdResetDataCols(pDataCols);
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;
|
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;
|
||||||
|
@ -426,7 +424,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
|
int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d",
|
tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d",
|
||||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, pBlock->len);
|
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset,
|
||||||
|
pBlock->len);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,16 +478,12 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
if (tcolId == pDataCol->colId) {
|
if (tcolId == pDataCol->colId) {
|
||||||
if (pBlock->algorithm == TWO_STAGE_COMP) {
|
if (pBlock->algorithm == TWO_STAGE_COMP) {
|
||||||
int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||||
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
|
|
||||||
zsize += (sizeof(VarDataLenT) * pBlock->numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) return -1;
|
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm,
|
if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm,
|
||||||
pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
|
pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
|
||||||
(int32_t)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
|
taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
|
||||||
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
||||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
|
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -622,11 +617,11 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
|
||||||
ASSERT(pDataCol->colId == pBlockCol->colId);
|
ASSERT(pDataCol->colId == pBlockCol->colId);
|
||||||
|
|
||||||
STsdbRepo *pRepo = TSDB_READ_REPO(pReadh);
|
STsdbRepo *pRepo = TSDB_READ_REPO(pReadh);
|
||||||
STsdbCfg * pCfg = &(pRepo->config);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||||
|
|
||||||
if (tsdbMakeRoom((void **)(&(pReadh->pBuf)), pBlockCol->len) < 0) return -1;
|
if (tsdbMakeRoom((void **)(&(TSDB_READ_BUF(pReadh))), pBlockCol->len) < 0) return -1;
|
||||||
if (tsdbMakeRoom((void **)(&(pReadh->pCBuf)), tsize) < 0) return -1;
|
if (tsdbMakeRoom((void **)(&(TSDB_READ_COMP_BUF(pReadh))), tsize) < 0) return -1;
|
||||||
|
|
||||||
int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + pBlockCol->offset;
|
int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + pBlockCol->offset;
|
||||||
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
|
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
|
||||||
|
@ -635,7 +630,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t nread = tsdbReadDFile(pDFile, pReadh->pBuf, pBlockCol->len);
|
int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlockCol->len);
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tsdbError("vgId:%d failed to load block column data while read file %s sinces %s, offset:%" PRId64 " len :%d",
|
tsdbError("vgId:%d failed to load block column data while read file %s sinces %s, offset:%" PRId64 " len :%d",
|
||||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), offset, pBlockCol->len);
|
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), offset, pBlockCol->len);
|
||||||
|
|
Loading…
Reference in New Issue