Merge pull request #10567 from taosdata/feature/index_oper
add update index
This commit is contained in:
commit
ef3d95064d
|
@ -97,7 +97,9 @@ typedef struct Iterate Iterate;
|
||||||
|
|
||||||
typedef struct IterateValue {
|
typedef struct IterateValue {
|
||||||
int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
|
int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
|
||||||
|
uint64_t ver; // data ver, tfile data version is 0
|
||||||
char* colVal;
|
char* colVal;
|
||||||
|
|
||||||
SArray* val;
|
SArray* val;
|
||||||
} IterateValue;
|
} IterateValue;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define __INDEX_CACHE_H__
|
#define __INDEX_CACHE_H__
|
||||||
|
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
|
#include "index_util.h"
|
||||||
#include "tskiplist.h"
|
#include "tskiplist.h"
|
||||||
|
|
||||||
// ----------------- key structure in skiplist ---------------------
|
// ----------------- key structure in skiplist ---------------------
|
||||||
|
@ -54,6 +55,7 @@ typedef struct CacheTerm {
|
||||||
// value
|
// value
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
int8_t colType;
|
int8_t colType;
|
||||||
|
|
||||||
SIndexOperOnColumn operaType;
|
SIndexOperOnColumn operaType;
|
||||||
} CacheTerm;
|
} CacheTerm;
|
||||||
//
|
//
|
||||||
|
@ -68,7 +70,7 @@ void indexCacheIteratorDestroy(Iterate* iiter);
|
||||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
|
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
|
||||||
|
|
||||||
// int indexCacheGet(void *cache, uint64_t *rst);
|
// int indexCacheGet(void *cache, uint64_t *rst);
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s);
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
|
||||||
void indexCacheRef(IndexCache* cache);
|
void indexCacheRef(IndexCache* cache);
|
||||||
void indexCacheUnRef(IndexCache* cache);
|
void indexCacheUnRef(IndexCache* cache);
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "index_fst.h"
|
#include "index_fst.h"
|
||||||
#include "index_fst_counting_writer.h"
|
#include "index_fst_counting_writer.h"
|
||||||
#include "index_tfile.h"
|
#include "index_tfile.h"
|
||||||
|
#include "index_util.h"
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -103,7 +104,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
|
||||||
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
|
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
|
||||||
TFileReader* tfileReaderCreate(WriterCtx* ctx);
|
TFileReader* tfileReaderCreate(WriterCtx* ctx);
|
||||||
void tfileReaderDestroy(TFileReader* reader);
|
void tfileReaderDestroy(TFileReader* reader);
|
||||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result);
|
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr);
|
||||||
void tfileReaderRef(TFileReader* reader);
|
void tfileReaderRef(TFileReader* reader);
|
||||||
void tfileReaderUnRef(TFileReader* reader);
|
void tfileReaderUnRef(TFileReader* reader);
|
||||||
|
|
||||||
|
@ -118,7 +119,7 @@ int tfileWriterFinish(TFileWriter* tw);
|
||||||
IndexTFile* indexTFileCreate(const char* path);
|
IndexTFile* indexTFileCreate(const char* path);
|
||||||
void indexTFileDestroy(IndexTFile* tfile);
|
void indexTFileDestroy(IndexTFile* tfile);
|
||||||
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
||||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
|
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr);
|
||||||
|
|
||||||
Iterate* tfileIteratorCreate(TFileReader* reader);
|
Iterate* tfileIteratorCreate(TFileReader* reader);
|
||||||
void tfileIteratorDestroy(Iterate* iterator);
|
void tfileIteratorDestroy(Iterate* iterator);
|
||||||
|
|
|
@ -47,6 +47,19 @@ extern "C" {
|
||||||
buf += len; \
|
buf += len; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
|
||||||
|
{ \
|
||||||
|
bool f = false; \
|
||||||
|
for (int i = 0; i < taosArrayGetSize(src); i++) { \
|
||||||
|
if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \
|
||||||
|
f = true; \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
if (f == false) { \
|
||||||
|
taosArrayPush(dst, &tgt); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
|
||||||
/* multi sorted result intersection
|
/* multi sorted result intersection
|
||||||
* input: [1, 2, 4, 5]
|
* input: [1, 2, 4, 5]
|
||||||
* [2, 3, 4, 5]
|
* [2, 3, 4, 5]
|
||||||
|
@ -66,10 +79,32 @@ void iUnion(SArray *interResults, SArray *finalResult);
|
||||||
/* sorted array
|
/* sorted array
|
||||||
* total: [1, 2, 4, 5, 7, 8]
|
* total: [1, 2, 4, 5, 7, 8]
|
||||||
* except: [4, 5]
|
* except: [4, 5]
|
||||||
* return: [1, 2, 7, 8]
|
* return: [1, 2, 7, 8] saved in total
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void iExcept(SArray *total, SArray *except);
|
void iExcept(SArray *total, SArray *except);
|
||||||
|
|
||||||
|
int uidCompare(const void *a, const void *b);
|
||||||
|
|
||||||
|
// data with ver
|
||||||
|
typedef struct {
|
||||||
|
uint32_t ver;
|
||||||
|
uint64_t data;
|
||||||
|
} SIdxVerdata;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SArray *total;
|
||||||
|
SArray *added;
|
||||||
|
SArray *deled;
|
||||||
|
} SIdxTempResult;
|
||||||
|
|
||||||
|
SIdxTempResult *sIdxTempResultCreate();
|
||||||
|
|
||||||
|
void sIdxTempResultClear(SIdxTempResult *tr);
|
||||||
|
|
||||||
|
void sIdxTempResultDestroy(SIdxTempResult *tr);
|
||||||
|
|
||||||
|
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -31,18 +31,6 @@
|
||||||
|
|
||||||
void* indexQhandle = NULL;
|
void* indexQhandle = NULL;
|
||||||
|
|
||||||
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
|
|
||||||
{ \
|
|
||||||
bool f = false; \
|
|
||||||
for (int i = 0; i < taosArrayGetSize(src); i++) { \
|
|
||||||
if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \
|
|
||||||
f = true; \
|
|
||||||
} \
|
|
||||||
} \
|
|
||||||
if (f == false) { \
|
|
||||||
taosArrayPush(dst, &tgt); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
void indexInit() {
|
void indexInit() {
|
||||||
// refactor later
|
// refactor later
|
||||||
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
||||||
|
@ -52,23 +40,11 @@ void indexCleanUp() {
|
||||||
taosCleanUpScheduler(indexQhandle);
|
taosCleanUpScheduler(indexQhandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int uidCompare(const void* a, const void* b) {
|
|
||||||
// add more version compare
|
|
||||||
uint64_t u1 = *(uint64_t*)a;
|
|
||||||
uint64_t u2 = *(uint64_t*)b;
|
|
||||||
return u1 - u2;
|
|
||||||
}
|
|
||||||
typedef struct SIdxColInfo {
|
typedef struct SIdxColInfo {
|
||||||
int colId; // generated by index internal
|
int colId; // generated by index internal
|
||||||
int cVersion;
|
int cVersion;
|
||||||
} SIdxColInfo;
|
} SIdxColInfo;
|
||||||
|
|
||||||
typedef struct SIdxTempResult {
|
|
||||||
SArray* total;
|
|
||||||
SArray* added;
|
|
||||||
SArray* deled;
|
|
||||||
} SIdxTempResult;
|
|
||||||
|
|
||||||
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
|
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
|
||||||
// static void indexInit();
|
// static void indexInit();
|
||||||
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
|
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
|
||||||
|
@ -255,6 +231,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
|
||||||
|
|
||||||
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
|
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -363,22 +340,30 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
*result = taosArrayInit(4, sizeof(uint64_t));
|
*result = taosArrayInit(4, sizeof(uint64_t));
|
||||||
// TODO: iterator mem and tidex
|
// TODO: iterator mem and tidex
|
||||||
STermValueType s = kTypeValue;
|
STermValueType s = kTypeValue;
|
||||||
if (0 == indexCacheSearch(cache, query, *result, &s)) {
|
|
||||||
|
SIdxTempResult* tr = sIdxTempResultCreate();
|
||||||
|
if (0 == indexCacheSearch(cache, query, tr, &s)) {
|
||||||
if (s == kTypeDeletion) {
|
if (s == kTypeDeletion) {
|
||||||
indexInfo("col: %s already drop by", term->colName);
|
indexInfo("col: %s already drop by", term->colName);
|
||||||
// coloum already drop by other oper, no need to query tindex
|
// coloum already drop by other oper, no need to query tindex
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
|
if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
|
||||||
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
|
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
|
||||||
return -1;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
|
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
|
||||||
return -1;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sIdxTempResultMergeTo(*result, tr);
|
||||||
|
sIdxTempResultDestroy(tr);
|
||||||
return 0;
|
return 0;
|
||||||
|
END:
|
||||||
|
sIdxTempResultDestroy(tr);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
static void indexInterResultsDestroy(SArray* results) {
|
static void indexInterResultsDestroy(SArray* results) {
|
||||||
if (results == NULL) {
|
if (results == NULL) {
|
||||||
|
@ -413,43 +398,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SIdxTempResult* sIdxTempResultCreate() {
|
|
||||||
SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
|
|
||||||
tr->total = taosArrayInit(4, sizeof(uint64_t));
|
|
||||||
tr->added = taosArrayInit(4, sizeof(uint64_t));
|
|
||||||
tr->deled = taosArrayInit(4, sizeof(uint64_t));
|
|
||||||
return tr;
|
|
||||||
}
|
|
||||||
void sIdxTempResultClear(SIdxTempResult* tr) {
|
|
||||||
if (tr == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
taosArrayClear(tr->total);
|
|
||||||
taosArrayClear(tr->added);
|
|
||||||
taosArrayClear(tr->deled);
|
|
||||||
}
|
|
||||||
void sIdxTempResultDestroy(SIdxTempResult* tr) {
|
|
||||||
if (tr == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
taosArrayDestroy(tr->total);
|
|
||||||
taosArrayDestroy(tr->added);
|
|
||||||
taosArrayDestroy(tr->deled);
|
|
||||||
}
|
|
||||||
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
|
|
||||||
taosArraySort(tr->total, uidCompare);
|
|
||||||
taosArraySort(tr->added, uidCompare);
|
|
||||||
taosArraySort(tr->deled, uidCompare);
|
|
||||||
|
|
||||||
SArray* arrs = taosArrayInit(2, sizeof(void*));
|
|
||||||
taosArrayPush(arrs, &tr->total);
|
|
||||||
taosArrayPush(arrs, &tr->added);
|
|
||||||
|
|
||||||
iUnion(arrs, result);
|
|
||||||
taosArrayDestroy(arrs);
|
|
||||||
|
|
||||||
iExcept(result, tr->deled);
|
|
||||||
}
|
|
||||||
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
|
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
|
||||||
int32_t sz = taosArrayGetSize(result);
|
int32_t sz = taosArrayGetSize(result);
|
||||||
if (sz > 0) {
|
if (sz > 0) {
|
||||||
|
@ -478,6 +426,7 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal
|
||||||
|
|
||||||
if (cv != NULL) {
|
if (cv != NULL) {
|
||||||
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
|
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
|
||||||
|
uint32_t ver = cv->ver;
|
||||||
if (cv->type == ADD_VALUE) {
|
if (cv->type == ADD_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
|
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
|
||||||
} else if (cv->type == DEL_VALUE) {
|
} else if (cv->type == DEL_VALUE) {
|
||||||
|
|
|
@ -256,7 +256,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
|
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
|
||||||
if (mem == NULL) {
|
if (mem == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -267,28 +267,23 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
// if (c->operaType == ADD_VALUE) {
|
if (qtype == QUERY_TERM) {
|
||||||
//} else if (c->operaType == DEL_VALUE) {
|
if (0 == strcmp(c->colVal, ct->colVal)) {
|
||||||
//}
|
if (c->operaType == ADD_VALUE) {
|
||||||
|
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
||||||
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
|
// taosArrayPush(result, &c->uid);
|
||||||
if (strcmp(c->colVal, ct->colVal) == 0) {
|
|
||||||
taosArrayPush(result, &c->uid);
|
|
||||||
*s = kTypeValue;
|
*s = kTypeValue;
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
// table is del, not need
|
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
||||||
*s = kTypeDeletion;
|
}
|
||||||
break;
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -416,6 +411,7 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
|
||||||
iv->type = ct->operaType;
|
iv->type = ct->operaType;
|
||||||
|
iv->ver = ct->version;
|
||||||
iv->colVal = tstrdup(ct->colVal);
|
iv->colVal = tstrdup(ct->colVal);
|
||||||
|
|
||||||
taosArrayPush(iv->val, &ct->uid);
|
taosArrayPush(iv->val, &ct->uid);
|
||||||
|
|
|
@ -184,11 +184,12 @@ void tfileReaderDestroy(TFileReader* reader) {
|
||||||
free(reader);
|
free(reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
|
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
||||||
EIndexQueryType qtype = query->qType;
|
EIndexQueryType qtype = query->qType;
|
||||||
|
|
||||||
|
SArray* result = taosArrayInit(16, sizeof(uint64_t));
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
// refactor to callback later
|
// refactor to callback later
|
||||||
if (qtype == QUERY_TERM) {
|
if (qtype == QUERY_TERM) {
|
||||||
|
@ -223,6 +224,10 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
|
||||||
// handle later
|
// handle later
|
||||||
}
|
}
|
||||||
tfileReaderUnRef(reader);
|
tfileReaderUnRef(reader);
|
||||||
|
|
||||||
|
taosArrayAddAll(tr->total, result);
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,7 +253,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
||||||
tfileGenFileFullName(fullname, path, suid, colName, version);
|
tfileGenFileFullName(fullname, path, suid, colName, version);
|
||||||
|
|
||||||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||||
indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
|
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
|
||||||
if (wc == NULL) {
|
if (wc == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -380,7 +385,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
|
||||||
free(tfile);
|
free(tfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) {
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
if (tfile == NULL) {
|
if (tfile == NULL) {
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -428,6 +433,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
iv->ver = 0;
|
||||||
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
|
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
|
||||||
iv->colVal = colVal;
|
iv->colVal = colVal;
|
||||||
return true;
|
return true;
|
||||||
|
@ -628,7 +634,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
int64_t ts = taosGetTimestampUs();
|
int64_t ts = taosGetTimestampUs();
|
||||||
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
|
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
|
||||||
int64_t cost = taosGetTimestampUs() - ts;
|
int64_t cost = taosGetTimestampUs() - ts;
|
||||||
indexInfo("nread = %d, and fst offset=%d, size: %d, filename: %s, size: %d, time cost: %" PRId64 "us", nread,
|
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
|
||||||
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
|
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
|
||||||
// we assuse fst size less than FST_MAX_SIZE
|
// we assuse fst size less than FST_MAX_SIZE
|
||||||
assert(nread > 0 && nread <= fstSize);
|
assert(nread > 0 && nread <= fstSize);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
#include "index_util.h"
|
#include "index_util.h"
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
|
||||||
typedef struct MergeIndex {
|
typedef struct MergeIndex {
|
||||||
int idx;
|
int idx;
|
||||||
|
@ -135,3 +136,60 @@ void iExcept(SArray *total, SArray *except) {
|
||||||
|
|
||||||
taosArrayPopTailBatch(total, tsz - vIdx);
|
taosArrayPopTailBatch(total, tsz - vIdx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int uidCompare(const void *a, const void *b) {
|
||||||
|
// add more version compare
|
||||||
|
uint64_t u1 = *(uint64_t *)a;
|
||||||
|
uint64_t u2 = *(uint64_t *)b;
|
||||||
|
return u1 - u2;
|
||||||
|
}
|
||||||
|
int verdataCompare(const void *a, const void *b) {
|
||||||
|
SIdxVerdata *va = (SIdxVerdata *)a;
|
||||||
|
SIdxVerdata *vb = (SIdxVerdata *)b;
|
||||||
|
|
||||||
|
int32_t cmp = compareUint64Val(&va->data, &vb->data);
|
||||||
|
if (cmp == 0) {
|
||||||
|
cmp = 0 - compareUint32Val(&va->ver, &vb->data);
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
SIdxTempResult *sIdxTempResultCreate() {
|
||||||
|
SIdxTempResult *tr = calloc(1, sizeof(SIdxTempResult));
|
||||||
|
|
||||||
|
tr->total = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
tr->added = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
tr->deled = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
return tr;
|
||||||
|
}
|
||||||
|
void sIdxTempResultClear(SIdxTempResult *tr) {
|
||||||
|
if (tr == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosArrayClear(tr->total);
|
||||||
|
taosArrayClear(tr->added);
|
||||||
|
taosArrayClear(tr->deled);
|
||||||
|
}
|
||||||
|
void sIdxTempResultDestroy(SIdxTempResult *tr) {
|
||||||
|
if (tr == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(tr->total);
|
||||||
|
taosArrayDestroy(tr->added);
|
||||||
|
taosArrayDestroy(tr->deled);
|
||||||
|
}
|
||||||
|
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) {
|
||||||
|
taosArraySort(tr->total, uidCompare);
|
||||||
|
taosArraySort(tr->added, uidCompare);
|
||||||
|
taosArraySort(tr->deled, uidCompare);
|
||||||
|
|
||||||
|
SArray *arrs = taosArrayInit(2, sizeof(void *));
|
||||||
|
taosArrayPush(arrs, &tr->total);
|
||||||
|
taosArrayPush(arrs, &tr->added);
|
||||||
|
|
||||||
|
iUnion(arrs, result);
|
||||||
|
taosArrayDestroy(arrs);
|
||||||
|
|
||||||
|
iExcept(result, tr->deled);
|
||||||
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "index_fst_counting_writer.h"
|
#include "index_fst_counting_writer.h"
|
||||||
#include "index_fst_util.h"
|
#include "index_fst_util.h"
|
||||||
#include "index_tfile.h"
|
#include "index_tfile.h"
|
||||||
|
#include "index_util.h"
|
||||||
#include "tskiplist.h"
|
#include "tskiplist.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
@ -393,7 +394,13 @@ class TFileObj {
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
return tfileReaderSearch(reader_, query, result);
|
SIdxTempResult* tr = sIdxTempResultCreate();
|
||||||
|
|
||||||
|
int ret = tfileReaderSearch(reader_, query, tr);
|
||||||
|
|
||||||
|
sIdxTempResultMergeTo(result, tr);
|
||||||
|
sIdxTempResultDestroy(tr);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
~TFileObj() {
|
~TFileObj() {
|
||||||
if (writer_) {
|
if (writer_) {
|
||||||
|
@ -507,9 +514,13 @@ class CacheObj {
|
||||||
indexCacheDebug(cache);
|
indexCacheDebug(cache);
|
||||||
}
|
}
|
||||||
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
|
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
|
||||||
int ret = indexCacheSearch(cache, query, result, s);
|
SIdxTempResult* tr = sIdxTempResultCreate();
|
||||||
|
|
||||||
|
int ret = indexCacheSearch(cache, query, tr, s);
|
||||||
|
sIdxTempResultMergeTo(result, tr);
|
||||||
|
sIdxTempResultDestroy(tr);
|
||||||
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
//
|
|
||||||
std::cout << "failed to get from cache:" << ret << std::endl;
|
std::cout << "failed to get from cache:" << ret << std::endl;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -649,7 +660,7 @@ class IndexObj {
|
||||||
indexInit();
|
indexInit();
|
||||||
}
|
}
|
||||||
int Init(const std::string& dir) {
|
int Init(const std::string& dir) {
|
||||||
// taosRemoveDir(dir.c_str());
|
taosRemoveDir(dir.c_str());
|
||||||
taosMkDir(dir.c_str());
|
taosMkDir(dir.c_str());
|
||||||
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -658,6 +669,14 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
|
||||||
|
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
Put(terms, uid);
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
||||||
size_t numOfTable = 100 * 10000) {
|
size_t numOfTable = 100 * 10000) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
@ -730,6 +749,7 @@ class IndexObj {
|
||||||
std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal
|
std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal
|
||||||
<< "\t size:" << taosArrayGetSize(result) << std::endl;
|
<< "\t size:" << taosArrayGetSize(result) << std::endl;
|
||||||
} else {
|
} else {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
int sz = taosArrayGetSize(result);
|
int sz = taosArrayGetSize(result);
|
||||||
indexMultiTermQueryDestroy(mq);
|
indexMultiTermQueryDestroy(mq);
|
||||||
|
@ -797,12 +817,8 @@ class IndexObj {
|
||||||
|
|
||||||
class IndexEnv2 : public ::testing::Test {
|
class IndexEnv2 : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
virtual void SetUp() {
|
virtual void SetUp() { index = new IndexObj(); }
|
||||||
index = new IndexObj();
|
virtual void TearDown() { delete index; }
|
||||||
}
|
|
||||||
virtual void TearDown() {
|
|
||||||
delete index;
|
|
||||||
}
|
|
||||||
IndexObj* index;
|
IndexObj* index;
|
||||||
};
|
};
|
||||||
TEST_F(IndexEnv2, testIndexOpen) {
|
TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
|
@ -1042,3 +1058,19 @@ TEST_F(IndexEnv2, testIndex_read_performance4) {
|
||||||
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||||
assert(3 == index->SearchOne("tag10", "Hello"));
|
assert(3 == index->SearchOne("tag10", "Hello"));
|
||||||
}
|
}
|
||||||
|
TEST_F(IndexEnv2, testIndex_del) {
|
||||||
|
std::string path = "/tmp/cache_and_tfile";
|
||||||
|
if (index->Init(path) != 0) {
|
||||||
|
}
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
index->PutOneTarge("tag10", "Hello", i);
|
||||||
|
}
|
||||||
|
index->Del("tag10", "Hello", 12);
|
||||||
|
index->Del("tag10", "Hello", 11);
|
||||||
|
|
||||||
|
index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000);
|
||||||
|
|
||||||
|
EXPECT_EQ(98, index->SearchOne("tag10", "Hello"));
|
||||||
|
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||||
|
// assert(3 == index->SearchOne("tag10", "Hello"));
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue