Merge pull request #9179 from taosdata/feature/index_cache
Feature/index cache
This commit is contained in:
commit
4658a9636f
|
@ -9,6 +9,7 @@ target_link_libraries(
|
||||||
index
|
index
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
PUBLIC util
|
PUBLIC util
|
||||||
|
PUBLIC common
|
||||||
)
|
)
|
||||||
|
|
||||||
if (${BUILD_WITH_LUCENE})
|
if (${BUILD_WITH_LUCENE})
|
||||||
|
|
|
@ -37,9 +37,11 @@ struct SIndex {
|
||||||
#endif
|
#endif
|
||||||
void *cache;
|
void *cache;
|
||||||
void *tindex;
|
void *tindex;
|
||||||
SHashObj *fieldObj; // <field name, field id>
|
SHashObj *fieldObj;// < field name, field id>
|
||||||
uint64_t suid;
|
|
||||||
int fieldId;
|
int64_t suid; // current super table id, -1 is normal table
|
||||||
|
int fieldId; // field id allocated to cache
|
||||||
|
int32_t cVersion; // current version allocated to cache
|
||||||
pthread_mutex_t mtx;
|
pthread_mutex_t mtx;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -58,6 +60,7 @@ struct SIndexMultiTermQuery {
|
||||||
|
|
||||||
// field and key;
|
// field and key;
|
||||||
typedef struct SIndexTerm {
|
typedef struct SIndexTerm {
|
||||||
|
uint8_t type; // term data type, str/interger/json
|
||||||
char *key;
|
char *key;
|
||||||
int32_t nKey;
|
int32_t nKey;
|
||||||
char *val;
|
char *val;
|
||||||
|
|
|
@ -17,11 +17,12 @@
|
||||||
|
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
#include "tskiplist.h"
|
||||||
// ----------------- row structure in skiplist ---------------------
|
// ----------------- row structure in skiplist ---------------------
|
||||||
|
|
||||||
/* A data row, the format is like below:
|
/* A data row, the format is like below:
|
||||||
* |<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value-->|<--version--->|<-- itermType -->|
|
* content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->|
|
||||||
*
|
* len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -30,7 +31,7 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct IndexCache {
|
typedef struct IndexCache {
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
int cVersion; //
|
SSkipList *skiplist;
|
||||||
} IndexCache;
|
} IndexCache;
|
||||||
|
|
||||||
|
|
||||||
|
@ -39,7 +40,8 @@ IndexCache *indexCacheCreate();
|
||||||
|
|
||||||
void indexCacheDestroy(IndexCache *cache);
|
void indexCacheDestroy(IndexCache *cache);
|
||||||
|
|
||||||
int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldVale, int32_t fvlen, uint64_t uid, int8_t operaType);
|
int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen,
|
||||||
|
uint32_t version, uint64_t uid, int8_t operType);
|
||||||
|
|
||||||
int indexCacheGet(IndexCache *cache, uint64_t *rst);
|
int indexCacheGet(IndexCache *cache, uint64_t *rst);
|
||||||
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result);
|
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result);
|
||||||
|
|
|
@ -315,7 +315,6 @@ typedef struct StreamWithStateResult {
|
||||||
FstSlice data;
|
FstSlice data;
|
||||||
FstOutput out;
|
FstOutput out;
|
||||||
void *state;
|
void *state;
|
||||||
|
|
||||||
} StreamWithStateResult;
|
} StreamWithStateResult;
|
||||||
|
|
||||||
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
|
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
|
||||||
|
|
|
@ -23,7 +23,8 @@
|
||||||
|
|
||||||
|
|
||||||
typedef struct SIdxFieldInfo {
|
typedef struct SIdxFieldInfo {
|
||||||
int id; // generated by index internal
|
int fieldId; // generated by index internal
|
||||||
|
int cVersion;
|
||||||
int type; // field type
|
int type; // field type
|
||||||
} SIdxFieldInfo;
|
} SIdxFieldInfo;
|
||||||
|
|
||||||
|
@ -39,7 +40,7 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
|
||||||
}
|
}
|
||||||
SIndex *indexOpen(SIndexOpts *opts, const char *path) {
|
SIndex *indexOpen(SIndexOpts *opts, const char *path) {
|
||||||
pthread_once(&isInit, indexInit);
|
pthread_once(&isInit, indexInit);
|
||||||
SIndex *sIdx = malloc(sizeof(SIndex));
|
SIndex *sIdx = calloc(1, sizeof(SIndex));
|
||||||
|
|
||||||
#ifdef USE_LUCENE
|
#ifdef USE_LUCENE
|
||||||
index_t *index = index_open(path);
|
index_t *index = index_open(path);
|
||||||
|
@ -49,6 +50,8 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) {
|
||||||
sIdx->cache = (void*)indexCacheCreate();
|
sIdx->cache = (void*)indexCacheCreate();
|
||||||
sIdx->tindex = NULL;
|
sIdx->tindex = NULL;
|
||||||
sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
|
sIdx->fieldId = 1;
|
||||||
|
sIdx->cVersion = 1;
|
||||||
pthread_mutex_init(&sIdx->mtx, NULL);
|
pthread_mutex_init(&sIdx->mtx, NULL);
|
||||||
return sIdx;
|
return sIdx;
|
||||||
}
|
}
|
||||||
|
@ -65,7 +68,7 @@ void indexClose(SIndex *sIdx) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexPut(SIndex *index, SArray* field_vals, int uid) {
|
int indexPut(SIndex *index, SArray* fVals, int uid) {
|
||||||
|
|
||||||
#ifdef USE_LUCENE
|
#ifdef USE_LUCENE
|
||||||
index_document_t *doc = index_document_create();
|
index_document_t *doc = index_document_create();
|
||||||
|
@ -73,8 +76,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
|
||||||
char buf[16] = {0};
|
char buf[16] = {0};
|
||||||
sprintf(buf, "%d", uid);
|
sprintf(buf, "%d", uid);
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(field_vals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm *p = taosArrayGetP(field_vals, i);
|
SIndexTerm *p = taosArrayGetP(fVals, i);
|
||||||
index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1);
|
index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1);
|
||||||
}
|
}
|
||||||
index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
|
index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
|
||||||
|
@ -82,10 +85,33 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
|
||||||
index_put(index->index, doc);
|
index_put(index->index, doc);
|
||||||
index_document_destroy(doc);
|
index_document_destroy(doc);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
//TODO(yihao): reduce the lock range
|
||||||
pthread_mutex_lock(&index->mtx);
|
pthread_mutex_lock(&index->mtx);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
|
SIndexTerm *p = taosArrayGetP(fVals, i);
|
||||||
|
SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey);
|
||||||
|
if (fi == NULL) {
|
||||||
|
SIdxFieldInfo tfi = {.fieldId = index->fieldId, .type = p->type};
|
||||||
|
index->cVersion++;
|
||||||
|
index->fieldId++;
|
||||||
|
taosHashPut(index->fieldObj, p->key, p->nKey, &tfi, sizeof(tfi));
|
||||||
|
} else {
|
||||||
|
//TODO, del
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
|
SIndexTerm *p = taosArrayGetP(fVals, i);
|
||||||
|
SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey);
|
||||||
|
assert(fi != NULL);
|
||||||
|
int32_t fieldId = fi->fieldId;
|
||||||
|
int32_t colType = fi->type;
|
||||||
|
int32_t version = index->cVersion;
|
||||||
|
|
||||||
|
}
|
||||||
pthread_mutex_unlock(&index->mtx);
|
pthread_mutex_unlock(&index->mtx);
|
||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
}
|
}
|
||||||
int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) {
|
int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) {
|
||||||
#ifdef USE_LUCENE
|
#ifdef USE_LUCENE
|
||||||
|
@ -152,7 +178,7 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||||
SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery));
|
SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery));
|
||||||
if (p == NULL) { return NULL; }
|
if (p == NULL) { return NULL; }
|
||||||
p->opera = opera;
|
p->opera = opera;
|
||||||
p->query = taosArrayInit(1, sizeof(SIndexTermQuery));
|
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) {
|
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) {
|
||||||
|
|
|
@ -14,12 +14,18 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "index_cache.h"
|
#include "index_cache.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
|
||||||
|
#define MAX_INDEX_KEY_LEN 128 // test only, change later
|
||||||
|
|
||||||
|
static char* getIndexKey(const void *pData) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
static int32_t compareKey(const void *l, const void *r) {
|
static int32_t compareKey(const void *l, const void *r) {
|
||||||
char *lp = (char *)l;
|
char *lp = (char *)l;
|
||||||
char *rp = (char *)r;
|
char *rp = (char *)r;
|
||||||
|
|
||||||
// skip total len
|
// skip total len, not compare
|
||||||
int32_t ll, rl; // len
|
int32_t ll, rl; // len
|
||||||
memcpy(&ll, lp, sizeof(int32_t));
|
memcpy(&ll, lp, sizeof(int32_t));
|
||||||
memcpy(&rl, rp, sizeof(int32_t));
|
memcpy(&rl, rp, sizeof(int32_t));
|
||||||
|
@ -27,7 +33,7 @@ static int32_t compareKey(const void *l, const void *r) {
|
||||||
rp += sizeof(int32_t);
|
rp += sizeof(int32_t);
|
||||||
|
|
||||||
// compare field id
|
// compare field id
|
||||||
int32_t lf, rf; // field id
|
int16_t lf, rf; // field id
|
||||||
memcpy(&lf, lp, sizeof(lf));
|
memcpy(&lf, lp, sizeof(lf));
|
||||||
memcpy(&rf, rp, sizeof(rf));
|
memcpy(&rf, rp, sizeof(rf));
|
||||||
if (lf != rf) {
|
if (lf != rf) {
|
||||||
|
@ -36,14 +42,22 @@ static int32_t compareKey(const void *l, const void *r) {
|
||||||
lp += sizeof(lf);
|
lp += sizeof(lf);
|
||||||
rp += sizeof(rf);
|
rp += sizeof(rf);
|
||||||
|
|
||||||
// compare field value
|
// compare field type
|
||||||
|
int16_t lft, rft;
|
||||||
|
memcpy(&lft, lp, sizeof(lft));
|
||||||
|
memcpy(&rft, rp, sizeof(rft));
|
||||||
|
lp += sizeof(lft);
|
||||||
|
rp += sizeof(rft);
|
||||||
|
assert(rft == rft);
|
||||||
|
|
||||||
|
// skip value len
|
||||||
int32_t lfl, rfl;
|
int32_t lfl, rfl;
|
||||||
memcpy(&lfl, lp, sizeof(lfl));
|
memcpy(&lfl, lp, sizeof(lfl));
|
||||||
memcpy(&rfl, rp, sizeof(rfl));
|
memcpy(&rfl, rp, sizeof(rfl));
|
||||||
lp += sizeof(lfl);
|
lp += sizeof(lfl);
|
||||||
rp += sizeof(rfl);
|
rp += sizeof(rfl);
|
||||||
|
|
||||||
//refator later
|
// compare value
|
||||||
int32_t i, j;
|
int32_t i, j;
|
||||||
for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
|
for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
|
||||||
if (lp[i] == rp[j]) { continue; }
|
if (lp[i] == rp[j]) { continue; }
|
||||||
|
@ -54,58 +68,79 @@ static int32_t compareKey(const void *l, const void *r) {
|
||||||
lp += lfl;
|
lp += lfl;
|
||||||
rp += rfl;
|
rp += rfl;
|
||||||
|
|
||||||
// compare version
|
// skip uid
|
||||||
|
uint64_t lu, ru;
|
||||||
|
memcpy(&lu, lp, sizeof(lu));
|
||||||
|
memcpy(&ru, rp, sizeof(ru));
|
||||||
|
lp += sizeof(lu);
|
||||||
|
rp += sizeof(ru);
|
||||||
|
|
||||||
|
// compare version, desc order
|
||||||
int32_t lv, rv;
|
int32_t lv, rv;
|
||||||
memcpy(&lv, lp, sizeof(lv));
|
memcpy(&lv, lp, sizeof(lv));
|
||||||
memcpy(&rv, rp, sizeof(rv));
|
memcpy(&rv, rp, sizeof(rv));
|
||||||
if (lv != rv) {
|
if (lv != rv) {
|
||||||
return lv > rv ? -1 : 1;
|
return lv > rv ? -1 : 1;
|
||||||
}
|
}
|
||||||
lp += sizeof(lv);
|
lp += sizeof(lv);
|
||||||
rp += sizeof(rv);
|
rp += sizeof(rv);
|
||||||
|
// not care item type
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
}
|
}
|
||||||
IndexCache *indexCacheCreate() {
|
IndexCache *indexCacheCreate() {
|
||||||
IndexCache *cache = calloc(1, sizeof(IndexCache));
|
IndexCache *cache = calloc(1, sizeof(IndexCache));
|
||||||
|
cache->skiplist = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
||||||
return cache;
|
return cache;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexCacheDestroy(IndexCache *cache) {
|
void indexCacheDestroy(IndexCache *cache) {
|
||||||
|
if (cache == NULL) { return; }
|
||||||
|
tSkipListDestroy(cache->skiplist);
|
||||||
free(cache);
|
free(cache);
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
|
int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen,
|
||||||
|
uint32_t version, uint64_t uid, int8_t operType) {
|
||||||
if (cache == NULL) { return -1;}
|
if (cache == NULL) { return -1;}
|
||||||
int32_t version = T_REF_INC(cache);
|
|
||||||
|
|
||||||
int32_t total = sizeof(int32_t) + sizeof(fieldId) + 4 + fvlen + sizeof(version) + sizeof(uid) + sizeof(operType);
|
// encode data
|
||||||
|
int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType);
|
||||||
|
|
||||||
char *buf = calloc(1, total);
|
char *buf = calloc(1, total);
|
||||||
char *p = buf;
|
char *p = buf;
|
||||||
|
|
||||||
memcpy(buf, &total, sizeof(total));
|
memcpy(p, &total, sizeof(total));
|
||||||
total += total;
|
p += sizeof(total);
|
||||||
|
|
||||||
memcpy(buf, &fieldId, sizeof(fieldId));
|
memcpy(p, &fieldId, sizeof(fieldId));
|
||||||
buf += sizeof(fieldId);
|
p += sizeof(fieldId);
|
||||||
|
|
||||||
memcpy(buf, &fvlen, sizeof(fvlen));
|
memcpy(p, &fieldType, sizeof(fieldType));
|
||||||
buf += sizeof(fvlen);
|
p += sizeof(fieldType);
|
||||||
memcpy(buf, fieldValue, fvlen);
|
|
||||||
buf += fvlen;
|
memcpy(p, &fvLen, sizeof(fvLen));
|
||||||
|
p += sizeof(fvLen);
|
||||||
|
memcpy(p, fieldValue, fvLen);
|
||||||
|
p += fvLen;
|
||||||
|
|
||||||
memcpy(buf, &version, sizeof(version));
|
memcpy(p, &version, sizeof(version));
|
||||||
buf += sizeof(version);
|
p += sizeof(version);
|
||||||
|
|
||||||
memcpy(buf, &uid, sizeof(uid));
|
memcpy(p, &uid, sizeof(uid));
|
||||||
buf += sizeof(uid);
|
p += sizeof(uid);
|
||||||
|
|
||||||
memcpy(buf, &operType, sizeof(operType));
|
memcpy(p, &operType, sizeof(operType));
|
||||||
buf += sizeof(operType);
|
p += sizeof(operType);
|
||||||
|
|
||||||
|
tSkipListPut(cache->skiplist, (void *)buf);
|
||||||
|
// encode end
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
int indexCacheDel(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
|
||||||
|
|
||||||
}
|
}
|
||||||
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) {
|
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) {
|
||||||
|
|
Loading…
Reference in New Issue