diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 1b74928568..0885ce151e 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -28,6 +28,15 @@ typedef struct SIndexOpts SIndexOpts; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SArray SIndexMultiTerm; +typedef enum { + ADD_VALUE, // add index colume value + DEL_VALUE, // delete index column value + UPDATE_VALUE, // update index column value + ADD_INDEX, // add index on specify column + DROP_INDEX, // drop existed index + DROP_SATBLE // drop stable +} SIndexColumnType; + typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType; /* diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a6862c05c8..f6ff9bc139 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -17,7 +17,10 @@ #define _TD_INDEX_INT_H_ #include "index.h" +#include "index_fst.h" #include "tlog.h" +#include "thash.h" +#include "taos.h" #ifdef USE_LUCENE #include @@ -32,12 +35,20 @@ struct SIndex { #ifdef USE_LUCENE index_t *index; #endif + void *cache; + void *tindex; + SHashObj *fieldObj; // + uint64_t suid; + int fieldId; + pthread_mutex_t mtx; }; struct SIndexOpts { #ifdef USE_LUCENE void *opts; #endif + int32_t numOfItermLimit; + int8_t mergeInterval; }; struct SIndexMultiTermQuery { diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h new file mode 100644 index 0000000000..27e095ff31 --- /dev/null +++ b/source/libs/index/inc/index_cache.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef __INDEX_CACHE_H__ +#define __INDEX_CACHE_H__ + +#include "index.h" +#include "tlockfree.h" +// ----------------- row structure in skiplist --------------------- + +/* A data row, the format is like below: + * |<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value-->|<--version--->|<-- itermType -->| + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct IndexCache { + T_REF_DECLARE() + int cVersion; // +} IndexCache; + + +// +IndexCache *indexCacheCreate(); + +void indexCacheDestroy(IndexCache *cache); + +int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldVale, int32_t fvlen, uint64_t uid, int8_t operaType); + +int indexCacheGet(IndexCache *cache, uint64_t *rst); +int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result); + +#ifdef __cplusplus +} +#endif + + + +#endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index f8f4311a4a..c011411189 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -15,39 +15,58 @@ #include "index.h" #include "indexInt.h" +#include "index_cache.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" #endif -static pthread_once_t isInit = PTHREAD_ONCE_INIT; +typedef struct SIdxFieldInfo { + int id; // generated by index internal + int type; // field type +} SIdxFieldInfo; + +static pthread_once_t isInit = PTHREAD_ONCE_INIT; static void indexInit(); +static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { + if (sIdx == NULL) { + return -1; + } + indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + return 0; +} SIndex *indexOpen(SIndexOpts *opts, const char *path) { pthread_once(&isInit, indexInit); + SIndex *sIdx = malloc(sizeof(SIndex)); + #ifdef USE_LUCENE index_t *index = index_open(path); - SIndex *p = malloc(sizeof(SIndex)); - p->index = index; - return p; + sIdx->index = index; #endif - return NULL; + + sIdx->cache = (void*)indexCacheCreate(); + sIdx->tindex = NULL; + sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pthread_mutex_init(&sIdx->mtx, NULL); + return sIdx; } -void indexClose(SIndex *index) { +void indexClose(SIndex *sIdx) { #ifdef USE_LUCENE - index_close(index->index); - index->index = NULL; + index_close(sIdex->index); + sIdx->index = NULL; #endif - free(index); + indexCacheDestroy(sIdx->cache); + taosHashCleanup(sIdx->fieldObj); + pthread_mutex_destroy(&sIdx->mtx); + free(sIdx); return; - } -#ifdef USE_LUCENE -#endif int indexPut(SIndex *index, SArray* field_vals, int uid) { + #ifdef USE_LUCENE index_document_t *doc = index_document_create(); @@ -63,6 +82,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) { index_put(index->index, doc); index_document_destroy(doc); #endif + pthread_mutex_lock(&index->mtx); + pthread_mutex_unlock(&index->mtx); return 1; } @@ -105,7 +126,9 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result return 1; } + int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { + return 1; } int indexRebuild(SIndex *index, SIndexOpts *opts); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c new file mode 100644 index 0000000000..7c355b0f0a --- /dev/null +++ b/source/libs/index/src/index_cache.c @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "index_cache.h" + +static int32_t compareKey(const void *l, const void *r) { + char *lp = (char *)l; + char *rp = (char *)r; + + // skip total len + int32_t ll, rl; // len + memcpy(&ll, lp, sizeof(int32_t)); + memcpy(&rl, rp, sizeof(int32_t)); + lp += sizeof(int32_t); + rp += sizeof(int32_t); + + // compare field id + int32_t lf, rf; // field id + memcpy(&lf, lp, sizeof(lf)); + memcpy(&rf, rp, sizeof(rf)); + if (lf != rf) { + return lf < rf ? -1: 1; + } + lp += sizeof(lf); + rp += sizeof(rf); + + // compare field value + int32_t lfl, rfl; + memcpy(&lfl, lp, sizeof(lfl)); + memcpy(&rfl, rp, sizeof(rfl)); + lp += sizeof(lfl); + rp += sizeof(rfl); + + //refator later + int32_t i, j; + for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { + if (lp[i] == rp[j]) { continue; } + else { return lp[i] < rp[j] ? -1 : 1;} + } + if (i < lfl) { return 1;} + else if (j < rfl) { return -1; } + lp += lfl; + rp += rfl; + + // compare version + int32_t lv, rv; + memcpy(&lv, lp, sizeof(lv)); + memcpy(&rv, rp, sizeof(rv)); + if (lv != rv) { + return lv > rv ? -1 : 1; + } + lp += sizeof(lv); + rp += sizeof(rv); + + + return 0; + +} +IndexCache *indexCacheCreate() { + IndexCache *cache = calloc(1, sizeof(IndexCache)); + return cache; +} + +void indexCacheDestroy(IndexCache *cache) { + free(cache); +} + +int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { + if (cache == NULL) { return -1;} + int32_t version = T_REF_INC(cache); + + int32_t total = sizeof(int32_t) + sizeof(fieldId) + 4 + fvlen + sizeof(version) + sizeof(uid) + sizeof(operType); + + char *buf = calloc(1, total); + char *p = buf; + + memcpy(buf, &total, sizeof(total)); + total += total; + + memcpy(buf, &fieldId, sizeof(fieldId)); + buf += sizeof(fieldId); + + memcpy(buf, &fvlen, sizeof(fvlen)); + buf += sizeof(fvlen); + memcpy(buf, fieldValue, fvlen); + buf += fvlen; + + memcpy(buf, &version, sizeof(version)); + buf += sizeof(version); + + memcpy(buf, &uid, sizeof(uid)); + buf += sizeof(uid); + + memcpy(buf, &operType, sizeof(operType)); + buf += sizeof(operType); + + +} +int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) { + + return 0; +} + + + + + diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 9cb4ac6836..7aaa498864 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1330,7 +1330,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb SArray *nodes = taosArrayInit(8, sizeof(FstNode *)); while (taosArrayGetSize(sws->stack) > 0) { StreamState *p = (StreamState *)taosArrayPop(sws->stack); - if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) { + if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) { if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index f70b90041b..07ad45079b 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -87,9 +87,18 @@ static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) { if (ssv == NULL || ctx == NULL) {return NULL;} char *data = ctx->data; + if (ssv->kind == Done) { + return startWithStateValueCreate(Done, FST_INT, &ssv->val); + } if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { int val = ssv->val + 1; - return startWithStateValueCreate(Running, FST_INT, &val); + StartWithStateValue *nsv = startWithStateValueCreate(Running, FST_INT, &val); + if (prefixIsMatch(ctx, nsv)) { + nsv->kind = Done; + } else { + nsv->kind = Running; + } + return nsv; } return NULL; }