Merge pull request #9527 from taosdata/feature/index_cache
Feature/index cache
This commit is contained in:
commit
efc3de3dda
|
@ -38,6 +38,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname);
|
|||
int64_t tfClose(int64_t tfd);
|
||||
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
|
||||
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
|
||||
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset);
|
||||
int32_t tfFsync(int64_t tfd);
|
||||
bool tfValid(int64_t tfd);
|
||||
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
|
||||
|
@ -47,4 +48,4 @@ int32_t tfFtruncate(int64_t tfd, int64_t length);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_FILE_H*/
|
||||
#endif /*_TD_UTIL_FILE_H*/
|
||||
|
|
|
@ -142,7 +142,8 @@ uint64_t fstStateInputLen(FstState* state);
|
|||
// end_addr
|
||||
uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data);
|
||||
uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes);
|
||||
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans);
|
||||
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes,
|
||||
uint64_t nTrans);
|
||||
// input
|
||||
uint8_t fstStateInput(FstState* state, FstNode* node);
|
||||
uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i);
|
||||
|
@ -255,9 +256,10 @@ typedef struct FstMeta {
|
|||
} FstMeta;
|
||||
|
||||
typedef struct Fst {
|
||||
FstMeta* meta;
|
||||
FstSlice* data; //
|
||||
FstNode* root; //
|
||||
FstMeta* meta;
|
||||
FstSlice* data; //
|
||||
FstNode* root; //
|
||||
pthread_mutex_t mtx;
|
||||
} Fst;
|
||||
|
||||
// refactor simple function
|
||||
|
@ -310,7 +312,8 @@ StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* sta
|
|||
void swsResultDestroy(StreamWithStateResult* result);
|
||||
|
||||
typedef void* (*StreamCallback)(void*);
|
||||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max);
|
||||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
|
||||
FstBoundWithData* max);
|
||||
|
||||
void streamWithStateDestroy(StreamWithState* sws);
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ typedef struct TFileReader {
|
|||
Fst* fst;
|
||||
WriterCtx* ctx;
|
||||
TFileHeader header;
|
||||
bool remove;
|
||||
} TFileReader;
|
||||
|
||||
typedef struct IndexTFile {
|
||||
|
|
|
@ -94,7 +94,6 @@ void indexClose(SIndex* sIdx) {
|
|||
#endif
|
||||
|
||||
#ifdef USE_INVERTED_INDEX
|
||||
indexCacheDestroy(sIdx->cache);
|
||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
while (iter) {
|
||||
IndexCache** pCache = iter;
|
||||
|
@ -104,6 +103,7 @@ void indexClose(SIndex* sIdx) {
|
|||
taosHashCleanup(sIdx->colObj);
|
||||
pthread_mutex_destroy(&sIdx->mtx);
|
||||
#endif
|
||||
free(sIdx->path);
|
||||
free(sIdx);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||
|
||||
#define MEM_TERM_LIMIT 200
|
||||
#define MEM_TERM_LIMIT 10000 * 10
|
||||
// ref index_cache.h:22
|
||||
//#define CACHE_KEY_LEN(p) \
|
||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||
|
@ -110,7 +110,10 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
|||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (ct != NULL) {}
|
||||
if (ct != NULL) {
|
||||
free(ct->colVal);
|
||||
free(ct);
|
||||
}
|
||||
}
|
||||
tSkipListDestroyIter(iter);
|
||||
tSkipListDestroy(slt);
|
||||
|
@ -271,7 +274,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
|||
SIndexTerm* term = query->term;
|
||||
EIndexQueryType qtype = query->qType;
|
||||
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
|
||||
indexCacheDebug(pCache);
|
||||
// indexCacheDebug(pCache);
|
||||
|
||||
int ret = indexQueryMem(mem, &ct, qtype, result, s);
|
||||
if (ret == 0 && *s != kTypeDeletion) {
|
||||
|
|
|
@ -354,7 +354,8 @@ uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes
|
|||
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
|
||||
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
|
||||
}
|
||||
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans) {
|
||||
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes,
|
||||
uint64_t nTrans) {
|
||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes);
|
||||
uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes;
|
||||
return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size
|
||||
|
@ -403,8 +404,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
|
|||
|
||||
FstSlice* slice = &node->data;
|
||||
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
|
||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - node->nTrans -
|
||||
(i * tSizes) - tSizes;
|
||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) -
|
||||
node->nTrans - (i * tSizes) - tSizes;
|
||||
uint8_t* data = fstSliceData(slice, NULL);
|
||||
return unpackDelta(data + at, tSizes, node->end);
|
||||
}
|
||||
|
@ -595,7 +596,8 @@ FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) {
|
|||
n->isFinal = fstStateIsFinalState(&st); // s.is_final_state();
|
||||
n->nTrans = nTrans;
|
||||
n->sizes = sz;
|
||||
n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
|
||||
n->finalOutput =
|
||||
fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
@ -875,9 +877,7 @@ void* fstBuilderInsertInner(FstBuilder* b) {
|
|||
// b->wrt = NULL;
|
||||
return b->wrt;
|
||||
}
|
||||
void fstBuilderFinish(FstBuilder* b) {
|
||||
fstBuilderInsertInner(b);
|
||||
}
|
||||
void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
|
||||
|
||||
FstSlice fstNodeAsSlice(FstNode* node) {
|
||||
FstSlice* slice = &node->data;
|
||||
|
@ -894,9 +894,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
|
|||
return trn;
|
||||
}
|
||||
|
||||
void fstLastTransitionDestroy(FstLastTransition* trn) {
|
||||
free(trn);
|
||||
}
|
||||
void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); }
|
||||
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
|
||||
FstLastTransition* trn = unNode->last;
|
||||
if (trn == NULL) { return; }
|
||||
|
@ -959,9 +957,10 @@ Fst* fstCreate(FstSlice* slice) {
|
|||
fst->meta->checkSum = checkSum;
|
||||
|
||||
FstSlice* s = calloc(1, sizeof(FstSlice));
|
||||
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice));
|
||||
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1);
|
||||
fst->data = s;
|
||||
|
||||
pthread_mutex_init(&fst->mtx, NULL);
|
||||
return fst;
|
||||
|
||||
FST_CREAT_FAILED:
|
||||
|
@ -973,14 +972,18 @@ void fstDestroy(Fst* fst) {
|
|||
free(fst->meta);
|
||||
fstSliceDestroy(fst->data);
|
||||
free(fst->data);
|
||||
pthread_mutex_destroy(&fst->mtx);
|
||||
}
|
||||
free(fst);
|
||||
}
|
||||
|
||||
bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||
// dec lock range
|
||||
pthread_mutex_lock(&fst->mtx);
|
||||
FstNode* root = fstGetRoot(fst);
|
||||
Output tOut = 0;
|
||||
int32_t len;
|
||||
|
||||
uint8_t* data = fstSliceData(b, &len);
|
||||
|
||||
SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*));
|
||||
|
@ -988,7 +991,10 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
|||
for (uint32_t i = 0; i < len; i++) {
|
||||
uint8_t inp = data[i];
|
||||
Output res = 0;
|
||||
if (false == fstNodeFindInput(root, inp, &res)) { return false; }
|
||||
if (false == fstNodeFindInput(root, inp, &res)) {
|
||||
pthread_mutex_unlock(&fst->mtx);
|
||||
return false;
|
||||
}
|
||||
|
||||
FstTransition trn;
|
||||
fstNodeGetTransitionAt(root, res, &trn);
|
||||
|
@ -997,6 +1003,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
|||
taosArrayPush(nodes, &root);
|
||||
}
|
||||
if (!FST_NODE_IS_FINAL(root)) {
|
||||
pthread_mutex_unlock(&fst->mtx);
|
||||
return false;
|
||||
} else {
|
||||
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
|
||||
|
@ -1007,13 +1014,13 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
|||
fstNodeDestroy(*node);
|
||||
}
|
||||
taosArrayDestroy(nodes);
|
||||
|
||||
fst->root = NULL;
|
||||
pthread_mutex_unlock(&fst->mtx);
|
||||
*out = tOut;
|
||||
|
||||
return true;
|
||||
}
|
||||
FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
|
||||
// refactor later
|
||||
return fstStreamBuilderCreate(fst, ctx);
|
||||
}
|
||||
StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
|
||||
|
@ -1021,24 +1028,30 @@ StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
|
|||
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
|
||||
}
|
||||
FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
|
||||
// refactor later
|
||||
return fstStreamBuilderCreate(fst, ctx);
|
||||
}
|
||||
|
||||
FstNode* fstGetRoot(Fst* fst) {
|
||||
if (fst->root != NULL) { return fst->root; }
|
||||
CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||
fst->root = fstGetNode(fst, rAddr);
|
||||
return fst->root;
|
||||
return fstGetNode(fst, rAddr);
|
||||
// pthread_mutex_lock(&fst->mtx);
|
||||
// if (fst->root != NULL) {
|
||||
// // pthread_mutex_unlock(&fst->mtx);
|
||||
// return fst->root;
|
||||
//}
|
||||
// CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||
// fst->root = fstGetNode(fst, rAddr);
|
||||
//// pthread_mutex_unlock(&fst->mtx);
|
||||
// return fst->root;
|
||||
}
|
||||
|
||||
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
||||
// refactor later
|
||||
return fstNodeCreate(fst->meta->version, addr, fst->data);
|
||||
}
|
||||
FstType fstGetType(Fst* fst) {
|
||||
return fst->meta->ty;
|
||||
}
|
||||
CompiledAddr fstGetRootAddr(Fst* fst) {
|
||||
return fst->meta->rootAddr;
|
||||
}
|
||||
FstType fstGetType(Fst* fst) { return fst->meta->ty; }
|
||||
CompiledAddr fstGetRootAddr(Fst* fst) { return fst->meta->rootAddr; }
|
||||
|
||||
Output fstEmptyFinalOutput(Fst* fst, bool* null) {
|
||||
Output res = 0;
|
||||
|
@ -1053,8 +1066,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
|
|||
}
|
||||
|
||||
bool fstVerify(Fst* fst) {
|
||||
uint32_t checkSum = fst->meta->checkSum;
|
||||
int32_t len;
|
||||
uint32_t len, checkSum = fst->meta->checkSum;
|
||||
uint8_t* data = fstSliceData(fst->data, &len);
|
||||
TSCKSUM initSum = 0;
|
||||
if (!taosCheckChecksumWhole(data, len)) { return false; }
|
||||
|
@ -1094,15 +1106,12 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) {
|
|||
}
|
||||
}
|
||||
|
||||
bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) {
|
||||
return bound->type == Excluded ? false : true;
|
||||
}
|
||||
bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { return bound->type == Excluded ? false : true; }
|
||||
|
||||
void fstBoundDestroy(FstBoundWithData* bound) {
|
||||
free(bound);
|
||||
}
|
||||
void fstBoundDestroy(FstBoundWithData* bound) { free(bound); }
|
||||
|
||||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) {
|
||||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
|
||||
FstBoundWithData* max) {
|
||||
StreamWithState* sws = calloc(1, sizeof(StreamWithState));
|
||||
if (sws == NULL) { return NULL; }
|
||||
|
||||
|
@ -1131,7 +1140,9 @@ void streamWithStateDestroy(StreamWithState* sws) {
|
|||
bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
|
||||
AutomationCtx* aut = sws->aut;
|
||||
if (fstBoundWithDataIsEmpty(min)) {
|
||||
if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); }
|
||||
if (fstBoundWithDataIsIncluded(min)) {
|
||||
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
|
||||
}
|
||||
StreamState s = {.node = fstGetRoot(sws->fst),
|
||||
.trans = 0,
|
||||
.out = {.null = false, .out = 0},
|
||||
|
@ -1203,7 +1214,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
|
|||
uint64_t trans = s->trans;
|
||||
FstTransition trn;
|
||||
fstNodeGetTransitionAt(n, trans - 1, &trn);
|
||||
StreamState s = {.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
||||
StreamState s = {
|
||||
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
||||
taosArrayPush(sws->stack, &s);
|
||||
return true;
|
||||
}
|
||||
|
@ -1260,9 +1272,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
|
||||
size_t isz = taosArrayGetSize(sws->inp);
|
||||
uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t));
|
||||
for (uint32_t i = 0; i < isz; i++) {
|
||||
buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i);
|
||||
}
|
||||
for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); }
|
||||
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
|
||||
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||
|
@ -1327,8 +1337,8 @@ FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) {
|
|||
}
|
||||
void fstStreamBuilderDestroy(FstStreamBuilder* b) {
|
||||
fstSliceDestroy(&b->min->data);
|
||||
tfree(b->min);
|
||||
fstSliceDestroy(&b->max->data);
|
||||
tfree(b->min);
|
||||
tfree(b->max);
|
||||
free(b);
|
||||
}
|
||||
|
|
|
@ -17,9 +17,7 @@
|
|||
|
||||
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
|
||||
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
||||
if (nsv == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (nsv == NULL) { return NULL; }
|
||||
|
||||
nsv->kind = kind;
|
||||
nsv->type = ty;
|
||||
|
@ -37,9 +35,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp
|
|||
}
|
||||
void startWithStateValueDestroy(void* val) {
|
||||
StartWithStateValue* sv = (StartWithStateValue*)val;
|
||||
if (sv == NULL) {
|
||||
return;
|
||||
}
|
||||
if (sv == NULL) { return; }
|
||||
|
||||
if (sv->type == FST_INT) {
|
||||
//
|
||||
|
@ -52,9 +48,7 @@ void startWithStateValueDestroy(void* val) {
|
|||
}
|
||||
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
|
||||
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
||||
if (nsv == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (nsv == NULL) { return NULL; }
|
||||
|
||||
nsv->kind = sv->kind;
|
||||
nsv->type = sv->type;
|
||||
|
@ -94,14 +88,10 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) {
|
|||
static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
|
||||
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
|
||||
StartWithStateValue* ssv = (StartWithStateValue*)state;
|
||||
if (ssv == NULL || ctx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (ssv == NULL || ctx == NULL) { return NULL; }
|
||||
|
||||
char* data = ctx->data;
|
||||
if (ssv->kind == Done) {
|
||||
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
|
||||
}
|
||||
if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); }
|
||||
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
|
||||
int val = ssv->val + 1;
|
||||
|
||||
|
@ -138,9 +128,7 @@ AutomationFunc automFuncs[] = {
|
|||
|
||||
AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
|
||||
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
|
||||
if (ctx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (ctx == NULL) { return NULL; }
|
||||
|
||||
StartWithStateValue* sv = NULL;
|
||||
if (atype == AUTOMATION_ALWAYS) {
|
||||
|
|
|
@ -42,8 +42,8 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
|
|||
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||
int nRead = 0;
|
||||
if (ctx->type == TFile) {
|
||||
tfLseek(ctx->file.fd, offset, 0);
|
||||
nRead = tfRead(ctx->file.fd, buf, len);
|
||||
// tfLseek(ctx->file.fd, offset, 0);
|
||||
nRead = tfPread(ctx->file.fd, buf, len, offset);
|
||||
} else {
|
||||
// refactor later
|
||||
assert(0);
|
||||
|
@ -52,6 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
|||
}
|
||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||
if (ctx->type == TFile) {
|
||||
// taosFsyncFile(ctx->file.fd);
|
||||
tfFsync(ctx->file.fd);
|
||||
// tfFlush(ctx->file.fd);
|
||||
} else {
|
||||
|
@ -69,13 +70,15 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
// ugly code, refactor later
|
||||
ctx->file.readOnly = readOnly;
|
||||
if (readOnly == false) {
|
||||
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
||||
} else {
|
||||
ctx->file.fd = tfOpenReadWrite(path);
|
||||
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.fd = tfOpenRead(path);
|
||||
}
|
||||
memcpy(ctx->file.buf, path, strlen(path));
|
||||
if (ctx->file.fd < 0) {
|
||||
indexError("open file error %d", errno);
|
||||
indexError("failed to open file, error %d", errno);
|
||||
goto END;
|
||||
}
|
||||
} else if (ctx->type == TMemory) {
|
||||
|
@ -101,10 +104,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
|||
free(ctx->mem.buf);
|
||||
} else {
|
||||
tfClose(ctx->file.fd);
|
||||
if (remove) {
|
||||
indexError("rm file %s", ctx->file.buf);
|
||||
unlink(ctx->file.buf);
|
||||
}
|
||||
if (remove) { unlink(ctx->file.buf); }
|
||||
}
|
||||
free(ctx);
|
||||
}
|
||||
|
@ -144,7 +144,8 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
|||
}
|
||||
|
||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||
|
||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||
WriterCtx* ctx = write->wrt;
|
||||
ctx->flush(ctx);
|
||||
// write->wtr->flush
|
||||
|
|
|
@ -53,13 +53,6 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
|
|||
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
|
||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
||||
|
||||
static TFileReader* tfileReaderCreateImpl(WriterCtx* ctx) {
|
||||
TFileReader* reader = tfileReaderCreate(ctx);
|
||||
tfileReaderRef(reader);
|
||||
// tfileSerialCacheKey(&key, buf);
|
||||
return reader;
|
||||
}
|
||||
|
||||
TFileCache* tfileCacheCreate(const char* path) {
|
||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||
if (tcache == NULL) { return NULL; }
|
||||
|
@ -88,13 +81,16 @@ TFileCache* tfileCacheCreate(const char* path) {
|
|||
}
|
||||
|
||||
char buf[128] = {0};
|
||||
TFileReader* reader = tfileReaderCreateImpl(wc);
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
TFileHeader* header = &reader->header;
|
||||
TFileCacheKey key = {.suid = header->suid,
|
||||
.colName = header->colName,
|
||||
.nColName = strlen(header->colName),
|
||||
.colType = header->colType};
|
||||
tfileSerialCacheKey(&key, buf);
|
||||
|
||||
tfileReaderRef(reader);
|
||||
// indexTable
|
||||
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
||||
}
|
||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||
|
@ -139,6 +135,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
|
|||
if (p != NULL) {
|
||||
TFileReader* oldReader = *p;
|
||||
taosHashRemove(tcache->tableCache, buf, strlen(buf));
|
||||
oldReader->remove = true;
|
||||
tfileReaderUnRef(oldReader);
|
||||
}
|
||||
|
||||
|
@ -152,7 +149,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
|
||||
// T_REF_INC(reader);
|
||||
reader->ctx = ctx;
|
||||
|
||||
if (0 != tfileReaderLoadHeader(reader)) {
|
||||
tfileReaderDestroy(reader);
|
||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||
|
@ -172,7 +168,7 @@ void tfileReaderDestroy(TFileReader* reader) {
|
|||
if (reader == NULL) { return; }
|
||||
// T_REF_INC(reader);
|
||||
fstDestroy(reader->fst);
|
||||
writerCtxDestroy(reader->ctx, true);
|
||||
writerCtxDestroy(reader->ctx, reader->remove);
|
||||
free(reader);
|
||||
}
|
||||
|
||||
|
@ -232,7 +228,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||
if (wc == NULL) { return NULL; }
|
||||
|
||||
TFileReader* reader = tfileReaderCreateImpl(wc);
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
return reader;
|
||||
|
||||
// tfileSerialCacheKey(&key, buf);
|
||||
|
@ -330,13 +326,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
|||
return -1;
|
||||
}
|
||||
// write fst
|
||||
indexError("--------Begin----------------");
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
// TODO, fst batch write later
|
||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||
if (tfileWriteData(tw, v) == 0) {
|
||||
//
|
||||
}
|
||||
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
|
||||
}
|
||||
indexError("--------End----------------");
|
||||
fstBuilderFinish(tw->fb);
|
||||
fstBuilderDestroy(tw->fb);
|
||||
tw->fb = NULL;
|
||||
|
@ -360,7 +359,10 @@ IndexTFile* indexTFileCreate(const char* path) {
|
|||
tfile->cache = tfileCacheCreate(path);
|
||||
return tfile;
|
||||
}
|
||||
void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); }
|
||||
void IndexTFileDestroy(IndexTFile* tfile) {
|
||||
tfileCacheDestroy(tfile->cache);
|
||||
free(tfile);
|
||||
}
|
||||
|
||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||
int ret = -1;
|
||||
|
@ -539,8 +541,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
|||
char buf[TFILE_HEADER_SIZE] = {0};
|
||||
|
||||
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
||||
assert(nread == sizeof(buf));
|
||||
if (nread == -1) {
|
||||
//
|
||||
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
||||
errno, reader->ctx->file.fd, reader->ctx->file.buf);
|
||||
}
|
||||
// assert(nread == sizeof(buf));
|
||||
memcpy(&reader->header, buf, sizeof(buf));
|
||||
|
||||
return 0;
|
||||
}
|
||||
static int tfileReaderLoadFst(TFileReader* reader) {
|
||||
|
@ -573,7 +581,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
|||
char* buf = calloc(1, total);
|
||||
if (buf == NULL) { return -1; }
|
||||
|
||||
nread = ctx->read(ctx, buf, total);
|
||||
nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid));
|
||||
assert(total == nread);
|
||||
|
||||
for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
|
@ -42,7 +43,8 @@ class FstWriter {
|
|||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(size_t size) {
|
||||
FstReadMemory(size_t size, const std::string& fileName = fileName) {
|
||||
tfInit();
|
||||
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
_size = size;
|
||||
|
@ -101,6 +103,7 @@ class FstReadMemory {
|
|||
fstDestroy(_fst);
|
||||
fstSliceDestroy(&_s);
|
||||
writerCtxDestroy(_wc, false);
|
||||
tfCleanup();
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -165,8 +168,44 @@ void checkFstCheckIterator() {
|
|||
delete m;
|
||||
tfCleanup();
|
||||
}
|
||||
int main() {
|
||||
checkFstCheckIterator();
|
||||
|
||||
void fst_get(Fst* fst) {
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
std::string term = "Hello";
|
||||
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
|
||||
uint64_t offset = 0;
|
||||
bool ret = fstGet(fst, &key, &offset);
|
||||
if (ret == false) {
|
||||
std::cout << "not found" << std::endl;
|
||||
} else {
|
||||
std::cout << "found value:" << offset << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#define NUM_OF_THREAD 10
|
||||
void validateTFile(char* arg) {
|
||||
tfInit();
|
||||
|
||||
std::thread threads[NUM_OF_THREAD];
|
||||
// std::vector<std::thread> threads;
|
||||
TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1");
|
||||
|
||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||
threads[i] = std::thread(fst_get, reader->fst);
|
||||
// threads.push_back(fst_get, reader->fst);
|
||||
// std::thread t(fst_get, reader->fst);
|
||||
}
|
||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||
// wait join
|
||||
threads[i].join();
|
||||
}
|
||||
tfCleanup();
|
||||
}
|
||||
int main(int argc, char* argv[]) {
|
||||
if (argc > 1) { validateTFile(argv[1]); }
|
||||
// checkFstCheckIterator();
|
||||
// checkFstPrefixSearch();
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "index_cache.h"
|
||||
|
@ -25,6 +26,9 @@
|
|||
#include "tskiplist.h"
|
||||
#include "tutil.h"
|
||||
using namespace std;
|
||||
|
||||
#define NUM_OF_THREAD 10
|
||||
|
||||
class DebugInfo {
|
||||
public:
|
||||
DebugInfo(const char* str) : info(str) {
|
||||
|
@ -41,6 +45,7 @@ class DebugInfo {
|
|||
private:
|
||||
std::string info;
|
||||
};
|
||||
|
||||
class FstWriter {
|
||||
public:
|
||||
FstWriter() {
|
||||
|
@ -332,6 +337,8 @@ class TFileObj {
|
|||
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
|
||||
: path_(path), colName_(colName) {
|
||||
colId_ = 10;
|
||||
reader_ = NULL;
|
||||
writer_ = NULL;
|
||||
// Do Nothing
|
||||
//
|
||||
}
|
||||
|
@ -527,6 +534,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
|||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
// indexTermDestry(term);
|
||||
}
|
||||
{
|
||||
std::string colVal("v3");
|
||||
|
@ -634,6 +642,23 @@ class IndexObj {
|
|||
indexMultiTermDestroy(terms);
|
||||
return numOfTable;
|
||||
}
|
||||
int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
||||
size_t numOfTable = 100 * 10000) {
|
||||
std::string tColVal = colVal;
|
||||
for (int i = 0; i < numOfTable; i++) {
|
||||
tColVal[tColVal.size() - 1] = 'a' + i % 26;
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
int ret = Put(terms, i);
|
||||
assert(ret == 0);
|
||||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
return numOfTable;
|
||||
}
|
||||
|
||||
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
|
||||
numOfWrite += taosArrayGetSize(fvs);
|
||||
|
@ -656,6 +681,14 @@ class IndexObj {
|
|||
return taosArrayGetSize(result);
|
||||
// assert(taosArrayGetSize(result) == targetSize);
|
||||
}
|
||||
void PutOne(const std::string& colName, const std::string& colVal) {
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
Put(terms, 10);
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
void Debug() {
|
||||
std::cout << "numOfWrite:" << numOfWrite << std::endl;
|
||||
std::cout << "numOfRead:" << numOfRead << std::endl;
|
||||
|
@ -687,7 +720,7 @@ class IndexEnv2 : public ::testing::Test {
|
|||
IndexObj* index;
|
||||
};
|
||||
TEST_F(IndexEnv2, testIndexOpen) {
|
||||
std::string path = "/tmp";
|
||||
std::string path = "/tmp/test";
|
||||
if (index->Init(path) != 0) {
|
||||
std::cout << "failed to init index" << std::endl;
|
||||
exit(1);
|
||||
|
@ -723,10 +756,24 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
|||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
|
||||
{
|
||||
size_t size = 200;
|
||||
std::string colName("tag1"), colVal("Hello");
|
||||
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
for (size_t i = size * 3; i < size * 4; i++) {
|
||||
int tableId = i;
|
||||
int ret = index->Put(terms, tableId);
|
||||
assert(ret == 0);
|
||||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
|
||||
{
|
||||
std::string colName("tag1"), colVal("Hello");
|
||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
|
@ -735,21 +782,44 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
|||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||
index->Search(mq, result);
|
||||
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
|
||||
// assert(taosArrayGetSize(result) == targetSize);
|
||||
assert(taosArrayGetSize(result) == 400);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {}
|
||||
std::string path = "/tmp/test";
|
||||
if (index->Init(path) != 0) {
|
||||
// r
|
||||
std::cout << "failed to init" << std::endl;
|
||||
}
|
||||
int numOfTable = 100 * 10000;
|
||||
index->WriteMillonData("tag1", "Hello world", numOfTable);
|
||||
int target = index->SearchOne("tag1", "Hello world");
|
||||
index->WriteMillonData("tag1", "Hello", numOfTable);
|
||||
int target = index->SearchOne("tag1", "Hello");
|
||||
assert(numOfTable == target);
|
||||
}
|
||||
|
||||
static void write_and_search(IndexObj* idx) {
|
||||
std::string colName("tag1"), colVal("Hello");
|
||||
|
||||
int target = idx->SearchOne("tag1", "Hello");
|
||||
idx->PutOne(colName, colVal);
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {}
|
||||
std::string path = "/tmp/cache_and_tfile";
|
||||
if (index->Init(path) != 0) {
|
||||
// opt
|
||||
}
|
||||
index->WriteMultiMillonData("tag1", "Hello", 200000);
|
||||
std::thread threads[NUM_OF_THREAD];
|
||||
|
||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||
//
|
||||
threads[i] = std::thread(write_and_search, index);
|
||||
}
|
||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||
// TOD
|
||||
threads[i].join();
|
||||
}
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
|
||||
std::string path = "/tmp";
|
||||
|
@ -769,4 +839,7 @@ TEST_F(IndexEnv2, testIndex_performance) {
|
|||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {}
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndexMultiTag) {}
|
||||
TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {}
|
||||
}
|
||||
|
|
|
@ -16,21 +16,19 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "ulog.h"
|
||||
#include "tutil.h"
|
||||
#include "tref.h"
|
||||
#include "tutil.h"
|
||||
#include "ulog.h"
|
||||
|
||||
static int32_t tsFileRsetId = -1;
|
||||
|
||||
static int8_t tfInited = 0;
|
||||
|
||||
static void tfCloseFile(void *p) {
|
||||
taosCloseFile((int32_t)(uintptr_t)p);
|
||||
}
|
||||
static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); }
|
||||
|
||||
int32_t tfInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
|
||||
if(old == 1) return 0;
|
||||
if (old == 1) return 0;
|
||||
tsFileRsetId = taosOpenRef(2000, tfCloseFile);
|
||||
if (tsFileRsetId > 0) {
|
||||
return 0;
|
||||
|
@ -79,9 +77,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode
|
|||
return tfOpenImp(fd);
|
||||
}
|
||||
|
||||
int64_t tfClose(int64_t tfd) {
|
||||
return taosRemoveRef(tsFileRsetId, tfd);
|
||||
}
|
||||
int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); }
|
||||
|
||||
int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
|
||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||
|
@ -109,6 +105,19 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int32_t offset) {
|
||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||
if (p == NULL) return -1;
|
||||
|
||||
int32_t fd = (int32_t)(uintptr_t)p;
|
||||
|
||||
int64_t ret = pread(fd, buf, count, offset);
|
||||
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
||||
taosReleaseRef(tsFileRsetId, tfd);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t tfFsync(int64_t tfd) {
|
||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||
if (p == NULL) return -1;
|
||||
|
|
Loading…
Reference in New Issue