add index cache

This commit is contained in:
yihaoDeng 2021-12-16 22:04:47 +08:00
parent 4932afa8e2
commit f0110af30e
5 changed files with 66 additions and 14 deletions

View File

@ -28,6 +28,15 @@ typedef struct SIndexOpts SIndexOpts;
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
typedef struct SArray SIndexMultiTerm; typedef struct SArray SIndexMultiTerm;
typedef enum {
ADD_VALUE, // add index colume value
DEL_VALUE, // delete index column value
UPDATE_VALUE, // update index column value
ADD_INDEX, // add index on specify column
DROP_INDEX, // drop existed index
DROP_SATBLE // drop stable
} SIndexColumnType;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType;
/* /*

View File

@ -17,7 +17,10 @@
#define _TD_INDEX_INT_H_ #define _TD_INDEX_INT_H_
#include "index.h" #include "index.h"
#include "index_fst.h"
#include "tlog.h" #include "tlog.h"
#include "thash.h"
#include "taos.h"
#ifdef USE_LUCENE #ifdef USE_LUCENE
#include <lucene++/Lucene_c.h> #include <lucene++/Lucene_c.h>
@ -32,12 +35,20 @@ struct SIndex {
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t *index; index_t *index;
#endif #endif
void *cache;
void *tindex;
SHashObj *fieldObj; // <field name, field id>
uint64_t suid;
int fieldId;
pthread_mutex_t mtx;
}; };
struct SIndexOpts { struct SIndexOpts {
#ifdef USE_LUCENE #ifdef USE_LUCENE
void *opts; void *opts;
#endif #endif
int32_t numOfItermLimit;
int8_t mergeInterval;
}; };
struct SIndexMultiTermQuery { struct SIndexMultiTermQuery {

View File

@ -15,39 +15,58 @@
#include "index.h" #include "index.h"
#include "indexInt.h" #include "indexInt.h"
#include "index_cache.h"
#ifdef USE_LUCENE #ifdef USE_LUCENE
#include "lucene++/Lucene_c.h" #include "lucene++/Lucene_c.h"
#endif #endif
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
typedef struct SIdxFieldInfo {
int id; // generated by index internal
int type; // field type
} SIdxFieldInfo;
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
static void indexInit(); static void indexInit();
static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
if (sIdx == NULL) {
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0;
}
SIndex *indexOpen(SIndexOpts *opts, const char *path) { SIndex *indexOpen(SIndexOpts *opts, const char *path) {
pthread_once(&isInit, indexInit); pthread_once(&isInit, indexInit);
SIndex *sIdx = malloc(sizeof(SIndex));
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t *index = index_open(path); index_t *index = index_open(path);
SIndex *p = malloc(sizeof(SIndex)); sIdx->index = index;
p->index = index;
return p;
#endif #endif
return NULL;
sIdx->cache = (void*)indexCacheCreate();
sIdx->tindex = NULL;
sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pthread_mutex_init(&sIdx->mtx, NULL);
return sIdx;
} }
void indexClose(SIndex *index) { void indexClose(SIndex *sIdx) {
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_close(index->index); index_close(sIdex->index);
index->index = NULL; sIdx->index = NULL;
#endif #endif
free(index); indexCacheDestroy(sIdx->cache);
taosHashCleanup(sIdx->fieldObj);
pthread_mutex_destroy(&sIdx->mtx);
free(sIdx);
return; return;
} }
#ifdef USE_LUCENE
#endif
int indexPut(SIndex *index, SArray* field_vals, int uid) { int indexPut(SIndex *index, SArray* field_vals, int uid) {
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_document_t *doc = index_document_create(); index_document_t *doc = index_document_create();
@ -63,6 +82,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
index_put(index->index, doc); index_put(index->index, doc);
index_document_destroy(doc); index_document_destroy(doc);
#endif #endif
pthread_mutex_lock(&index->mtx);
pthread_mutex_unlock(&index->mtx);
return 1; return 1;
} }
@ -105,7 +126,9 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
return 1; return 1;
} }
int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { int indexDelete(SIndex *index, SIndexMultiTermQuery *query) {
return 1; return 1;
} }
int indexRebuild(SIndex *index, SIndexOpts *opts); int indexRebuild(SIndex *index, SIndexOpts *opts);

View File

@ -1330,7 +1330,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
SArray *nodes = taosArrayInit(8, sizeof(FstNode *)); SArray *nodes = taosArrayInit(8, sizeof(FstNode *));
while (taosArrayGetSize(sws->stack) > 0) { while (taosArrayGetSize(sws->stack) > 0) {
StreamState *p = (StreamState *)taosArrayPop(sws->stack); StreamState *p = (StreamState *)taosArrayPop(sws->stack);
if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) { if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) {
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
taosArrayPop(sws->inp); taosArrayPop(sws->inp);
} }

View File

@ -87,9 +87,18 @@ static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
if (ssv == NULL || ctx == NULL) {return NULL;} if (ssv == NULL || ctx == NULL) {return NULL;}
char *data = ctx->data; char *data = ctx->data;
if (ssv->kind == Done) {
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
}
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
int val = ssv->val + 1; int val = ssv->val + 1;
return startWithStateValueCreate(Running, FST_INT, &val); StartWithStateValue *nsv = startWithStateValueCreate(Running, FST_INT, &val);
if (prefixIsMatch(ctx, nsv)) {
nsv->kind = Done;
} else {
nsv->kind = Running;
}
return nsv;
} }
return NULL; return NULL;
} }