Merge pull request #16256 from taosdata/refact/tsdb_last

refactor: refactor the last file structure, as well as the related write and read procedure.
This commit is contained in:
Haojun Liao 2022-08-21 20:55:23 +08:00 committed by GitHub
commit 98a6f36cc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 4129 additions and 2460 deletions

View File

@ -38,22 +38,18 @@ typedef struct STagVal STagVal;
typedef struct STag STag;
// bitmap
#define N1(n) ((1 << (n)) - 1)
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
#define SET_BIT1(p, i, v) \
do { \
(p)[(i) / 8] &= N1((i) % 8); \
(p)[(i) / 8] |= (((uint8_t)(v)) << (((i) % 8))); \
} while (0)
const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0},
{0b00000000, 0b00000100, 0b00001000, 2},
{0b00000000, 0b00010000, 0b00100000, 4},
{0b00000000, 0b01000000, 0b10000000, 6}};
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define SET_BIT2(p, i, v) \
do { \
p[(i) / 4] &= N1((i) % 4 * 2); \
(p)[(i) / 4] |= (((uint8_t)(v)) << (((i) % 4) * 2)); \
} while (0)
#define GET_BIT2(p, i) (((p)[(i) / 4] >> (((i) % 4) * 2)) & ((uint8_t)3))
#define N1(n) ((((uint8_t)1) << (n)) - 1)
#define BIT1_SIZE(n) ((((n)-1) >> 3) + 1)
#define BIT2_SIZE(n) ((((n)-1) >> 2) + 1)
#define SET_BIT1(p, i, v) ((p)[(i) >> 3] = (p)[(i) >> 3] & N1((i)&7) | (((uint8_t)(v)) << ((i)&7)))
#define GET_BIT1(p, i) (((p)[(i) >> 3] >> ((i)&7)) & ((uint8_t)1))
#define SET_BIT2(p, i, v) ((p)[(i) >> 2] = (p)[(i) >> 2] & N1(BIT2_MAP[(i)&3][3]) | BIT2_MAP[(i)&3][(v)])
#define GET_BIT2(p, i) (((p)[(i) >> 2] >> BIT2_MAP[(i)&3][3]) & ((uint8_t)3))
// STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
@ -171,7 +167,7 @@ struct SColVal {
#pragma pack(push, 1)
struct STagVal {
// char colName[TSDB_COL_NAME_LEN]; // only used for tmq_get_meta
// char colName[TSDB_COL_NAME_LEN]; // only used for tmq_get_meta
union {
int16_t cid;
char *pKey;

View File

@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result(pRes);
for(int32_t i = 0; i < 100000; i += 20) {
for(int32_t i = 0; i < 3280; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
@ -679,30 +679,28 @@ TEST(testCase, projection_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "explain verbose true select _wstart,count(*),a from st1 partition by a interval(1s)");
printResult(pRes);
// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tu using st1 tags(1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// for(int32_t i = 0; i < 1; ++i) {
// printf("create table :%d\n", i);
// createNewTable(pConn, i);
// }
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
for(int32_t i = 0; i < 2; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
//
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {

View File

@ -392,10 +392,10 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_VARCHAR, 6, 0, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_VARCHAR, 6, 1, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp,
tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_NCHAR, 5, 1, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint,
getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint,

View File

@ -66,7 +66,6 @@ int32_t metaCacheOpen(SMeta* pMeta);
void metaCacheClose(SMeta* pMeta);
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
struct SMeta {
TdThreadRwlock lock;

View File

@ -45,7 +45,7 @@ typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SBlockL SBlockL;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SDiskDataHdr SDiskDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile;
@ -61,7 +61,11 @@ typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
@ -113,10 +117,14 @@ int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock);
// SBlockL
int32_t tPutBlockL(uint8_t *p, void *ph);
int32_t tGetBlockL(uint8_t *p, void *ph);
// SBlockIdx
int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
int32_t tCmprBlockL(void const *lhs, void const *rhs);
// SColdata
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
void tColDataReset(SColData *pColData);
@ -131,20 +139,25 @@ int32_t tGetColData(uint8_t *p, SColData *pColData);
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA))
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
int32_t tBlockDataInit(SBlockData *pBlockData);
int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema);
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufN[]);
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
// SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
@ -168,13 +181,25 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *m
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now);
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
int32_t *szOut, uint8_t **ppBuf);
int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut,
uint8_t **ppBuf);
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int32_t nOut,
uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf);
int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck);
// tsdbMemTable ==============================================================================================
// SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable);
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
void tsdbRefMemTable(SMemTable *pMemTable);
void tsdbUnrefMemTable(SMemTable *pMemTable);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
@ -223,33 +248,33 @@ int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData);
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf, SDelIdx *pDelIdx);
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf);
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx);
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx);
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter);
// SDelFReader
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf);
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
@ -277,13 +302,6 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// structs =======================
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
struct STsdbFS {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<SDFileSet>
@ -312,30 +330,23 @@ struct SMemSkipListNode {
SMemSkipListNode *forwards[0];
};
typedef struct SMemSkipList {
uint32_t seed;
int64_t size;
uint32_t seed;
int8_t maxLevel;
int8_t level;
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
} SMemSkipList;
struct SDelDataInfo {
tb_uid_t suid;
tb_uid_t uid;
};
struct STbData {
tb_uid_t suid;
tb_uid_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int32_t maxSkmVer;
SDelData *pHead;
SDelData *pTail;
SMemSkipList sl;
STbData *next;
};
struct SMemTable {
@ -345,11 +356,13 @@ struct SMemTable {
volatile int32_t nRef;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t nRow;
int64_t nDel;
SArray *aTbData; // SArray<STbData*>
struct {
int32_t nTbData;
int32_t nBucket;
STbData **aBucket;
};
};
struct TSDBROW {
@ -380,63 +393,51 @@ struct SMapData {
uint8_t *pData;
};
typedef struct {
struct SBlockCol {
int16_t cid;
int8_t type;
int8_t smaOn;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t offset;
int32_t szBitmap; // bitmap size
int32_t szOffset; // size of offset, only for variant-length data type
int32_t szValue; // compressed column value size
int32_t szOrigin; // original column value size (only save for variant data type)
} SBlockCol;
int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL
int32_t szOffset; // offset size, 0 only for non-variant-length type
int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
int32_t offset;
};
typedef struct {
int32_t nRow;
int8_t cmprAlg;
struct SBlockInfo {
int64_t offset; // block data offset
int32_t szBlockCol; // SBlockCol size
int32_t szVersion; // VERSION size
int32_t szTSKEY; // TSKEY size
int32_t szBlock; // total block size
int64_t sOffset; // sma offset
int32_t nSma; // sma size
} SSubBlock;
int32_t szBlock;
int32_t szKey;
};
struct SSmaInfo {
int64_t offset;
int32_t size;
};
struct SBlock {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int32_t nRow;
int8_t last;
int8_t hasDup;
int8_t nSubBlock;
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS];
};
struct SBlockL {
struct {
int64_t uid;
int64_t version;
TSKEY ts;
} minKey;
struct {
int64_t uid;
int64_t version;
TSKEY ts;
} maxKey;
int64_t minVer;
int64_t maxVer;
int32_t nRow;
int8_t cmprAlg;
int64_t offset;
int32_t szBlock;
int32_t szBlockCol;
int32_t szUid;
int32_t szVer;
int32_t szTSKEY;
int8_t hasDup;
int8_t nSubBlock;
SBlockInfo aSubBlock[TSDB_MAX_SUBBLOCKS];
SSmaInfo smaInfo;
};
struct SBlockL {
int64_t suid;
int64_t minUid;
int64_t maxUid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
int32_t nRow;
SBlockInfo bInfo;
};
struct SColData {
@ -451,10 +452,17 @@ struct SColData {
uint8_t *pData;
};
// (SBlockData){.suid = 0, .uid = 0}: block data not initialized
// (SBlockData){.suid = suid, .uid = uid}: block data for ONE child table int .data file
// (SBlockData){.suid = suid, .uid = 0}: block data for N child tables int .last file
// (SBlockData){.suid = 0, .uid = uid}: block data for 1 normal table int .last/.data file
struct SBlockData {
int32_t nRow;
int64_t *aVersion;
TSKEY *aTSKEY;
int64_t suid; // 0 means normal table block data, otherwise child table block data
int64_t uid; // 0 means block data in .last file, otherwise in .data file
int32_t nRow; // number of rows
int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0)
int64_t *aVersion; // versions of each row
TSKEY *aTSKEY; // timestamp of each row
SArray *aIdx; // SArray<int32_t>
SArray *aColData; // SArray<SColData>
};
@ -492,13 +500,18 @@ struct SDelIdx {
int64_t size;
};
#pragma pack(push, 1)
struct SBlockDataHdr {
struct SDiskDataHdr {
uint32_t delimiter;
uint32_t fmtVer;
int64_t suid;
int64_t uid;
int32_t szUid;
int32_t szVer;
int32_t szKey;
int32_t szBlkCol;
int32_t nRow;
int8_t cmprAlg;
};
#pragma pack(pop)
struct SDelFile {
volatile int32_t nRef;
@ -528,6 +541,7 @@ struct SLastFile {
int64_t commitID;
int64_t size;
int64_t offset;
};
struct SSmaFile {
@ -562,6 +576,8 @@ struct SDelFWriter {
STsdb *pTsdb;
SDelFile fDel;
TdFilePtr pWriteH;
uint8_t *aBuf[1];
};
struct SDataFWriter {
@ -577,6 +593,8 @@ struct SDataFWriter {
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
uint8_t *aBuf[4];
};
struct STsdbReadSnap {

View File

@ -369,6 +369,7 @@ struct SSma {
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
enum {
SNAP_DATA_CFG = 0,
SNAP_DATA_META = 1,
SNAP_DATA_TSDB = 2,
SNAP_DATA_DEL = 3,

View File

@ -513,17 +513,64 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
int32_t code = 0;
STSchema *pTSchema = NULL;
SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
void *pData = NULL;
int nData = 0;
SSkmDbKey skmDbKey;
if (sver <= 0) {
SMetaInfo info;
if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
sver = info.skmVer;
} else {
TBC *pSkmDbC = NULL;
int c;
// query
skmDbKey.uid = suid ? suid : uid;
skmDbKey.sver = INT32_MAX;
tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL);
metaRLock(pMeta);
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
code = TSDB_CODE_NOT_FOUND;
if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
metaULock(pMeta);
goto _err;
tdbTbcClose(pSkmDbC);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
ASSERT(c);
if (c < 0) {
tdbTbcMoveToPrev(pSkmDbC);
}
const void *pKey = NULL;
int32_t nKey = 0;
tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL);
if (((SSkmDbKey *)pKey)->uid != skmDbKey.uid) {
metaULock(pMeta);
tdbTbcClose(pSkmDbC);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
sver = ((SSkmDbKey *)pKey)->sver;
metaULock(pMeta);
tdbTbcClose(pSkmDbC);
}
}
ASSERT(sver > 0);
skmDbKey.uid = suid ? suid : uid;
skmDbKey.sver = sver;
metaRLock(pMeta);
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
metaULock(pMeta);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
metaULock(pMeta);
@ -545,15 +592,13 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
SSchema *pSchema = pSchemaWrapper->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
*ppTSchema = pTSchema;
taosMemoryFree(pSchemaWrapper->pSchema);
return code;
_err:
*ppTSchema = NULL;
_exit:
return code;
}
@ -1006,6 +1051,8 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj
return TSDB_CODE_SUCCESS;
}
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
int32_t code = 0;
void *pData = NULL;

View File

@ -357,10 +357,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index
SMetaInfo info;
metaGetEntryInfo(&nStbEntry, &info);
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t),
&(SUidIdxVal){.suid = info.suid, .version = info.version, .skmVer = info.skmVer}, sizeof(SUidIdxVal), 0);
metaUpdateUidIdx(pMeta, &nStbEntry);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
metaULock(pMeta);
@ -884,7 +881,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
}
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags, ((STag*)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
((STag *)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
tDecoderClear(&dc1);
tDecoderClear(&dc2);
@ -1091,7 +1089,8 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), pME->ctbEntry.pTags, ((STag*)(pME->ctbEntry.pTags))->len, &pMeta->txn);
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), pME->ctbEntry.pTags,
((STag *)(pME->ctbEntry.pTags))->len, &pMeta->txn);
}
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,

View File

@ -266,14 +266,14 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
}
for (++iCol; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs >= tTsVal->ts) {
SColVal *tColVal = &tTsVal->colVal;
SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs >= tTsVal1->ts) {
SColVal *tColVal = &tTsVal1->colVal;
SColVal colVal = {0};
tTSRowGetVal(row, pTSchema, iCol, &colVal);
if (colVal.isNone || colVal.isNull) {
if (keyTs == tTsVal->ts && !tColVal->isNone && !tColVal->isNull) {
if (keyTs == tTsVal1->ts && !tColVal->isNone && !tColVal->isNull) {
invalidate = true;
break;
@ -284,6 +284,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
}
}
_invalidate:
taosMemoryFreeClear(pTSchema);
taosLRUCacheRelease(pCache, h, invalidate);
@ -322,7 +323,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI
int32_t code = 0;
if (pDelIdx) {
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData, NULL);
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData);
}
return code;
@ -393,8 +394,7 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
SDelIdx idx = {.suid = suid, .uid = uid};
// tMapDataReset(&delIdxMap);
// code = tsdbReadDelIdx(pDelFReader, &delIdxMap, NULL);
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray, NULL);
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
if (code) goto _err;
// code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
@ -410,6 +410,178 @@ _err:
return code;
}
typedef enum {
SFSLASTNEXTROW_FS,
SFSLASTNEXTROW_FILESET,
SFSLASTNEXTROW_BLOCKDATA,
SFSLASTNEXTROW_BLOCKROW
} SFSLASTNEXTROWSTATES;
typedef struct {
SFSLASTNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
int32_t nFileSet;
int32_t iFileSet;
SArray *aDFileSet;
SDataFReader *pDataFReader;
SArray *aBlockL;
SBlockL *pBlockL;
SBlockData *pBlockDataL;
SBlockData blockDataL;
int32_t nRow;
int32_t iRow;
TSDBROW row;
/*
SArray *aBlockIdx;
SBlockIdx *pBlockIdx;
SMapData blockMap;
int32_t nBlock;
int32_t iBlock;
SBlock block;
*/
} SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
switch (state->state) {
case SFSLASTNEXTROW_FS:
// state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet;
state->pBlockDataL = NULL;
case SFSLASTNEXTROW_FILESET: {
SDFileSet *pFileSet = NULL;
_next_fileset:
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
if (state->pBlockDataL) {
tBlockDataDestroy(state->pBlockDataL, 1);
state->pBlockDataL = NULL;
}
*ppRow = NULL;
return code;
}
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err;
if (!state->aBlockL) {
state->aBlockL = taosArrayInit(0, sizeof(SBlockIdx));
} else {
taosArrayClear(state->aBlockL);
}
code = tsdbReadBlockL(state->pDataFReader, state->aBlockL);
if (code) goto _err;
// SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL);
state->pBlockL = taosArraySearch(state->aBlockL, state->pBlockIdxExp, tCmprBlockL, TD_EQ);
if (!state->pBlockL) {
goto _next_fileset;
}
int64_t suid = state->pBlockL->suid;
int64_t uid = state->pBlockL->maxUid;
if (!state->pBlockDataL) {
state->pBlockDataL = &state->blockDataL;
}
code = tBlockDataInit(state->pBlockDataL, suid, suid ? 0 : uid, state->pTSchema);
if (code) goto _err;
}
case SFSLASTNEXTROW_BLOCKDATA:
code = tsdbReadLastBlock(state->pDataFReader, state->pBlockL, state->pBlockDataL);
if (code) goto _err;
state->nRow = state->blockDataL.nRow;
state->iRow = state->nRow - 1;
if (!state->pBlockDataL->uid) {
while (state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) {
--state->iRow;
}
}
state->state = SFSLASTNEXTROW_BLOCKROW;
case SFSLASTNEXTROW_BLOCKROW:
if (state->pBlockDataL->uid) {
if (state->iRow >= 0) {
state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow);
*ppRow = &state->row;
if (--state->iRow < 0) {
state->state = SFSLASTNEXTROW_FILESET;
}
}
} else {
if (state->iRow >= 0 && state->pBlockIdxExp->uid == state->pBlockDataL->aUid[state->iRow]) {
state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow);
*ppRow = &state->row;
if (--state->iRow < 0 || state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) {
state->state = SFSLASTNEXTROW_FILESET;
}
}
}
return code;
default:
ASSERT(0);
break;
}
_err:
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockL) {
taosArrayDestroy(state->aBlockL);
state->aBlockL = NULL;
}
if (state->pBlockDataL) {
tBlockDataDestroy(state->pBlockDataL, 1);
state->pBlockDataL = NULL;
}
*ppRow = NULL;
return code;
}
int32_t clearNextRowFromFSLast(void *iter) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
if (!state) {
return code;
}
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockL) {
taosArrayDestroy(state->aBlockL);
state->aBlockL = NULL;
}
if (state->pBlockDataL) {
tBlockDataDestroy(state->pBlockDataL, 1);
state->pBlockDataL = NULL;
}
return code;
}
typedef enum SFSNEXTROWSTATES {
SFSNEXTROW_FS,
SFSNEXTROW_FILESET,
@ -456,9 +628,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
// tBlockDataClear(&state->blockData, 1);
// tBlockDataDestroy(&state->blockData, 1);
if (state->pBlockData) {
tBlockDataClear(state->pBlockData, 1);
tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL;
}
@ -470,13 +642,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (code) goto _err;
// tMapDataReset(&state->blockIdxMap);
// code = tsdbReadBlockIdx(state->pDataFReader, &state->blockIdxMap, NULL);
if (!state->aBlockIdx) {
state->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
} else {
taosArrayClear(state->aBlockIdx);
}
code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx, NULL);
code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx);
if (code) goto _err;
/* if (state->pBlockIdx) { */
@ -492,8 +663,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
}
tMapDataReset(&state->blockMap);
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap, NULL);
/* code = tsdbReadBlock(state->pDataFReader, &state->blockIdx, &state->blockMap, NULL); */
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap);
if (code) goto _err;
state->nBlock = state->blockMap.nItem;
@ -502,7 +672,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (!state->pBlockData) {
state->pBlockData = &state->blockData;
tBlockDataInit(&state->blockData);
tBlockDataCreate(&state->blockData);
}
}
case SFSNEXTROW_BLOCKDATA:
@ -515,7 +685,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock);
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, state->pBlockData, NULL, NULL);
code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData);
if (code) goto _err;
state->nRow = state->blockData.nRow;
@ -560,8 +730,8 @@ _err:
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData, 1);
// tBlockDataDestroy(&state->blockData, 1);
tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL;
}
@ -587,8 +757,8 @@ int32_t clearNextRowFromFS(void *iter) {
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData, 1);
// tBlockDataDestroy(&state->blockData, 1);
tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL;
}
@ -733,15 +903,16 @@ typedef struct {
SBlockIdx idx;
SMemNextRowIter memState;
SMemNextRowIter imemState;
SFSLastNextRowIter fsLastState;
SFSNextRowIter fsState;
TSDBROW memRow, imemRow, fsRow;
TSDBROW memRow, imemRow, fsLastRow, fsRow;
TsdbNextRowState input[3];
TsdbNextRowState input[4];
STsdbReadSnap *pReadSnap;
STsdb *pTsdb;
} CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) {
int code = 0;
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
@ -750,12 +921,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
STbData *pMem = NULL;
if (pIter->pReadSnap->pMem) {
tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid, &pMem);
pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid);
}
STbData *pIMem = NULL;
if (pIter->pReadSnap->pIMem) {
tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid, &pIMem);
pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid);
}
pIter->pTsdb = pTsdb;
@ -768,7 +939,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
if (pDelFile) {
SDelFReader *pDelFReader;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
if (code) goto _err;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
@ -787,6 +958,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES) SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsLastState.pBlockIdxExp = &pIter->idx;
pIter->fsLastState.pTSchema = pTSchema;
pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
@ -794,7 +971,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
pIter->input[2] =
pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow, false, true, &pIter->fsLastState, getNextRowFromFSLast,
clearNextRowFromFSLast};
pIter->input[3] =
(TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
if (pMem) {
@ -819,7 +998,7 @@ _err:
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
int code = 0;
for (int i = 0; i < 3; ++i) {
for (int i = 0; i < 4; ++i) {
if (pIter->input[i].nextRowClearFn) {
pIter->input[i].nextRowClearFn(pIter->input[i].iter);
}
@ -831,7 +1010,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap);
return code;
_err:
return code;
}
@ -840,7 +1018,7 @@ _err:
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
int code = 0;
for (int i = 0; i < 3; ++i) {
for (int i = 0; i < 4; ++i) {
if (pIter->input[i].next && !pIter->input[i].stop) {
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
if (code) goto _err;
@ -852,18 +1030,18 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
}
}
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
*ppRow = NULL;
return code;
}
// select maxpoint(s) from mem, imem, fs
TSDBROW *max[3] = {0};
int iMax[3] = {-1, -1, -1};
// select maxpoint(s) from mem, imem, fs and last
TSDBROW *max[4] = {0};
int iMax[4] = {-1, -1, -1, -1};
int nMax = 0;
TSKEY maxKey = TSKEY_MIN;
for (int i = 0; i < 3; ++i) {
for (int i = 0; i < 4; ++i) {
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
@ -881,13 +1059,13 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
}
// delete detection
TSDBROW *merge[3] = {0};
int iMerge[3] = {-1, -1, -1};
TSDBROW *merge[4] = {0};
int iMerge[4] = {-1, -1, -1, -1};
int nMerge = 0;
for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
bool deleted = tsdbKeyDeleted(&maxKey, pIter->pSkyline, &pIter->iSkyline);
bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
if (!deleted) {
iMerge[nMerge] = iMax[i];
merge[nMerge++] = max[i];
@ -923,7 +1101,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb);
nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
do {
TSDBROW *pRow = NULL;
@ -1020,7 +1198,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb);
nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
do {
TSDBROW *pRow = NULL;

File diff suppressed because it is too large Load Diff

View File

@ -576,10 +576,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
fSet.pHeadF->nRef = 0;
fSet.pHeadF->commitID = pSet->pHeadF->commitID;
fSet.pHeadF->size = pSet->pHeadF->size;
fSet.pHeadF->offset = pSet->pHeadF->offset;
*fSet.pHeadF = *pSet->pHeadF;
// data
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
@ -587,9 +584,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
fSet.pDataF->nRef = 0;
fSet.pDataF->commitID = pSet->pDataF->commitID;
fSet.pDataF->size = pSet->pDataF->size;
*fSet.pDataF = *pSet->pDataF;
// data
fSet.pLastF = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
@ -597,9 +592,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
fSet.pLastF->nRef = 0;
fSet.pLastF->commitID = pSet->pLastF->commitID;
fSet.pLastF->size = pSet->pLastF->size;
*fSet.pLastF = *pSet->pLastF;
// last
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
@ -607,9 +600,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
fSet.pSmaF->nRef = 0;
fSet.pSmaF->commitID = pSet->pSmaF->commitID;
fSet.pSmaF->size = pSet->pSmaF->size;
*fSet.pSmaF = *pSet->pSmaF;
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -58,6 +58,7 @@ int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile) {
n += tPutI64v(p ? p + n : p, pLastFile->commitID);
n += tPutI64v(p ? p + n : p, pLastFile->size);
n += tPutI64v(p ? p + n : p, pLastFile->offset);
return n;
}
@ -67,6 +68,7 @@ static int32_t tGetLastFile(uint8_t *p, SLastFile *pLastFile) {
n += tGetI64v(p + n, &pLastFile->commitID);
n += tGetI64v(p + n, &pLastFile->size);
n += tGetI64v(p + n, &pLastFile->offset);
return n;
}
@ -186,11 +188,16 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
n += tPutI32v(p ? p + n : p, pSet->diskId.level);
n += tPutI32v(p ? p + n : p, pSet->diskId.id);
n += tPutI32v(p ? p + n : p, pSet->fid);
// data
n += tPutHeadFile(p ? p + n : p, pSet->pHeadF);
n += tPutDataFile(p ? p + n : p, pSet->pDataF);
n += tPutLastFile(p ? p + n : p, pSet->pLastF);
n += tPutSmaFile(p ? p + n : p, pSet->pSmaF);
// last
n += tPutU8(p ? p + n : p, 1); // for future compatibility
n += tPutLastFile(p ? p + n : p, pSet->pLastF);
return n;
}
@ -200,11 +207,17 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
n += tGetI32v(p + n, &pSet->diskId.level);
n += tGetI32v(p + n, &pSet->diskId.id);
n += tGetI32v(p + n, &pSet->fid);
// data
n += tGetHeadFile(p + n, pSet->pHeadF);
n += tGetDataFile(p + n, pSet->pDataF);
n += tGetLastFile(p + n, pSet->pLastF);
n += tGetSmaFile(p + n, pSet->pSmaF);
// last
uint8_t nLast;
n += tGetU8(p + n, &nLast);
n += tGetLastFile(p + n, pSet->pLastF);
return n;
}

View File

@ -15,6 +15,7 @@
#include "tsdb.h"
#define MEM_MIN_HASH 1024
#define SL_MAX_LEVEL 5
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
@ -45,12 +46,12 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
pMemTable->nRef = 1;
pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN;
pMemTable->minVersion = VERSION_MAX;
pMemTable->maxVersion = VERSION_MIN;
pMemTable->nRow = 0;
pMemTable->nDel = 0;
pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *));
if (pMemTable->aTbData == NULL) {
pMemTable->nTbData = 0;
pMemTable->nBucket = MEM_MIN_HASH;
pMemTable->aBucket = (STbData **)taosMemoryCalloc(pMemTable->nBucket, sizeof(STbData *));
if (pMemTable->aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pMemTable);
goto _err;
@ -68,37 +69,30 @@ _err:
void tsdbMemTableDestroy(SMemTable *pMemTable) {
if (pMemTable) {
vnodeBufPoolUnRef(pMemTable->pPool);
taosArrayDestroy(pMemTable->aTbData);
taosMemoryFree(pMemTable->aBucket);
taosMemoryFree(pMemTable);
}
}
static int32_t tbDataPCmprFn(const void *p1, const void *p2) {
STbData *pTbData1 = *(STbData **)p1;
STbData *pTbData2 = *(STbData **)p2;
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
STbData *pTbData = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];
if (pTbData1->suid < pTbData2->suid) {
return -1;
} else if (pTbData1->suid > pTbData2->suid) {
return 1;
while (pTbData) {
if (pTbData->uid == uid) break;
pTbData = pTbData->next;
}
if (pTbData1->uid < pTbData2->uid) {
return -1;
} else if (pTbData1->uid > pTbData2->uid) {
return 1;
}
return 0;
return pTbData;
}
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
STbData *pTbData = &(STbData){.suid = suid, .uid = uid};
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
STbData *pTbData;
taosRLockLatch(&pMemTable->latch);
void *p = taosArraySearch(pMemTable->aTbData, &pTbData, tbDataPCmprFn, TD_EQ);
pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
taosRUnLockLatch(&pMemTable->latch);
*ppTbData = p ? *(STbData **)p : NULL;
return pTbData;
}
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock,
@ -184,10 +178,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pTbData->pTail = pDelData;
}
// update the state of pMemTable and other (todo)
pMemTable->minVersion = TMIN(pMemTable->minVersion, version);
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, version);
pMemTable->nDel++;
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
@ -320,18 +310,44 @@ _exit:
return pIter->pRow;
}
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
int32_t code = 0;
int32_t nBucket = pMemTable->nBucket * 2;
STbData **aBucket = (STbData **)taosMemoryCalloc(nBucket, sizeof(STbData *));
if (aBucket == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
STbData *pTbData = pMemTable->aBucket[iBucket];
while (pTbData) {
STbData *pNext = pTbData->next;
int32_t idx = TABS(pTbData->uid) % nBucket;
pTbData->next = aBucket[idx];
aBucket[idx] = pTbData;
pTbData = pNext;
}
}
taosMemoryFree(pMemTable->aBucket);
pMemTable->nBucket = nBucket;
pMemTable->aBucket = aBucket;
_exit:
return code;
}
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
int32_t code = 0;
int32_t idx = 0;
STbData *pTbData = NULL;
STbData *pTbDataT = &(STbData){.suid = suid, .uid = uid};
// get
idx = taosArraySearchIdx(pMemTable->aTbData, &pTbDataT, tbDataPCmprFn, TD_GE);
if (idx >= 0) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, idx);
if (tbDataPCmprFn(&pTbDataT, &pTbData) == 0) goto _exit;
}
STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
if (pTbData) goto _exit;
// create
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
@ -346,9 +362,6 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
pTbData->uid = uid;
pTbData->minKey = TSKEY_MAX;
pTbData->maxKey = TSKEY_MIN;
pTbData->minVersion = VERSION_MAX;
pTbData->maxVersion = VERSION_MIN;
pTbData->maxSkmVer = -1;
pTbData->pHead = NULL;
pTbData->pTail = NULL;
pTbData->sl.seed = taosRand();
@ -367,21 +380,23 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
}
void *p;
if (idx < 0) {
idx = taosArrayGetSize(pMemTable->aTbData);
}
taosWLockLatch(&pMemTable->latch);
p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData);
if (pMemTable->nTbData >= pMemTable->nBucket) {
code = tsdbMemTableRehash(pMemTable);
if (code) {
taosWUnLockLatch(&pMemTable->latch);
tsdbDebug("vgId:%d, add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
int32_t idx = TABS(uid) % pMemTable->nBucket;
pTbData->next = pMemTable->aBucket[idx];
pMemTable->aBucket[idx] = pTbData;
pMemTable->nTbData++;
taosWUnLockLatch(&pMemTable->latch);
_exit:
*ppTbData = pTbData;
return code;
@ -591,15 +606,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
}
pTbData->minVersion = TMIN(pTbData->minVersion, version);
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
pTbData->maxSkmVer = TMAX(pTbData->maxSkmVer, pMsgIter->sversion);
// SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
pMemTable->minVersion = TMIN(pMemTable->minVersion, pTbData->minVersion);
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, pTbData->maxVersion);
pMemTable->nRow += nRow;
pRsp->numOfRows = nRow;
@ -624,3 +633,41 @@ void tsdbUnrefMemTable(SMemTable *pMemTable) {
tsdbMemTableDestroy(pMemTable);
}
}
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
STbData *pTbData1 = *(STbData **)p1;
STbData *pTbData2 = *(STbData **)p2;
if (pTbData1->suid < pTbData2->suid) {
return -1;
} else if (pTbData1->suid > pTbData2->suid) {
return 1;
}
if (pTbData1->uid < pTbData2->uid) {
return -1;
} else if (pTbData1->uid > pTbData2->uid) {
return 1;
}
return 0;
}
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
SArray *aTbDataP = taosArrayInit(pMemTable->nTbData, sizeof(STbData *));
if (aTbDataP == NULL) goto _exit;
for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
STbData *pTbData = pMemTable->aBucket[iBucket];
while (pTbData) {
taosArrayPush(aTbDataP, &pTbData);
pTbData = pTbData->next;
}
}
taosArraySort(aTbDataP, tbDataPCmprFn);
_exit:
return aTbDataP;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -27,8 +27,12 @@ struct STsdbSnapReader {
int32_t fid;
SDataFReader* pDataFReader;
SArray* aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SArray* aBlockL; // SArray<SBlockL>
SBlockIdx* pBlockIdx;
SBlockL* pBlockL;
int32_t iBlockIdx;
int32_t iBlockL;
SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlockData oBlockData;
@ -47,115 +51,117 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
while (true) {
if (pReader->pDataFReader == NULL) {
SDFileSet* pSet =
taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
// next
SDFileSet dFileSet = {.fid = pReader->fid};
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
if (pSet == NULL) goto _exit;
pReader->fid = pSet->fid;
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
// load
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pTsdb, pSet);
if (code) goto _err;
// SBlockIdx
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
if (code) goto _err;
code = tsdbReadBlockL(pReader->pDataFReader, pReader->aBlockL);
if (code) goto _err;
// init
pReader->iBlockIdx = 0;
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) {
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
if (code) goto _err;
pReader->iBlock = 0;
} else {
pReader->pBlockIdx = NULL;
}
pReader->iBlockL = 0;
while (true) {
if (pReader->iBlockL >= taosArrayGetSize(pReader->aBlockL)) {
pReader->pBlockL = NULL;
break;
}
pReader->pBlockL = (SBlockL*)taosArrayGet(pReader->aBlockL, pReader->iBlockL);
if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) {
// TODO
break;
}
pReader->iBlockL++;
}
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
pReader->fid);
}
while (true) {
if (pReader->pBlockIdx == NULL) {
if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
tsdbDataFReaderClose(&pReader->pDataFReader);
break;
if (pReader->pBlockIdx && pReader->pBlockL) {
TABLEID id = {.suid = pReader->pBlockL->suid, .uid = pReader->pBlockL->minUid};
ASSERT(0);
// if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) {
// // TODO
// } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) {
// // TODO
// } else {
// // TODO
// }
} else if (pReader->pBlockIdx) {
while (pReader->iBlock < pReader->mBlock.nItem) {
SBlock block;
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetBlock);
if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) {
// load data (todo)
}
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
pReader->iBlockIdx++;
// next
pReader->iBlock++;
if (*ppData) break;
}
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
if (pReader->iBlock >= pReader->mBlock.nItem) {
pReader->iBlockIdx++;
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) {
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
if (code) goto _err;
pReader->iBlock = 0;
}
SBlock block;
SBlock* pBlock = &block;
while (true) {
if (pReader->iBlock >= pReader->mBlock.nItem) {
} else {
pReader->pBlockIdx = NULL;
}
}
if (*ppData) goto _exit;
} else if (pReader->pBlockL) {
while (pReader->pBlockL) {
if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) {
// load data (todo)
}
// next
pReader->iBlockL++;
if (pReader->iBlockL < taosArrayGetSize(pReader->aBlockL)) {
pReader->pBlockL = (SBlockL*)taosArrayGetSize(pReader->aBlockL);
} else {
pReader->pBlockL = NULL;
}
if (*ppData) goto _exit;
}
} else {
tsdbDataFReaderClose(&pReader->pDataFReader);
break;
}
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
pReader->iBlock++;
if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue;
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL);
if (code) goto _err;
// filter
tBlockDataReset(&pReader->nBlockData);
for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) {
SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData);
SColData* pColDataN = NULL;
code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN);
if (code) goto _err;
tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn);
}
for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
int64_t version = TSDBROW_VERSION(&row);
tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")",
TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever);
if (version < pReader->sver || version > pReader->ever) continue;
code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
if (code) goto _err;
}
if (pReader->nBlockData.nRow <= 0) {
continue;
}
// org data
// compress data (todo)
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = pReader->type;
pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]);
pId->suid = pReader->pBlockIdx->suid;
pId->uid = pReader->pBlockIdx->uid;
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
tsdbInfo("vgId:%d, vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
size);
goto _exit;
}
}
}
@ -179,11 +185,11 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
}
// open
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb);
if (code) goto _err;
// read index
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
if (code) goto _err;
pReader->iDelIdx = 0;
@ -199,7 +205,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iDelIdx++;
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
if (code) goto _err;
int32_t size = 0;
@ -292,10 +298,15 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pReader->aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->mBlock = tMapDataInit();
code = tBlockDataInit(&pReader->oBlockData);
code = tBlockDataCreate(&pReader->oBlockData);
if (code) goto _err;
code = tBlockDataInit(&pReader->nBlockData);
code = tBlockDataCreate(&pReader->nBlockData);
if (code) goto _err;
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
@ -327,10 +338,11 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader);
}
taosArrayDestroy(pReader->aBlockL);
taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock);
tBlockDataClear(&pReader->oBlockData, 1);
tBlockDataClear(&pReader->nBlockData, 1);
tBlockDataDestroy(&pReader->oBlockData, 1);
tBlockDataDestroy(&pReader->nBlockData, 1);
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
@ -405,6 +417,7 @@ struct STsdbSnapWriter {
int8_t cmprAlg;
int64_t commitID;
uint8_t* aBuf[5];
// for data file
SBlockData bData;
@ -418,6 +431,9 @@ struct STsdbSnapWriter {
SBlockData* pBlockData;
int32_t iRow;
SBlockData bDataR;
SArray* aBlockL; // SArray<SBlockL>
int32_t iBlockL;
SBlockData lDataR;
SDataFWriter* pDataFWriter;
SBlockIdx* pBlockIdxW; // NULL when no committing table
@ -427,6 +443,7 @@ struct STsdbSnapWriter {
SMapData mBlockW; // SMapData<SBlock>
SArray* aBlockIdxW; // SArray<SBlockIdx>
SArray* aBlockLW; // SArray<SBlockL>
// for del file
SDelFReader* pDelFReader;
@ -437,25 +454,6 @@ struct STsdbSnapWriter {
SArray* aDelIdxW;
};
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int32_t iRow = 0; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
while (iRow < nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d, tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
@ -467,20 +465,21 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
while (pWriter->iRow < pWriter->pBlockData->nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL,
0); // todo
if (code) goto _err;
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
// pWriter->blockW.last = 0;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
// &pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
tBlockDataClear(&pWriter->bDataW);
}
pWriter->iRow++;
@ -489,16 +488,16 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
// write remain data if has
if (pWriter->bDataW.nRow > 0) {
pWriter->blockW.last = 0;
// pWriter->blockW.last = 0;
if (pWriter->bDataW.nRow < pWriter->minRow) {
if (pWriter->iBlock > pWriter->mBlock.nItem) {
pWriter->blockW.last = 1;
// pWriter->blockW.last = 1;
}
}
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
@ -510,16 +509,16 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
SBlock block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
// if (block.last) {
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
// if (code) goto _err;
tBlockReset(&block);
block.last = 1;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
pWriter->cmprAlg);
if (code) goto _err;
}
// tBlockReset(&block);
// block.last = 1;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
// pWriter->cmprAlg);
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
@ -528,8 +527,8 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
}
// SBlock
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
if (code) goto _err;
// code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
// if (code) goto _err;
// SBlockIdx
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
@ -550,7 +549,7 @@ _err:
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
int32_t code = 0;
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock);
if (code) goto _err;
// SBlockData
@ -559,16 +558,17 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
// if (block.last) {
// code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
// if (code) goto _err;
tBlockReset(&block);
block.last = 1;
code =
tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg);
if (code) goto _err;
}
// tBlockReset(&block);
// block.last = 1;
// code =
// tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
// pWriter->cmprAlg);
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
@ -576,7 +576,7 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
// SBlock
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx);
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
if (code) goto _err;
// SBlockIdx
@ -601,9 +601,9 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
TSDBROW row;
TSDBROW* pRow = &row;
// correct schema
code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
if (code) goto _err;
// // correct schema
// code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
// if (code) goto _err;
// loop to merge
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
@ -618,8 +618,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
ASSERT(c);
if (c < 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
// if (code) goto _err;
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
@ -628,8 +628,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
pRow = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
if (code) goto _err;
// code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow),
// NULL); if (code) goto _err;
pWriter->iRow++;
if (pWriter->iRow >= pWriter->pBlockData->nRow) {
@ -647,16 +647,15 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
if (block.last) {
pWriter->pBlockData = &pWriter->bDataR;
// if (block.last) {
// pWriter->pBlockData = &pWriter->bDataR;
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
pWriter->iRow = 0;
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
// NULL); if (code) goto _err; pWriter->iRow = 0;
pWriter->iBlock++;
break;
}
// pWriter->iBlock++;
// break;
// }
c = tsdbKeyCmprFn(&block.maxKey, &key);
@ -664,16 +663,16 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (c < 0) {
if (pWriter->bDataW.nRow) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
// pWriter->blockW.last = 0;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
tBlockDataClear(&pWriter->bDataW);
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
@ -687,9 +686,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (c > 0) {
pWriter->pBlockData = &pWriter->bDataR;
code =
tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
// code =
// tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
// NULL);
// if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
@ -700,8 +700,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (pWriter->pBlockData) continue;
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
// if (code) goto _err;
iRow++;
if (iRow < pBlockData->nRow) {
@ -715,15 +715,15 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
_write_block:
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
tBlockDataClear(&pWriter->bDataW);
}
return code;
@ -789,7 +789,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock);
if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock);
@ -831,9 +831,11 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
if (pWriter->pDataFWriter == NULL) goto _exit;
// finish current table
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
// move remain table
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
if (code) goto _err;
@ -841,8 +843,16 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
pWriter->iBlockIdx++;
}
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
// write remain stuff
if (taosArrayGetSize(pWriter->aBlockLW) > 0) {
code = tsdbWriteBlockL(pWriter->pDataFWriter, pWriter->aBlockIdxW);
if (code) goto _err;
}
if (taosArrayGetSize(pWriter->aBlockIdx) > 0) {
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW);
if (code) goto _err;
}
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
if (code) goto _err;
@ -868,17 +878,20 @@ _err:
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
int64_t n;
// decode
SBlockData* pBlockData = &pWriter->bData;
n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData,
pWriter->aBuf);
if (code) goto _err;
// open file
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]};
TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1],
.ts = pBlockData->aTSKEY[pBlockData->nRow - 1]};
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
@ -895,11 +908,15 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL);
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
if (code) goto _err;
code = tsdbReadBlockL(pWriter->pDataFReader, pWriter->aBlockL);
if (code) goto _err;
} else {
ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx);
taosArrayClear(pWriter->aBlockL);
}
pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL;
@ -907,7 +924,9 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
pWriter->iBlockL = 0;
tBlockDataReset(&pWriter->bDataR);
tBlockDataReset(&pWriter->lDataR);
// write
SHeadFile fHead;
@ -928,7 +947,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
wSet.fid = fid;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
}
@ -936,6 +955,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxW);
taosArrayClear(pWriter->aBlockLW);
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockIdxW = NULL;
tBlockDataReset(&pWriter->bDataW);
@ -963,10 +983,10 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
// reader
if (pDelFile) {
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
if (code) goto _err;
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
if (code) goto _err;
}
@ -980,52 +1000,16 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
while (true) {
SDelIdx* pDelIdx = NULL;
int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
SDelData delData;
SDelIdx delIdx;
int8_t toBreak = 0;
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
if (tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) >= 0) break;
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) {
pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
}
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
if (pDelIdx) {
int32_t c = tTABLEIDCmprFn(&id, pDelIdx);
if (c < 0) {
goto _new_del;
} else {
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _err;
pWriter->iDelIdx++;
if (c == 0) {
toBreak = 1;
delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
goto _merge_del;
} else {
delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
goto _write_del;
}
}
}
_new_del:
toBreak = 1;
delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
taosArrayClear(pWriter->aDelData);
_merge_del:
while (n < nData) {
n += tGetDelData(pData + n, &delData);
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
_write_del:
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
SDelIdx delIdx = *pDelIdx;
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
@ -1033,7 +1017,40 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
goto _err;
}
if (toBreak) break;
pWriter->iDelIdx++;
}
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _err;
pWriter->iDelIdx++;
} else {
taosArrayClear(pWriter->aDelData);
}
int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
while (n < nData) {
SDelData delData;
n += tGetDelData(pData + n, &delData);
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
SDelIdx delIdx = {.suid = id.suid, .uid = id.uid};
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
_exit:
@ -1054,11 +1071,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _err;
SDelIdx delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
SDelIdx delIdx = *pDelIdx;
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
@ -1117,7 +1134,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->commitID = pTsdb->pVnode->state.commitID;
// for data file
code = tBlockDataInit(&pWriter->bData);
code = tBlockDataCreate(&pWriter->bData);
if (code) goto _err;
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
@ -1125,17 +1142,29 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataR);
code = tBlockDataCreate(&pWriter->bDataR);
if (code) goto _err;
pWriter->aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pWriter->aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataW);
code = tBlockDataCreate(&pWriter->bDataW);
if (code) goto _err;
pWriter->aBlockLW = taosArrayInit(0, sizeof(SBlockL));
if (pWriter->aBlockLW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// for del file
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) {
@ -1186,6 +1215,10 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
if (code) goto _err;
}
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]);
}
tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
taosMemoryFree(pWriter);
*ppWriter = NULL;
@ -1224,6 +1257,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
_exit:
tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code;
_err:

File diff suppressed because it is too large Load Diff

View File

@ -76,6 +76,12 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define realloc u_realloc
#endif
#define T_LONG_JMP(_obj, _c) \
do { \
assert((_c) != -1); \
longjmp((_obj), (_c)); \
} while (0);
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

View File

@ -5574,6 +5574,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
pInfo->p = pCtx->end;
numOfElems += 1;
}
pInfo->win.ekey = pInfo->p.key;

210
source/util/src/trbtree.c Normal file
View File

@ -0,0 +1,210 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
typedef int32_t (*tRBTreeCmprFn)(void *, void *);
typedef struct SRBTree SRBTree;
typedef struct SRBTreeNode SRBTreeNode;
typedef struct SRBTreeIter SRBTreeIter;
struct SRBTreeNode {
enum { RED, BLACK } color;
SRBTreeNode *parent;
SRBTreeNode *left;
SRBTreeNode *right;
uint8_t payload[];
};
struct SRBTree {
tRBTreeCmprFn cmprFn;
SRBTreeNode *root;
};
struct SRBTreeIter {
SRBTree *pTree;
};
#define RBTREE_NODE_COLOR(N) ((N) ? (N)->color : BLACK)
// APIs ================================================
static void tRBTreeRotateLeft(SRBTree *pTree, SRBTreeNode *pNode) {
SRBTreeNode *right = pNode->right;
pNode->right = right->left;
if (pNode->right) {
pNode->right->parent = pNode;
}
right->parent = pNode->parent;
if (pNode->parent == NULL) {
pTree->root = right;
} else if (pNode == pNode->parent->left) {
pNode->parent->left = right;
} else {
pNode->parent->right = right;
}
right->left = pNode;
pNode->parent = right;
}
static void tRBTreeRotateRight(SRBTree *pTree, SRBTreeNode *pNode) {
SRBTreeNode *left = pNode->left;
pNode->left = left->right;
if (pNode->left) {
pNode->left->parent = pNode;
}
left->parent = pNode->parent;
if (pNode->parent == NULL) {
pTree->root = left;
} else if (pNode == pNode->parent->left) {
pNode->parent->left = left;
} else {
pNode->parent->right = left;
}
left->right = pNode;
pNode->parent = left;
}
#define tRBTreeCreate(compare) \
(SRBTree) { .cmprFn = (compare), .root = NULL }
SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *pNew) {
pNew->left = NULL;
pNew->right = NULL;
pNew->color = RED;
// insert
if (pTree->root == NULL) {
pNew->parent = NULL;
pTree->root = pNew;
} else {
SRBTreeNode *pNode = pTree->root;
while (true) {
ASSERT(pNode);
int32_t c = pTree->cmprFn(pNew->payload, pNode->payload);
if (c < 0) {
if (pNode->left) {
pNode = pNode->left;
} else {
pNew->parent = pNode;
pNode->left = pNew;
break;
}
} else if (c > 0) {
if (pNode->right) {
pNode = pNode->right;
} else {
pNew->parent = pNode;
pNode->right = pNew;
break;
}
} else {
return NULL;
}
}
}
// fix
SRBTreeNode *pNode = pNew;
while (pNode->parent && pNode->parent->color == RED) {
SRBTreeNode *p = pNode->parent;
SRBTreeNode *g = p->parent;
if (p == g->left) {
SRBTreeNode *u = g->right;
if (RBTREE_NODE_COLOR(u) == RED) {
p->color = BLACK;
u->color = BLACK;
g->color = RED;
pNode = g;
} else {
if (pNode == p->right) {
pNode = p;
tRBTreeRotateLeft(pTree, pNode);
}
pNode->parent->color = BLACK;
pNode->parent->parent->color = RED;
tRBTreeRotateRight(pTree, pNode->parent->parent);
}
} else {
SRBTreeNode *u = g->left;
if (RBTREE_NODE_COLOR(u) == RED) {
p->color = BLACK;
u->color = BLACK;
g->color = RED;
} else {
if (pNode == p->left) {
pNode = p;
tRBTreeRotateRight(pTree, pNode);
}
pNode->parent->color = BLACK;
pNode->parent->parent->color = RED;
tRBTreeRotateLeft(pTree, pNode->parent->parent);
}
}
}
pTree->root->color = BLACK;
return pNew;
}
SRBTreeNode *tRBTreeDrop(SRBTree *pTree, void *pKey) {
SRBTreeNode *pNode = pTree->root;
while (pNode) {
int32_t c = pTree->cmprFn(pKey, pNode->payload);
if (c < 0) {
pNode = pNode->left;
} else if (c > 0) {
pNode = pNode->right;
} else {
break;
}
}
if (pNode) {
// TODO
}
return pNode;
}
SRBTreeNode *tRBTreeGet(SRBTree *pTree, void *pKey) {
SRBTreeNode *pNode = pTree->root;
while (pNode) {
int32_t c = pTree->cmprFn(pKey, pNode->payload);
if (c < 0) {
pNode = pNode->left;
} else if (c > 0) {
pNode = pNode->right;
} else {
break;
}
}
return pNode;
}

View File

@ -70,6 +70,7 @@ if $data00 != @15-08-18 00:00:00.000@ then
return -1
endi
if $data01 != 2.068333156 then
print expect 2.068333156, actual: $data01
return -1
endi
if $data02 != 2.063999891 then
@ -128,6 +129,7 @@ if $data03 != 2 then
return -1
endi
if $data11 != 2.077099980 then
print expect 2.077099980, actual: $data11
return -1
endi
if $data12 != 2.077000022 then