make more compile

This commit is contained in:
Hongze Cheng 2022-01-07 08:21:28 +00:00
parent b51c6ae1ef
commit 507763258c
11 changed files with 213 additions and 96 deletions

View File

@ -22,16 +22,38 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
} SDataStatis;
typedef struct STable {
int32_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
// TYPES EXPOSED // TYPES EXPOSED
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STsdbCfg { typedef struct STsdbCfg {
int8_t precision; int8_t precision;
uint64_t lruCacheSize; uint64_t lruCacheSize;
uint32_t keep;
uint32_t keep1;
uint32_t keep2;
int32_t daysPerFile; int32_t daysPerFile;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t keep;
int32_t keep1;
int32_t keep2;
int8_t update;
} STsdbCfg; } STsdbCfg;
// STsdb // STsdb

View File

@ -10,6 +10,9 @@ else(0)
"src/tsdbMemTable.c" "src/tsdbMemTable.c"
"src/tsdbOptions.c" "src/tsdbOptions.c"
"src/tsdbWrite.c" "src/tsdbWrite.c"
"src/tsdbReadImpl.c"
"src/tsdbFile.c"
# "src/tsdbFS.c"
) )
endif(0) endif(0)
@ -25,4 +28,5 @@ target_link_libraries(
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC tkv PUBLIC tkv
PUBLIC tfs
) )

View File

@ -16,7 +16,6 @@
#ifndef _TD_TSDB_COMMIT_H_ #ifndef _TD_TSDB_COMMIT_H_
#define _TD_TSDB_COMMIT_H_ #define _TD_TSDB_COMMIT_H_
#if 0
typedef struct { typedef struct {
int minFid; int minFid;
int midFid; int midFid;
@ -30,6 +29,7 @@ typedef struct {
int64_t size; int64_t size;
} SKVRecord; } SKVRecord;
#if 0
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);

View File

@ -17,6 +17,7 @@
#define _TD_TSDB_DEF_H_ #define _TD_TSDB_DEF_H_
#include "mallocator.h" #include "mallocator.h"
#include "tcompression.h"
#include "tglobal.h" #include "tglobal.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
@ -25,9 +26,14 @@
#include "ttime.h" #include "ttime.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbCommit.h"
// #include "tsdbFS.h"
#include "tsdbFile.h"
#include "tsdbLog.h" #include "tsdbLog.h"
#include "tsdbMemTable.h" #include "tsdbMemTable.h"
#include "tsdbMemory.h"
#include "tsdbOptions.h" #include "tsdbOptions.h"
#include "tsdbReadImpl.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -39,10 +45,14 @@ struct STsdb {
STsdbCfg config; STsdbCfg config;
STsdbMemTable * mem; STsdbMemTable * mem;
STsdbMemTable * imem; STsdbMemTable * imem;
SRtn rtn;
SMemAllocatorFactory *pmaf; SMemAllocatorFactory *pmaf;
// STsdbFS fs;
}; };
#define REPO_ID(r) (r)->vgId #define REPO_ID(r) 0
#define REPO_CFG(r) (&(r)->config)
// #define REPO_FS(r) (&(r)->fs)
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -16,7 +16,7 @@
#ifndef _TD_TSDB_FS_H_ #ifndef _TD_TSDB_FS_H_
#define _TD_TSDB_FS_H_ #define _TD_TSDB_FS_H_
#if 0 #include "tsdbFile.h"
#define TSDB_FS_VERSION 0 #define TSDB_FS_VERSION 0
@ -39,19 +39,17 @@ typedef struct {
// ================== // ==================
typedef struct { typedef struct {
STsdbFSMeta meta; // FS meta STsdbFSMeta meta; // FS meta
SMFile* pmf; // meta file pointer SArray * df; // data file array
SMFile mf; // meta file
SArray* df; // data file array
} SFSStatus; } SFSStatus;
typedef struct { typedef struct {
pthread_rwlock_t lock; pthread_rwlock_t lock;
SFSStatus* cstatus; // current status SFSStatus *cstatus; // current status
SHashObj* metaCache; // meta cache SHashObj * metaCache; // meta cache
SHashObj* metaCacheComp; // meta cache for compact SHashObj * metaCacheComp; // meta cache for compact
bool intxn; bool intxn;
SFSStatus* nstatus; // new status SFSStatus *nstatus; // new status
} STsdbFS; } STsdbFS;
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
@ -63,10 +61,10 @@ typedef struct {
typedef struct { typedef struct {
int direction; int direction;
uint64_t version; // current FS version uint64_t version; // current FS version
STsdbFS* pfs; STsdbFS * pfs;
int index; // used to position next fset when version the same int index; // used to position next fset when version the same
int fid; // used to seek when version is changed int fid; // used to seek when version is changed
SDFileSet* pSet; SDFileSet *pSet;
} SFSIter; } SFSIter;
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
@ -74,21 +72,21 @@ typedef struct {
STsdbFS *tsdbNewFS(STsdbCfg *pCfg); STsdbFS *tsdbNewFS(STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs); void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdbRepo *pRepo); int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdbRepo *pRepo); void tsdbCloseFS(STsdb *pRepo);
void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd); void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdbRepo *pRepo); int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs); int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); // void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid); void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter); SDFileSet *tsdbFSIterNext(SFSIter *pIter);
int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta); int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock)); int code = pthread_rwlock_rdlock(&(pFs->lock));
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
@ -97,7 +95,7 @@ static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
return 0; return 0;
} }
static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) { static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_wrlock(&(pFs->lock)); int code = pthread_rwlock_wrlock(&(pFs->lock));
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
@ -106,7 +104,7 @@ static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) {
return 0; return 0;
} }
static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) { static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_unlock(&(pFs->lock)); int code = pthread_rwlock_unlock(&(pFs->lock));
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
@ -115,6 +113,4 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) {
return 0; return 0;
} }
#endif
#endif /* _TD_TSDB_FS_H_ */ #endif /* _TD_TSDB_FS_H_ */

View File

@ -16,7 +16,8 @@
#ifndef _TS_TSDB_FILE_H_ #ifndef _TS_TSDB_FILE_H_
#define _TS_TSDB_FILE_H_ #define _TS_TSDB_FILE_H_
#if 0 #include "tchecksum.h"
#include "tfs.h"
#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_DELIMITER 0xF00AFA0F
@ -34,7 +35,7 @@
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1) #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) #define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) #define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
#define TSDB_FILE_FSYNC(tf) taosFsync(TSDB_FILE_FD(tf)) #define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf))
#define TSDB_FILE_STATE(tf) ((tf)->state) #define TSDB_FILE_STATE(tf) ((tf)->state)
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
@ -42,6 +43,7 @@
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
#if 0
// =============== SMFile // =============== SMFile
typedef struct { typedef struct {
int64_t size; int64_t size;
@ -68,7 +70,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile, bool updateHeader); int tsdbCreateMFile(SMFile* pMFile, bool updateHeader);
int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
int tsdbScanAndTryFixMFile(STsdbRepo* pRepo); int tsdbScanAndTryFixMFile(STsdb* pRepo);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
@ -96,7 +98,7 @@ static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) {
static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) { static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pMFile)); ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence); int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pMFile), offset, whence);
if (loffset < 0) { if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -108,7 +110,7 @@ static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int wh
static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) { static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile)); ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte); int64_t nwrite = taosWriteFile(pMFile->fd, buf, nbyte);
if (nwrite < nbyte) { if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -150,7 +152,7 @@ static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_
static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) { static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile)); ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nread = taosRead(pMFile->fd, buf, nbyte); int64_t nread = taosReadFile(pMFile->fd, buf, nbyte);
if (nread < 0) { if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -159,6 +161,8 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby
return nread; return nread;
} }
#endif
// =============== SDFile // =============== SDFile
typedef struct { typedef struct {
uint32_t magic; uint32_t magic;
@ -210,7 +214,7 @@ static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) { static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pDFile)); ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeek(TSDB_FILE_FD(pDFile), offset, whence); int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pDFile), offset, whence);
if (loffset < 0) { if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -222,7 +226,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) { static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile)); ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte); int64_t nwrite = taosWriteFile(pDFile->fd, buf, nbyte);
if (nwrite < nbyte) { if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -264,7 +268,7 @@ static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) { static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile)); ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosRead(pDFile->fd, buf, nbyte); int64_t nread = taosReadFile(pDFile->fd, buf, nbyte);
if (nread < 0) { if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -316,7 +320,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
@ -366,5 +370,4 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
return true; return true;
} }
#endif
#endif /* _TS_TSDB_FILE_H_ */ #endif /* _TS_TSDB_FILE_H_ */

View File

@ -0,0 +1,74 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMORY_H_
#define _TD_TSDB_MEMORY_H_
static void * taosTMalloc(size_t size);
static void * taosTCalloc(size_t nmemb, size_t size);
static void * taosTRealloc(void *ptr, size_t size);
static void * taosTZfree(void *ptr);
static size_t taosTSizeof(void *ptr);
static void taosTMemset(void *ptr, int c);
static FORCE_INLINE void *taosTMalloc(size_t size) {
if (size <= 0) return NULL;
void *ret = malloc(size + sizeof(size_t));
if (ret == NULL) return NULL;
*(size_t *)ret = size;
return (void *)((char *)ret + sizeof(size_t));
}
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
size_t tsize = nmemb * size;
void * ret = taosTMalloc(tsize);
if (ret == NULL) return NULL;
taosTMemset(ret, 0);
return ret;
}
static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
if (ptr == NULL) return taosTMalloc(size);
if (size <= taosTSizeof(ptr)) return ptr;
void * tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t);
void* tptr1 = realloc(tptr, tsize);
if (tptr1 == NULL) return NULL;
tptr = tptr1;
*(size_t *)tptr = size;
return (void *)((char *)tptr + sizeof(size_t));
}
static FORCE_INLINE void* taosTZfree(void* ptr) {
if (ptr) {
free((void*)((char*)ptr - sizeof(size_t)));
}
return NULL;
}
#endif /* _TD_TSDB_MEMORY_H_ */

View File

@ -15,14 +15,14 @@
#ifndef _TD_TSDB_READ_IMPL_H_ #ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_ #define _TD_TSDB_READ_IMPL_H_
#if 0
#include "os.h"
#include "tfs.h" #include "tfs.h"
#include "tsdb.h" #include "tsdb.h"
#include "os.h"
#include "tsdbFile.h" #include "tsdbFile.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tsdbMeta.h" #include "tsdbMemory.h"
#include "common.h"
typedef struct SReadH SReadH; typedef struct SReadH SReadH;
@ -91,7 +91,7 @@ typedef struct {
} SBlockData; } SBlockData;
struct SReadH { struct SReadH {
STsdbRepo * pRepo; STsdb * pRepo;
SDFileSet rSet; // FSET to read SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read STable * pTable; // table to read
@ -116,7 +116,7 @@ struct SReadH {
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) #define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo); int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh); void tsdbDestroyReadH(SReadH *pReadh);
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void tsdbCloseAndUnsetFSet(SReadH *pReadh); void tsdbCloseAndUnsetFSet(SReadH *pReadh);
@ -151,6 +151,4 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0; return 0;
} }
#endif
#endif /*_TD_TSDB_READ_IMPL_H_*/ #endif /*_TD_TSDB_READ_IMPL_H_*/

View File

@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "tsdbint.h"
#include <regex.h> #include <regex.h>
#include "os.h"
#include "tsdbDef.h"
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
static const char *tsdbTxnFname[] = {"current.t", "current"}; static const char *tsdbTxnFname[] = {"current.t", "current"};
@ -26,16 +26,16 @@ static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid); static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo); static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdbRepo *pRepo); static int tsdbScanRootDir(STsdb *pRepo);
static int tsdbScanDataDir(STsdbRepo *pRepo); static int tsdbScanDataDir(STsdb *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static int tsdbRestoreCurrent(STsdbRepo *pRepo); static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2); static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired); static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdbRepo *pRepo); static int tsdbProcessExpiredFS(STsdb *pRepo);
static int tsdbCreateMeta(STsdbRepo *pRepo); static int tsdbCreateMeta(STsdb *pRepo);
// For backward compatibility // For backward compatibility
// ================== CURRENT file header info // ================== CURRENT file header info
@ -240,7 +240,7 @@ void *tsdbFreeFS(STsdbFS *pfs) {
return NULL; return NULL;
} }
static int tsdbProcessExpiredFS(STsdbRepo *pRepo) { static int tsdbProcessExpiredFS(STsdb *pRepo) {
tsdbStartFSTxn(pRepo, 0, 0); tsdbStartFSTxn(pRepo, 0, 0);
if (tsdbCreateMeta(pRepo) < 0) { if (tsdbCreateMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno));
@ -259,7 +259,7 @@ static int tsdbProcessExpiredFS(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbCreateMeta(STsdbRepo *pRepo) { static int tsdbCreateMeta(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf; SMFile * pOMFile = pfs->cstatus->pmf;
SMFile mf; SMFile mf;
@ -296,7 +296,7 @@ static int tsdbCreateMeta(STsdbRepo *pRepo) {
return 0; return 0;
} }
int tsdbOpenFS(STsdbRepo *pRepo) { int tsdbOpenFS(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
char current[TSDB_FILENAME_LEN] = "\0"; char current[TSDB_FILENAME_LEN] = "\0";
int nExpired = 0; int nExpired = 0;
@ -338,12 +338,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
return 0; return 0;
} }
void tsdbCloseFS(STsdbRepo *pRepo) { void tsdbCloseFS(STsdb *pRepo) {
// Do nothing // Do nothing
} }
// Start a new transaction to modify the file system // Start a new transaction to modify the file system
void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) { void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(pfs->intxn == false); ASSERT(pfs->intxn == false);
@ -361,7 +361,7 @@ void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) {
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
int tsdbEndFSTxn(STsdbRepo *pRepo) { int tsdbEndFSTxn(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(FS_IN_TXN(pfs)); ASSERT(FS_IN_TXN(pfs));
SFSStatus *pStatus; SFSStatus *pStatus;
@ -372,7 +372,7 @@ int tsdbEndFSTxn(STsdbRepo *pRepo) {
return -1; return -1;
} }
// Make new // Make new
tsdbWLockFS(pfs); tsdbWLockFS(pfs);
pStatus = pfs->cstatus; pStatus = pfs->cstatus;
pfs->cstatus = pfs->nstatus; pfs->cstatus = pfs->nstatus;
@ -642,7 +642,7 @@ static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]); snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]);
} }
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
int fd = -1; int fd = -1;
void * buffer = NULL; void * buffer = NULL;
@ -737,7 +737,7 @@ _err:
} }
// Scan and try to fix incorrect files // Scan and try to fix incorrect files
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { static int tsdbScanAndTryFixFS(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus; SFSStatus *pStatus = pfs->cstatus;
@ -763,7 +763,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
return 0; return 0;
} }
int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) {
char tbuf[128]; char tbuf[128];
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
SMFile mf; SMFile mf;
@ -899,7 +899,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
return 0; return 0;
} }
static int tsdbScanRootDir(STsdbRepo *pRepo) { static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN]; char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
@ -933,7 +933,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbScanDataDir(STsdbRepo *pRepo) { static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN]; char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
@ -977,7 +977,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
return false; return false;
} }
static int tsdbRestoreMeta(STsdbRepo *pRepo) { static int tsdbRestoreMeta(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN]; char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL; TDIR * tdir = NULL;
@ -1098,7 +1098,7 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN]; char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL; TDIR * tdir = NULL;
@ -1220,9 +1220,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
} }
pDFile->f = *pf; pDFile->f = *pf;
if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) { if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno)); tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
tstrerror(terrno));
taosArrayDestroy(fArray); taosArrayDestroy(fArray);
return -1; return -1;
} }
@ -1266,7 +1267,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
return 0; return 0;
} }
static int tsdbRestoreCurrent(STsdbRepo *pRepo) { static int tsdbRestoreCurrent(STsdb *pRepo) {
// Loop to recover mfile // Loop to recover mfile
if (tsdbRestoreMeta(pRepo) < 0) { if (tsdbRestoreMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
@ -1317,7 +1318,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
} }
} }
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) { static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus; SFSStatus *pStatus = pfs->cstatus;
SDFInfo info; SDFInfo info;

View File

@ -13,22 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbint.h" #include "tsdbDef.h"
static const char *TSDB_FNAME_SUFFIX[] = { static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD "head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA "data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST "last", // TSDB_FILE_LAST
"", // TSDB_FILE_MAX "", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META "meta", // TSDB_FILE_META
}; };
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
static int tsdbRollBackMFile(SMFile *pMFile); // static int tsdbRollBackMFile(SMFile *pMFile);
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
static int tsdbRollBackDFile(SDFile *pDFile); static int tsdbRollBackDFile(SDFile *pDFile);
#if 0
// ============== SMFile // ============== SMFile
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
@ -185,7 +186,7 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) {
return 0; return 0;
} }
int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { int tsdbScanAndTryFixMFile(STsdb *pRepo) {
SMFile * pMFile = pRepo->fs->cstatus->pmf; SMFile * pMFile = pRepo->fs->cstatus->pmf;
struct stat mfstat; struct stat mfstat;
SMFile mf; SMFile mf;
@ -291,6 +292,8 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
return 0; return 0;
} }
#endif
// ============== Operations on SDFile // ============== Operations on SDFile
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) { void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
@ -397,7 +400,7 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) {
} }
void *ptr = buf; void *ptr = buf;
taosEncodeFixedU32(&ptr, TSDB_FS_VERSION); taosEncodeFixedU32(&ptr, 0);
tsdbEncodeDFInfo(&ptr, &(pDFile->info)); tsdbEncodeDFInfo(&ptr, &(pDFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
@ -433,7 +436,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
return 0; return 0;
} }
static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) {
struct stat dfstat; struct stat dfstat;
SDFile df; SDFile df;
@ -442,7 +445,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) { if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) {
tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile)); TSDB_FILE_FULL_NAME(pDFile));
pRepo->state |= TSDB_STATE_BAD_DATA; // pRepo->state |= TSDB_STATE_BAD_DATA;
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
return 0; return 0;
} }
@ -457,7 +460,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
return -1; return -1;
} }
if (taosFtruncate(df.fd, df.info.size) < 0) { if (taosFtruncateFile(df.fd, df.info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFile(&df); tsdbCloseDFile(&df);
return -1; return -1;
@ -474,7 +477,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
} else if (pDFile->info.size > dfstat.st_size) { } else if (pDFile->info.size > dfstat.st_size) {
tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size); REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size);
pRepo->state |= TSDB_STATE_BAD_DATA; // pRepo->state |= TSDB_STATE_BAD_DATA;
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return 0; return 0;
@ -538,7 +541,7 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
return -1; return -1;
} }
if (taosFtruncate(TSDB_FILE_FD(&df), pDFile->info.size) < 0) { if (taosFtruncateFile(TSDB_FILE_FD(&df), pDFile->info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFile(&df); tsdbCloseDFile(&df);
return -1; return -1;
@ -651,7 +654,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0; return 0;
} }
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1; return -1;

View File

@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbint.h" #include "tsdbDef.h"
#define TSDB_KEY_COL_OFFSET 0 #define TSDB_KEY_COL_OFFSET 0
@ -25,8 +25,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds); int numOfColIds);
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
static STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { return NULL; }
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
ASSERT(pReadh != NULL && pRepo != NULL); ASSERT(pReadh != NULL && pRepo != NULL);
STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbCfg *pCfg = REPO_CFG(pRepo);
@ -259,7 +260,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1; if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
update != TD_ROW_PARTIAL_UPDATE) < 0)
return -1;
} }
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
@ -286,7 +289,9 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1; if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
update != TD_ROW_PARTIAL_UPDATE) < 0)
return -1;
} }
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
@ -524,7 +529,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
if (comp) { if (comp) {
// Need to decompress // Need to decompress
int tlen = (*(tDataTypes[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, int tlen = (*(tDataTypes[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData,
pDataCol->spaceSize, comp, buffer, bufferSize); pDataCol->spaceSize, comp, buffer, bufferSize);
if (tlen <= 0) { if (tlen <= 0) {
tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d",
len, comp, numOfRows, maxPoints, bufferSize); len, comp, numOfRows, maxPoints, bufferSize);
@ -624,9 +629,9 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) { static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) {
ASSERT(pDataCol->colId == pBlockCol->colId); ASSERT(pDataCol->colId == pBlockCol->colId);
STsdbRepo *pRepo = TSDB_READ_REPO(pReadh); STsdb * pRepo = TSDB_READ_REPO(pReadh);
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg *pCfg = REPO_CFG(pRepo);
int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;
@ -662,3 +667,4 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return 0; return 0;
} }