Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
267e450a15
|
@ -628,6 +628,8 @@ int32_t* taosGetErrno();
|
|||
|
||||
//index
|
||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||
#define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201)
|
||||
|
||||
|
||||
//tmq
|
||||
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
|
||||
|
|
|
@ -21,7 +21,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "indexFstAutomation.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstFile.h"
|
||||
#include "indexFstNode.h"
|
||||
#include "indexFstRegistry.h"
|
||||
#include "indexFstUtil.h"
|
||||
|
@ -90,8 +90,8 @@ FstBuilderNode* fstUnFinishedNodesPopEmpty(FstUnFinishedNodes* nodes);
|
|||
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstSlice bs, Output in, Output* out);
|
||||
|
||||
typedef struct FstBuilder {
|
||||
FstCountingWriter* wrt; // The FST raw data is written directly to `wtr`.
|
||||
FstUnFinishedNodes* unfinished; // The stack of unfinished nodes
|
||||
IdxFstFile* wrt; // The FST raw data is written directly to `wtr`.
|
||||
FstUnFinishedNodes* unfinished; // The stack of unfinished nodes
|
||||
FstRegistry* registry; // A map of finished nodes.
|
||||
FstSlice last; // The last word added
|
||||
CompiledAddr lastAddr; // The address of the last compiled node
|
||||
|
@ -125,9 +125,9 @@ FstState fstStateCreateFrom(FstSlice* data, CompiledAddr addr);
|
|||
FstState fstStateCreate(State state);
|
||||
|
||||
// compile
|
||||
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp);
|
||||
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn);
|
||||
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node);
|
||||
void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp);
|
||||
void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn);
|
||||
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node);
|
||||
|
||||
// set_comm_input
|
||||
void fstStateSetCommInput(FstState* state, uint8_t inp);
|
||||
|
@ -282,7 +282,7 @@ FStmSt* stmBuilderIntoStm(FStmBuilder* sb);
|
|||
bool fstVerify(Fst* fst);
|
||||
|
||||
// refactor this function
|
||||
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||
|
||||
typedef struct StreamState {
|
||||
FstNode* node;
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __INDEX_FST_COUNTING_WRITER_H__
|
||||
#define __INDEX_FST_COUNTING_WRITER_H__
|
||||
|
||||
#include "indexInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
//#define USE_MMAP 1
|
||||
|
||||
#define DefaultMem 1024 * 1024
|
||||
|
||||
static char tmpFile[] = "./index";
|
||||
typedef enum WriterType { TMemory, TFile } WriterType;
|
||||
|
||||
typedef struct WriterCtx {
|
||||
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
||||
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
||||
int (*flush)(struct WriterCtx* ctx);
|
||||
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||
int (*size)(struct WriterCtx* ctx);
|
||||
WriterType type;
|
||||
union {
|
||||
struct {
|
||||
TdFilePtr pFile;
|
||||
bool readOnly;
|
||||
char buf[256];
|
||||
int size;
|
||||
#ifdef USE_MMAP
|
||||
char* ptr;
|
||||
#endif
|
||||
} file;
|
||||
struct {
|
||||
int32_t capa;
|
||||
char* buf;
|
||||
} mem;
|
||||
};
|
||||
int32_t offset;
|
||||
int32_t limit;
|
||||
} WriterCtx;
|
||||
|
||||
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len);
|
||||
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len);
|
||||
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||
static int writeCtxDoFlush(WriterCtx* ctx);
|
||||
|
||||
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
|
||||
void writerCtxDestroy(WriterCtx* w, bool remove);
|
||||
|
||||
typedef uint32_t CheckSummer;
|
||||
|
||||
typedef struct FstCountingWriter {
|
||||
void* wrt; // wrap any writer that counts and checksum bytes written
|
||||
uint64_t count;
|
||||
CheckSummer summer;
|
||||
} FstCountingWriter;
|
||||
|
||||
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len);
|
||||
|
||||
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len);
|
||||
|
||||
int fstCountingWriterFlush(FstCountingWriter* write);
|
||||
|
||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write);
|
||||
|
||||
FstCountingWriter* fstCountingWriterCreate(void* wtr);
|
||||
void fstCountingWriterDestroy(FstCountingWriter* w);
|
||||
|
||||
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes);
|
||||
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n);
|
||||
|
||||
#define FST_WRITER_COUNT(writer) (writer->count)
|
||||
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
|
||||
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __INDEX_FST_FILE_H__
|
||||
#define __INDEX_FST_FILE_H__
|
||||
|
||||
#include "indexInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
//#define USE_MMAP 1
|
||||
|
||||
#define DefaultMem 1024 * 1024
|
||||
|
||||
static char tmpFile[] = "./index";
|
||||
typedef enum WriterType { TMemory, TFile } WriterType;
|
||||
|
||||
typedef struct IFileCtx {
|
||||
int (*write)(struct IFileCtx* ctx, uint8_t* buf, int len);
|
||||
int (*read)(struct IFileCtx* ctx, uint8_t* buf, int len);
|
||||
int (*flush)(struct IFileCtx* ctx);
|
||||
int (*readFrom)(struct IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||
int (*size)(struct IFileCtx* ctx);
|
||||
WriterType type;
|
||||
union {
|
||||
struct {
|
||||
TdFilePtr pFile;
|
||||
bool readOnly;
|
||||
char buf[256];
|
||||
int64_t size;
|
||||
#ifdef USE_MMAP
|
||||
char* ptr;
|
||||
#endif
|
||||
} file;
|
||||
struct {
|
||||
int32_t cap;
|
||||
char* buf;
|
||||
} mem;
|
||||
};
|
||||
int32_t offset;
|
||||
int32_t limit;
|
||||
} IFileCtx;
|
||||
|
||||
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len);
|
||||
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len);
|
||||
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||
static int idxFileCtxDoFlush(IFileCtx* ctx);
|
||||
|
||||
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
|
||||
void idxFileCtxDestroy(IFileCtx* w, bool remove);
|
||||
|
||||
typedef uint32_t CheckSummer;
|
||||
|
||||
typedef struct IdxFstFile {
|
||||
void* wrt; // wrap any writer that counts and checksum bytes written
|
||||
uint64_t count;
|
||||
CheckSummer summer;
|
||||
} IdxFstFile;
|
||||
|
||||
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
|
||||
|
||||
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
|
||||
|
||||
int idxFileFlush(IdxFstFile* write);
|
||||
|
||||
uint32_t idxFileMaskedCheckSum(IdxFstFile* write);
|
||||
|
||||
IdxFstFile* idxFileCreate(void* wtr);
|
||||
void idxFileDestroy(IdxFstFile* w);
|
||||
|
||||
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes);
|
||||
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n);
|
||||
|
||||
#define FST_WRITER_COUNT(writer) (writer->count)
|
||||
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
|
||||
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -20,12 +20,12 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstFile.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
|
||||
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
|
||||
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
|
||||
#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal)
|
||||
#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0)
|
||||
#define FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn) (bn->finalOutput == 0)
|
||||
|
||||
typedef struct FstTransition {
|
||||
|
@ -46,7 +46,7 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
|
|||
|
||||
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
|
||||
|
||||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt,
|
||||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
|
||||
// CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||
bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2);
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define __INDEX_TFILE_H__
|
||||
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstFile.h"
|
||||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
#include "indexUtil.h"
|
||||
|
@ -59,7 +59,7 @@ typedef struct TFileCache {
|
|||
|
||||
typedef struct TFileWriter {
|
||||
FstBuilder* fb;
|
||||
WriterCtx* ctx;
|
||||
IFileCtx* ctx;
|
||||
TFileHeader header;
|
||||
uint32_t offset;
|
||||
} TFileWriter;
|
||||
|
@ -68,7 +68,7 @@ typedef struct TFileWriter {
|
|||
typedef struct TFileReader {
|
||||
T_REF_DECLARE()
|
||||
Fst* fst;
|
||||
WriterCtx* ctx;
|
||||
IFileCtx* ctx;
|
||||
TFileHeader header;
|
||||
bool remove;
|
||||
} TFileReader;
|
||||
|
@ -103,7 +103,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read
|
|||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
|
||||
|
||||
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
|
||||
TFileReader* tfileReaderCreate(WriterCtx* ctx);
|
||||
TFileReader* tfileReaderCreate(IFileCtx* ctx);
|
||||
void tfileReaderDestroy(TFileReader* reader);
|
||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
|
||||
void tfileReaderRef(TFileReader* reader);
|
||||
|
@ -111,7 +111,7 @@ void tfileReaderUnRef(TFileReader* reader);
|
|||
|
||||
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
|
||||
void tfileWriterClose(TFileWriter* tw);
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
|
||||
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header);
|
||||
void tfileWriterDestroy(TFileWriter* tw);
|
||||
int tfileWriterPut(TFileWriter* tw, void* data, bool order);
|
||||
int tfileWriterFinish(TFileWriter* tw);
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000LL
|
||||
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
|
||||
|
||||
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
||||
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
||||
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000LL // an NAN
|
||||
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_BINARY_NULL 0xFF
|
||||
|
@ -614,7 +614,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
return ret;
|
||||
END:
|
||||
if (tw != NULL) {
|
||||
writerCtxDestroy(tw->ctx, true);
|
||||
idxFileCtxDestroy(tw->ctx, true);
|
||||
taosMemoryFree(tw);
|
||||
}
|
||||
return -1;
|
||||
|
|
|
@ -19,11 +19,11 @@
|
|||
#include "tchecksum.h"
|
||||
#include "tcoding.h"
|
||||
|
||||
static void fstPackDeltaIn(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
|
||||
static void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
|
||||
CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr;
|
||||
fstCountingWriterPackUintIn(wrt, deltaAddr, nBytes);
|
||||
idxFilePackUintIn(wrt, deltaAddr, nBytes);
|
||||
}
|
||||
static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
|
||||
static uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
|
||||
uint8_t nBytes = packDeltaSize(nodeAddr, transAddr);
|
||||
fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes);
|
||||
return nBytes;
|
||||
|
@ -208,7 +208,7 @@ FstState fstStateCreate(State state) {
|
|||
return fstStateDict[idx];
|
||||
}
|
||||
// compile
|
||||
void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uint8_t inp) {
|
||||
void fstStateCompileForOneTransNext(IdxFstFile* w, CompiledAddr addr, uint8_t inp) {
|
||||
FstState s = fstStateCreate(OneTransNext);
|
||||
fstStateSetCommInput(&s, inp);
|
||||
|
||||
|
@ -216,21 +216,21 @@ void fstStateCompileForOneTransNext(FstCountingWriter* w, CompiledAddr addr, uin
|
|||
uint8_t v = fstStateCommInput(&s, &null);
|
||||
if (null) {
|
||||
// w->write_all(&[inp])
|
||||
fstCountingWriterWrite(w, &inp, 1);
|
||||
idxFileWrite(w, &inp, 1);
|
||||
}
|
||||
fstCountingWriterWrite(w, &(s.val), 1);
|
||||
idxFileWrite(w, &(s.val), 1);
|
||||
// w->write_all(&[s.val])
|
||||
return;
|
||||
}
|
||||
void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTransition* trn) {
|
||||
void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition* trn) {
|
||||
Output out = trn->out;
|
||||
uint8_t outPackSize = (out == 0 ? 0 : fstCountingWriterPackUint(w, out));
|
||||
uint8_t outPackSize = (out == 0 ? 0 : idxFilePackUint(w, out));
|
||||
uint8_t transPackSize = fstPackDetla(w, addr, trn->addr);
|
||||
PackSizes packSizes = 0;
|
||||
|
||||
FST_SET_OUTPUT_PACK_SIZE(packSizes, outPackSize);
|
||||
FST_SET_TRANSITION_PACK_SIZE(packSizes, transPackSize);
|
||||
fstCountingWriterWrite(w, (char*)&packSizes, sizeof(packSizes));
|
||||
idxFileWrite(w, (char*)&packSizes, sizeof(packSizes));
|
||||
|
||||
FstState st = fstStateCreate(OneTrans);
|
||||
|
||||
|
@ -239,12 +239,12 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
|
|||
bool null = false;
|
||||
uint8_t inp = fstStateCommInput(&st, &null);
|
||||
if (null == true) {
|
||||
fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp));
|
||||
idxFileWrite(w, (char*)&trn->inp, sizeof(trn->inp));
|
||||
}
|
||||
fstCountingWriterWrite(w, (char*)(&(st.val)), sizeof(st.val));
|
||||
idxFileWrite(w, (char*)(&(st.val)), sizeof(st.val));
|
||||
return;
|
||||
}
|
||||
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node) {
|
||||
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node) {
|
||||
int32_t sz = taosArrayGetSize(node->trans);
|
||||
assert(sz <= 256);
|
||||
|
||||
|
@ -275,11 +275,11 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
|
|||
|
||||
if (anyOuts) {
|
||||
if (FST_BUILDER_NODE_IS_FINAL(node)) {
|
||||
fstCountingWriterPackUintIn(w, node->finalOutput, oSize);
|
||||
idxFilePackUintIn(w, node->finalOutput, oSize);
|
||||
}
|
||||
for (int32_t i = sz - 1; i >= 0; i--) {
|
||||
FstTransition* t = taosArrayGet(node->trans, i);
|
||||
fstCountingWriterPackUintIn(w, t->out, oSize);
|
||||
idxFilePackUintIn(w, t->out, oSize);
|
||||
}
|
||||
}
|
||||
for (int32_t i = sz - 1; i >= 0; i--) {
|
||||
|
@ -288,7 +288,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
|
|||
}
|
||||
for (int32_t i = sz - 1; i >= 0; i--) {
|
||||
FstTransition* t = taosArrayGet(node->trans, i);
|
||||
fstCountingWriterWrite(w, (char*)&t->inp, 1);
|
||||
idxFileWrite(w, (char*)&t->inp, 1);
|
||||
// fstPackDeltaIn(w, addr, t->addr, tSize);
|
||||
}
|
||||
if (sz > TRANS_INDEX_THRESHOLD) {
|
||||
|
@ -306,10 +306,10 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
|
|||
index[t->inp] = i;
|
||||
// fstPackDeltaIn(w, addr, t->addr, tSize);
|
||||
}
|
||||
fstCountingWriterWrite(w, (char*)index, 256);
|
||||
idxFileWrite(w, (char*)index, 256);
|
||||
taosMemoryFree(index);
|
||||
}
|
||||
fstCountingWriterWrite(w, (char*)&packSizes, 1);
|
||||
idxFileWrite(w, (char*)&packSizes, 1);
|
||||
bool null = false;
|
||||
fstStateStateNtrans(&st, &null);
|
||||
if (null == true) {
|
||||
|
@ -318,12 +318,12 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
|
|||
// encoded in the state byte.
|
||||
uint8_t v = 1;
|
||||
if (sz == 256) {
|
||||
fstCountingWriterWrite(w, (char*)&v, 1);
|
||||
idxFileWrite(w, (char*)&v, 1);
|
||||
} else {
|
||||
fstCountingWriterWrite(w, (char*)&sz, 1);
|
||||
idxFileWrite(w, (char*)&sz, 1);
|
||||
}
|
||||
}
|
||||
fstCountingWriterWrite(w, (char*)(&(st.val)), 1);
|
||||
idxFileWrite(w, (char*)(&(st.val)), 1);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -753,7 +753,7 @@ bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr
|
|||
return true;
|
||||
}
|
||||
|
||||
bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
|
||||
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
|
||||
return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b);
|
||||
}
|
||||
|
||||
|
@ -763,7 +763,7 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
|
|||
return b;
|
||||
}
|
||||
|
||||
b->wrt = fstCountingWriterCreate(w);
|
||||
b->wrt = idxFileCreate(w);
|
||||
b->unfinished = fstUnFinishedNodesCreate();
|
||||
b->registry = fstRegistryCreate(10000, 2);
|
||||
b->last = fstSliceCreate(NULL, 0);
|
||||
|
@ -773,12 +773,12 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
|
|||
char buf64[8] = {0};
|
||||
void* pBuf64 = buf64;
|
||||
taosEncodeFixedU64(&pBuf64, VERSION);
|
||||
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
|
||||
idxFileWrite(b->wrt, buf64, sizeof(buf64));
|
||||
|
||||
pBuf64 = buf64;
|
||||
memset(buf64, 0, sizeof(buf64));
|
||||
taosEncodeFixedU64(&pBuf64, ty);
|
||||
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
|
||||
idxFileWrite(b->wrt, buf64, sizeof(buf64));
|
||||
|
||||
return b;
|
||||
}
|
||||
|
@ -787,7 +787,7 @@ void fstBuilderDestroy(FstBuilder* b) {
|
|||
return;
|
||||
}
|
||||
|
||||
fstCountingWriterDestroy(b->wrt);
|
||||
idxFileDestroy(b->wrt);
|
||||
fstUnFinishedNodesDestroy(b->unfinished);
|
||||
fstRegistryDestroy(b->registry);
|
||||
fstSliceDestroy(&b->last);
|
||||
|
@ -905,21 +905,19 @@ void* fstBuilderInsertInner(FstBuilder* b) {
|
|||
|
||||
void* pBuf64 = buf64;
|
||||
taosEncodeFixedU64(&pBuf64, b->len);
|
||||
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
|
||||
idxFileWrite(b->wrt, buf64, sizeof(buf64));
|
||||
|
||||
pBuf64 = buf64;
|
||||
taosEncodeFixedU64(&pBuf64, rootAddr);
|
||||
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
|
||||
idxFileWrite(b->wrt, buf64, sizeof(buf64));
|
||||
|
||||
char buf32[4] = {0};
|
||||
void* pBuf32 = buf32;
|
||||
uint32_t sum = fstCountingWriterMaskedCheckSum(b->wrt);
|
||||
uint32_t sum = idxFileMaskedCheckSum(b->wrt);
|
||||
taosEncodeFixedU32(&pBuf32, sum);
|
||||
fstCountingWriterWrite(b->wrt, buf32, sizeof(buf32));
|
||||
idxFileWrite(b->wrt, buf32, sizeof(buf32));
|
||||
|
||||
fstCountingWriterFlush(b->wrt);
|
||||
// fstCountingWriterDestroy(b->wrt);
|
||||
// b->wrt = NULL;
|
||||
idxFileFlush(b->wrt);
|
||||
return b->wrt;
|
||||
}
|
||||
void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
|
||||
|
|
|
@ -61,9 +61,10 @@ void dfaBuilderDestroy(FstDfaBuilder *builder) {
|
|||
pIter = taosHashIterate(builder->cache, pIter);
|
||||
}
|
||||
taosHashCleanup(builder->cache);
|
||||
taosMemoryFree(builder);
|
||||
}
|
||||
|
||||
FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) {
|
||||
FstDfa *dfaBuilder(FstDfaBuilder *builder) {
|
||||
uint32_t sz = taosArrayGetSize(builder->dfa->insts);
|
||||
FstSparseSet *cur = sparSetCreate(sz);
|
||||
FstSparseSet *nxt = sparSetCreate(sz);
|
||||
|
|
|
@ -13,13 +13,13 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstFile.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
#include "os.h"
|
||||
#include "tutil.h"
|
||||
|
||||
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
|
||||
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
|
||||
if (ctx->type == TFile) {
|
||||
assert(len == taosWriteFile(ctx->file.pFile, buf, len));
|
||||
} else {
|
||||
|
@ -28,7 +28,7 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
|
|||
ctx->offset += len;
|
||||
return len;
|
||||
}
|
||||
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
|
||||
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
|
||||
int nRead = 0;
|
||||
if (ctx->type == TFile) {
|
||||
#ifdef USE_MMAP
|
||||
|
@ -44,7 +44,7 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
|
|||
|
||||
return nRead;
|
||||
}
|
||||
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||
int nRead = 0;
|
||||
if (ctx->type == TFile) {
|
||||
// tfLseek(ctx->file.pFile, offset, 0);
|
||||
|
@ -61,7 +61,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
|||
}
|
||||
return nRead;
|
||||
}
|
||||
static int writeCtxGetSize(WriterCtx* ctx) {
|
||||
static int idxFileCtxGetSize(IFileCtx* ctx) {
|
||||
if (ctx->type == TFile) {
|
||||
int64_t file_size = 0;
|
||||
taosStatFile(ctx->file.buf, &file_size, NULL);
|
||||
|
@ -69,7 +69,7 @@ static int writeCtxGetSize(WriterCtx* ctx) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||
static int idxFileCtxDoFlush(IFileCtx* ctx) {
|
||||
if (ctx->type == TFile) {
|
||||
// taosFsyncFile(ctx->file.pFile);
|
||||
taosFsyncFile(ctx->file.pFile);
|
||||
|
@ -80,8 +80,8 @@ static int writeCtxDoFlush(WriterCtx* ctx) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
|
||||
WriterCtx* ctx = taosMemoryCalloc(1, sizeof(WriterCtx));
|
||||
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
|
||||
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
|
||||
if (ctx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -90,39 +90,36 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
if (ctx->type == TFile) {
|
||||
// ugly code, refactor later
|
||||
ctx->file.readOnly = readOnly;
|
||||
memcpy(ctx->file.buf, path, strlen(path));
|
||||
if (readOnly == false) {
|
||||
// ctx->file.pFile = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
taosFtruncateFile(ctx->file.pFile, 0);
|
||||
int64_t file_size;
|
||||
taosStatFile(path, &file_size, NULL);
|
||||
ctx->file.size = (int)file_size;
|
||||
taosStatFile(path, &ctx->file.size, NULL);
|
||||
// ctx->file.size = (int)size;
|
||||
|
||||
} else {
|
||||
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
||||
|
||||
int64_t file_size = 0;
|
||||
taosFStatFile(ctx->file.pFile, &file_size, NULL);
|
||||
ctx->file.size = (int)file_size;
|
||||
int64_t size = 0;
|
||||
taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
|
||||
ctx->file.size = (int)size;
|
||||
#ifdef USE_MMAP
|
||||
ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
|
||||
#endif
|
||||
}
|
||||
memcpy(ctx->file.buf, path, strlen(path));
|
||||
if (ctx->file.pFile == NULL) {
|
||||
indexError("failed to open file, error %d", errno);
|
||||
goto END;
|
||||
}
|
||||
} else if (ctx->type == TMemory) {
|
||||
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
|
||||
ctx->mem.capa = capacity;
|
||||
ctx->mem.cap = capacity;
|
||||
}
|
||||
ctx->write = writeCtxDoWrite;
|
||||
ctx->read = writeCtxDoRead;
|
||||
ctx->flush = writeCtxDoFlush;
|
||||
ctx->readFrom = writeCtxDoReadFrom;
|
||||
ctx->size = writeCtxGetSize;
|
||||
ctx->write = idxFileCtxDoWrite;
|
||||
ctx->read = idxFileCtxDoRead;
|
||||
ctx->flush = idxFileCtxDoFlush;
|
||||
ctx->readFrom = idxFileCtxDoReadFrom;
|
||||
ctx->size = idxFileCtxGetSize;
|
||||
|
||||
ctx->offset = 0;
|
||||
ctx->limit = capacity;
|
||||
|
@ -135,7 +132,7 @@ END:
|
|||
taosMemoryFree(ctx);
|
||||
return NULL;
|
||||
}
|
||||
void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
||||
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
|
||||
if (ctx->type == TMemory) {
|
||||
taosMemoryFree(ctx->mem.buf);
|
||||
} else {
|
||||
|
@ -149,9 +146,6 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
|||
if (ctx->file.readOnly == false) {
|
||||
int64_t file_size = 0;
|
||||
taosStatFile(ctx->file.buf, &file_size, NULL);
|
||||
// struct stat fstat;
|
||||
// stat(ctx->file.buf, &fstat);
|
||||
// indexError("write file size: %d", (int)(fstat.st_size));
|
||||
}
|
||||
if (remove) {
|
||||
unlink(ctx->file.buf);
|
||||
|
@ -160,30 +154,29 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
|||
taosMemoryFree(ctx);
|
||||
}
|
||||
|
||||
FstCountingWriter* fstCountingWriterCreate(void* wrt) {
|
||||
FstCountingWriter* cw = taosMemoryCalloc(1, sizeof(FstCountingWriter));
|
||||
IdxFstFile* idxFileCreate(void* wrt) {
|
||||
IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
|
||||
if (cw == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
cw->wrt = wrt;
|
||||
//(void *)(writerCtxCreate(TFile, readOnly));
|
||||
return cw;
|
||||
}
|
||||
void fstCountingWriterDestroy(FstCountingWriter* cw) {
|
||||
void idxFileDestroy(IdxFstFile* cw) {
|
||||
// free wrt object: close fd or free mem
|
||||
fstCountingWriterFlush(cw);
|
||||
// writerCtxDestroy((WriterCtx *)(cw->wrt));
|
||||
idxFileFlush(cw);
|
||||
// idxFileCtxDestroy((IFileCtx *)(cw->wrt));
|
||||
taosMemoryFree(cw);
|
||||
}
|
||||
|
||||
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
|
||||
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
||||
if (write == NULL) {
|
||||
return 0;
|
||||
}
|
||||
// update checksum
|
||||
// write data to file/socket or mem
|
||||
WriterCtx* ctx = write->wrt;
|
||||
IFileCtx* ctx = write->wrt;
|
||||
|
||||
int nWrite = ctx->write(ctx, buf, len);
|
||||
assert(nWrite == len);
|
||||
|
@ -192,42 +185,41 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
|||
write->summer = taosCalcChecksum(write->summer, buf, len);
|
||||
return len;
|
||||
}
|
||||
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
|
||||
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
||||
if (write == NULL) {
|
||||
return 0;
|
||||
}
|
||||
WriterCtx* ctx = write->wrt;
|
||||
int nRead = ctx->read(ctx, buf, len);
|
||||
IFileCtx* ctx = write->wrt;
|
||||
int nRead = ctx->read(ctx, buf, len);
|
||||
// assert(nRead == len);
|
||||
return nRead;
|
||||
}
|
||||
|
||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
|
||||
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
|
||||
// opt
|
||||
return write->summer;
|
||||
}
|
||||
|
||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||
WriterCtx* ctx = write->wrt;
|
||||
int idxFileFlush(IdxFstFile* write) {
|
||||
IFileCtx* ctx = write->wrt;
|
||||
ctx->flush(ctx);
|
||||
// write->wtr->flush
|
||||
return 1;
|
||||
}
|
||||
|
||||
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes) {
|
||||
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
|
||||
assert(1 <= nBytes && nBytes <= 8);
|
||||
uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
|
||||
for (uint8_t i = 0; i < nBytes; i++) {
|
||||
buf[i] = (uint8_t)n;
|
||||
n = n >> 8;
|
||||
}
|
||||
fstCountingWriterWrite(writer, buf, nBytes);
|
||||
idxFileWrite(writer, buf, nBytes);
|
||||
taosMemoryFree(buf);
|
||||
return;
|
||||
}
|
||||
|
||||
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n) {
|
||||
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
|
||||
uint8_t nBytes = packSize(n);
|
||||
fstCountingWriterPackUintIn(writer, n, nBytes);
|
||||
idxFilePackUintIn(writer, n, nBytes);
|
||||
return nBytes;
|
||||
}
|
|
@ -95,7 +95,7 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
|
|||
}
|
||||
}
|
||||
|
||||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr
|
||||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
|
||||
// startAddr) {
|
||||
|
||||
// size_t sz = taosArrayGetSize(b->trans);
|
||||
|
|
|
@ -75,7 +75,6 @@ CompiledAddr unpackDelta(char* data, uint64_t len, uint64_t nodeAddr) {
|
|||
}
|
||||
|
||||
// fst slice func
|
||||
//
|
||||
|
||||
FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
|
||||
FstString* str = (FstString*)taosMemoryMalloc(sizeof(FstString));
|
||||
|
@ -164,16 +163,3 @@ int fstSliceCompare(FstSlice* a, FstSlice* b) {
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// FstStack* fstStackCreate(size_t elemSize, StackFreeElem freeFn) {
|
||||
// FstStack *s = taosMemoryCalloc(1, sizeof(FstStack));
|
||||
// if (s == NULL) { return NULL; }
|
||||
// s->
|
||||
// s->freeFn
|
||||
//
|
||||
//}
|
||||
// void *fstStackPush(FstStack *s, void *elem);
|
||||
// void *fstStackTop(FstStack *s);
|
||||
// size_t fstStackLen(FstStack *s);
|
||||
// void *fstStackGetAt(FstStack *s, size_t i);
|
||||
// void fstStackDestory(FstStack *);
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "index.h"
|
||||
#include "indexComm.h"
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstFile.h"
|
||||
#include "indexUtil.h"
|
||||
#include "taosdef.h"
|
||||
#include "taoserror.h"
|
||||
|
@ -103,7 +103,7 @@ TFileCache* tfileCacheCreate(const char* path) {
|
|||
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
||||
char* file = taosArrayGetP(files, i);
|
||||
|
||||
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
|
||||
IFileCtx* wc = idxFileCtxCreate(TFile, file, true, 1024 * 1024 * 64);
|
||||
if (wc == NULL) {
|
||||
indexError("failed to open index:%s", file);
|
||||
goto End;
|
||||
|
@ -175,7 +175,7 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
|||
tfileReaderRef(reader);
|
||||
return;
|
||||
}
|
||||
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||
TFileReader* tfileReaderCreate(IFileCtx* ctx) {
|
||||
TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
|
||||
if (reader == NULL) {
|
||||
return NULL;
|
||||
|
@ -216,7 +216,7 @@ void tfileReaderDestroy(TFileReader* reader) {
|
|||
} else {
|
||||
indexInfo("%s is not removed", reader->ctx->file.buf);
|
||||
}
|
||||
writerCtxDestroy(reader->ctx, reader->remove);
|
||||
idxFileCtxDestroy(reader->ctx, reader->remove);
|
||||
|
||||
taosMemoryFree(reader);
|
||||
}
|
||||
|
@ -490,7 +490,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
|
|||
char fullname[256] = {0};
|
||||
tfileGenFileFullName(fullname, path, suid, colName, version);
|
||||
// indexInfo("open write file name %s", fullname);
|
||||
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
|
||||
IFileCtx* wcx = idxFileCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
|
||||
if (wcx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -507,18 +507,18 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const c
|
|||
char fullname[256] = {0};
|
||||
tfileGenFileFullName(fullname, path, suid, colName, version);
|
||||
|
||||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||
IFileCtx* wc = idxFileCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||
if (wc == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
|
||||
indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
|
||||
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
return reader;
|
||||
}
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
||||
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
|
||||
TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
|
||||
if (tw == NULL) {
|
||||
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
|
||||
|
@ -609,14 +609,14 @@ void tfileWriterClose(TFileWriter* tw) {
|
|||
if (tw == NULL) {
|
||||
return;
|
||||
}
|
||||
writerCtxDestroy(tw->ctx, false);
|
||||
idxFileCtxDestroy(tw->ctx, false);
|
||||
taosMemoryFree(tw);
|
||||
}
|
||||
void tfileWriterDestroy(TFileWriter* tw) {
|
||||
if (tw == NULL) {
|
||||
return;
|
||||
}
|
||||
writerCtxDestroy(tw->ctx, false);
|
||||
idxFileCtxDestroy(tw->ctx, false);
|
||||
taosMemoryFree(tw);
|
||||
}
|
||||
|
||||
|
@ -892,8 +892,8 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
|||
return 0;
|
||||
}
|
||||
static int tfileReaderLoadFst(TFileReader* reader) {
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
int size = ctx->size(ctx);
|
||||
IFileCtx* ctx = reader->ctx;
|
||||
int size = ctx->size(ctx);
|
||||
|
||||
// current load fst into memory, refactor it later
|
||||
int fstSize = size - reader->header.fstOffset - sizeof(tfileMagicNumber);
|
||||
|
@ -905,8 +905,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
|||
int64_t ts = taosGetTimestampUs();
|
||||
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
|
||||
int64_t cost = taosGetTimestampUs() - ts;
|
||||
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
|
||||
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
|
||||
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64
|
||||
"us",
|
||||
nread, reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
|
||||
// we assuse fst size less than FST_MAX_SIZE
|
||||
assert(nread > 0 && nread <= fstSize);
|
||||
|
||||
|
@ -919,7 +920,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
|||
}
|
||||
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
|
||||
// TODO(yihao): opt later
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
IFileCtx* ctx = reader->ctx;
|
||||
// add block cache
|
||||
char block[4096] = {0};
|
||||
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
|
||||
|
@ -952,7 +953,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
|||
}
|
||||
static int tfileReaderVerify(TFileReader* reader) {
|
||||
// just validate header and Footer, file corrupted also shuild be verified later
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
IFileCtx* ctx = reader->ctx;
|
||||
|
||||
uint64_t tMagicNumber = 0;
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
#include "index.h"
|
||||
#include "indexCache.h"
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
|
@ -20,7 +19,7 @@ class FstWriter {
|
|||
public:
|
||||
FstWriter() {
|
||||
taosRemoveFile(fileName.c_str());
|
||||
_wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
|
||||
_wc = idxFileCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
|
||||
_b = fstBuilderCreate(_wc, 0);
|
||||
}
|
||||
bool Put(const std::string& key, uint64_t val) {
|
||||
|
@ -38,25 +37,25 @@ class FstWriter {
|
|||
fstBuilderFinish(_b);
|
||||
fstBuilderDestroy(_b);
|
||||
|
||||
writerCtxDestroy(_wc, false);
|
||||
idxFileCtxDestroy(_wc, false);
|
||||
}
|
||||
|
||||
private:
|
||||
FstBuilder* _b;
|
||||
WriterCtx* _wc;
|
||||
IFileCtx* _wc;
|
||||
};
|
||||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") {
|
||||
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
_wc = idxFileCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
||||
_w = idxFileCreate(_wc);
|
||||
_size = size;
|
||||
memset((void*)&_s, 0, sizeof(_s));
|
||||
}
|
||||
bool init() {
|
||||
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -141,18 +140,18 @@ class FstReadMemory {
|
|||
}
|
||||
|
||||
~FstReadMemory() {
|
||||
fstCountingWriterDestroy(_w);
|
||||
idxFileDestroy(_w);
|
||||
fstDestroy(_fst);
|
||||
fstSliceDestroy(&_s);
|
||||
writerCtxDestroy(_wc, false);
|
||||
idxFileCtxDestroy(_wc, false);
|
||||
}
|
||||
|
||||
private:
|
||||
FstCountingWriter* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
WriterCtx* _wc;
|
||||
int32_t _size;
|
||||
IdxFstFile* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
IFileCtx* _wc;
|
||||
int32_t _size;
|
||||
};
|
||||
|
||||
#define L 100
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
#include "index.h"
|
||||
#include "indexCache.h"
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
|
@ -40,7 +39,7 @@ static void EnvCleanup() {}
|
|||
class FstWriter {
|
||||
public:
|
||||
FstWriter() {
|
||||
_wc = writerCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
|
||||
_wc = idxFileCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
|
||||
_b = fstBuilderCreate(_wc, 0);
|
||||
}
|
||||
bool Put(const std::string& key, uint64_t val) {
|
||||
|
@ -58,25 +57,25 @@ class FstWriter {
|
|||
fstBuilderFinish(_b);
|
||||
fstBuilderDestroy(_b);
|
||||
|
||||
writerCtxDestroy(_wc, false);
|
||||
idxFileCtxDestroy(_wc, false);
|
||||
}
|
||||
|
||||
private:
|
||||
FstBuilder* _b;
|
||||
WriterCtx* _wc;
|
||||
IFileCtx* _wc;
|
||||
};
|
||||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(size_t size) {
|
||||
_wc = writerCtxCreate(TFile, tindex, true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
_wc = idxFileCtxCreate(TFile, tindex, true, 64 * 1024);
|
||||
_w = idxFileCreate(_wc);
|
||||
_size = size;
|
||||
memset((void*)&_s, 0, sizeof(_s));
|
||||
}
|
||||
bool init() {
|
||||
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -130,18 +129,18 @@ class FstReadMemory {
|
|||
}
|
||||
|
||||
~FstReadMemory() {
|
||||
fstCountingWriterDestroy(_w);
|
||||
idxFileDestroy(_w);
|
||||
fstDestroy(_fst);
|
||||
fstSliceDestroy(&_s);
|
||||
writerCtxDestroy(_wc, false);
|
||||
idxFileCtxDestroy(_wc, false);
|
||||
}
|
||||
|
||||
private:
|
||||
FstCountingWriter* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
WriterCtx* _wc;
|
||||
size_t _size;
|
||||
IdxFstFile* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
IFileCtx* _wc;
|
||||
size_t _size;
|
||||
};
|
||||
|
||||
class FstWriterEnv : public ::testing::Test {
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
#include "index.h"
|
||||
#include "indexCache.h"
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
|
@ -51,7 +50,7 @@ class DebugInfo {
|
|||
class FstWriter {
|
||||
public:
|
||||
FstWriter() {
|
||||
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
|
||||
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
|
||||
_b = fstBuilderCreate(NULL, 0);
|
||||
}
|
||||
bool Put(const std::string& key, uint64_t val) {
|
||||
|
@ -64,25 +63,25 @@ class FstWriter {
|
|||
fstBuilderFinish(_b);
|
||||
fstBuilderDestroy(_b);
|
||||
|
||||
writerCtxDestroy(_wc, false);
|
||||
idxFileCtxDestroy(_wc, false);
|
||||
}
|
||||
|
||||
private:
|
||||
FstBuilder* _b;
|
||||
WriterCtx* _wc;
|
||||
IFileCtx* _wc;
|
||||
};
|
||||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(size_t size) {
|
||||
_wc = writerCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
|
||||
_w = idxFileCreate(_wc);
|
||||
_size = size;
|
||||
memset((void*)&_s, 0, sizeof(_s));
|
||||
}
|
||||
bool init() {
|
||||
char* buf = (char*)taosMemoryCalloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
int nRead = idxFileRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -124,18 +123,18 @@ class FstReadMemory {
|
|||
}
|
||||
|
||||
~FstReadMemory() {
|
||||
fstCountingWriterDestroy(_w);
|
||||
idxFileDestroy(_w);
|
||||
fstDestroy(_fst);
|
||||
fstSliceDestroy(&_s);
|
||||
writerCtxDestroy(_wc, true);
|
||||
idxFileCtxDestroy(_wc, true);
|
||||
}
|
||||
|
||||
private:
|
||||
FstCountingWriter* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
WriterCtx* _wc;
|
||||
size_t _size;
|
||||
IdxFstFile* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
IFileCtx* _wc;
|
||||
size_t _size;
|
||||
};
|
||||
|
||||
#define L 100
|
||||
|
@ -392,13 +391,13 @@ class TFileObj {
|
|||
|
||||
fileName_ = path;
|
||||
|
||||
WriterCtx* ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
|
||||
IFileCtx* ctx = idxFileCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
|
||||
|
||||
writer_ = tfileWriterCreate(ctx, &header);
|
||||
return writer_ != NULL ? true : false;
|
||||
}
|
||||
bool InitReader() {
|
||||
WriterCtx* ctx = writerCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
|
||||
IFileCtx* ctx = idxFileCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
|
||||
reader_ = tfileReaderCreate(ctx);
|
||||
return reader_ != NULL ? true : false;
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
#include "index.h"
|
||||
#include "indexCache.h"
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
#include "indexCache.h"
|
||||
#include "indexComm.h"
|
||||
#include "indexFst.h"
|
||||
#include "indexFstCountingWriter.h"
|
||||
#include "indexFstUtil.h"
|
||||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
|
|
|
@ -479,6 +479,10 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
static int32_t transGetRefMgt() {
|
||||
//
|
||||
return refMgt;
|
||||
}
|
||||
|
||||
static void transInitEnv() {
|
||||
refMgt = transOpenExHandleMgt(50000);
|
||||
|
@ -486,8 +490,9 @@ static void transInitEnv() {
|
|||
}
|
||||
static void transDestroyEnv() {
|
||||
// close ref
|
||||
transCloseExHandleMgt(refMgt);
|
||||
transCloseExHandleMgt();
|
||||
}
|
||||
|
||||
void transInit() {
|
||||
// init env
|
||||
taosThreadOnce(&transModuleInit, transInitEnv);
|
||||
|
@ -502,25 +507,25 @@ int32_t transOpenExHandleMgt(int size) {
|
|||
}
|
||||
void transCloseExHandleMgt() {
|
||||
// close ref
|
||||
taosCloseRef(refMgt);
|
||||
taosCloseRef(transGetRefMgt());
|
||||
}
|
||||
int64_t transAddExHandle(void* p) {
|
||||
// acquire extern handle
|
||||
return taosAddRef(refMgt, p);
|
||||
return taosAddRef(transGetRefMgt(), p);
|
||||
}
|
||||
int32_t transRemoveExHandle(int64_t refId) {
|
||||
// acquire extern handle
|
||||
return taosRemoveRef(refMgt, refId);
|
||||
return taosRemoveRef(transGetRefMgt(), refId);
|
||||
}
|
||||
|
||||
SExHandle* transAcquireExHandle(int64_t refId) {
|
||||
// acquire extern handle
|
||||
return (SExHandle*)taosAcquireRef(refMgt, refId);
|
||||
return (SExHandle*)taosAcquireRef(transGetRefMgt(), refId);
|
||||
}
|
||||
|
||||
int32_t transReleaseExHandle(int64_t refId) {
|
||||
// release extern handle
|
||||
return taosReleaseRef(refMgt, refId);
|
||||
return taosReleaseRef(transGetRefMgt(), refId);
|
||||
}
|
||||
void transDestoryExHandle(void* handle) {
|
||||
if (handle == NULL) {
|
||||
|
|
|
@ -599,6 +599,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset
|
|||
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
|
||||
|
||||
|
|
Loading…
Reference in New Issue