From f5c13c13c6d1a6ddfd3a3ff18ae979cba342bd9a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 1 Mar 2022 22:36:19 +0800 Subject: [PATCH 1/6] support json --- include/libs/index/index.h | 114 +++++++++++++++++++++++++--- source/libs/index/inc/indexInt.h | 57 +++++++++----- source/libs/index/src/index.c | 49 +++++++++--- source/libs/index/src/index_cache.c | 4 +- source/libs/index/src/index_json.c | 44 +++++++++++ source/libs/index/src/index_tfile.c | 13 ++-- source/libs/index/test/fstUT.cc | 10 +-- source/libs/index/test/jsonDemo.cc | 0 8 files changed, 241 insertions(+), 50 deletions(-) create mode 100644 source/libs/index/src/index_json.c create mode 100644 source/libs/index/test/jsonDemo.cc diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 47eb97cc3a..9424eae1b7 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SArray SIndexMultiTerm; +typedef struct SIndex SIndexJson; +typedef struct SIndexTerm SIndexJsonTerm; +typedef struct SIndexOpts SIndexJsonOpts; +typedef struct SIndexMultiTermQuery SIndexJsonMultiTermQuery; +typedef struct SArray SIndexJsonMultiTerm; + typedef enum { ADD_VALUE, // add index colume value DEL_VALUE, // delete index column value @@ -39,24 +45,108 @@ typedef enum { } SIndexOperOnColumn; typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; -typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3 } EIndexQueryType; +typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType; + /* - * @param: oper - * + * create multi query + * @param oper (input, relation between querys) */ SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType oper); -void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery); -int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type); + /* - * @param: - * @param: + * destroy multi query + * @param pQuery (input, multi-query-object to be destory) + */ + +void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery); +/* + * add query to multi query + * @param pQuery (input, multi-query-object) + * @param term (input, single query term) + * @param type (input, single query type) + * @return error code + */ +int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type); +/* + * open index + * @param opt (input, index opt) + * @param path (input, index path) + * @param index (output, index object) + * @return error code + */ +int indexOpen(SIndexOpts* opt, const char* path, SIndex** index); +/* + * close index + * @param index (input, index to be closed) + * @return error code */ -int indexOpen(SIndexOpts* opt, const char* path, SIndex** index); void indexClose(SIndex* index); -int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid); -int indexDelete(SIndex* index, SIndexMultiTermQuery* query); -int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); -int indexRebuild(SIndex* index, SIndexOpts* opt); + +/* + * insert terms into index + * @param index (input, index object) + * @param term (input, terms inserted into index) + * @param uid (input, uid of terms) + * @return error code + */ +int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid); +/* + * delete terms that meet query condition + * @param index (input, index object) + * @param query (input, condition query to deleted) + * @return error code + */ + +int indexDelete(SIndex* index, SIndexMultiTermQuery* query); +/* + * search index + * @param index (input, index object) + * @param query (input, multi query condition) + * @param result(output, query result) + * @return error code + */ +int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); +/* + * rebuild index + * @param index (input, index object) + * @parma opt (input, rebuild index opts) + * @return error code + */ +int indexRebuild(SIndex* index, SIndexOpts* opt); + +/* + * open index + * @param opt (input,index json opt) + * @param path (input, index json path) + * @param index (output, index json object) + * @return error code + */ +int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index); +/* + * close index + * @param index (input, index to be closed) + * @return error code + */ + +int tIndexJsonClose(SIndexJson* index); + +/* + * insert terms into index + * @param index (input, index object) + * @param term (input, terms inserted into index) + * @param uid (input, uid of terms) + * @return error code + */ +int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid); +/* + * search index + * @param index (input, index object) + * @param query (input, multi query condition) + * @param result(output, query result) + * @return error code + */ + +int tIndexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result); /* * @param * @param diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 90ad1e15f4..57d587d297 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -120,29 +120,50 @@ int indexFlushCacheToTFile(SIndex* sIdx, void*); int32_t indexSerialCacheKey(ICacheKey* key, char* buf); -#define indexFatal(...) \ - do { \ - if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ +#define indexFatal(...) \ + do { \ + if (sDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("index FATAL ", 255, __VA_ARGS__); \ + } \ } while (0) -#define indexError(...) \ - do { \ - if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \ +#define indexError(...) \ + do { \ + if (sDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("index ERROR ", 255, __VA_ARGS__); \ + } \ } while (0) -#define indexWarn(...) \ - do { \ - if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \ +#define indexWarn(...) \ + do { \ + if (sDebugFlag & DEBUG_WARN) { \ + taosPrintLog("index WARN ", 255, __VA_ARGS__); \ + } \ } while (0) -#define indexInfo(...) \ - do { \ - if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \ +#define indexInfo(...) \ + do { \ + if (sDebugFlag & DEBUG_INFO) { \ + taosPrintLog("index ", 255, __VA_ARGS__); \ + } \ } while (0) -#define indexDebug(...) \ - do { \ - if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ +#define indexDebug(...) \ + do { \ + if (sDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \ + } \ } while (0) -#define indexTrace(...) \ - do { \ - if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ +#define indexTrace(...) \ + do { \ + if (sDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \ + } \ + } while (0) + +#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) +#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) +#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \ + do { \ + uint8_t oldTy = ty; \ + ty = (ty >> 4) | exTy; \ + ty = (ty << 4) | oldTy; \ } while (0) #ifdef __cplusplus diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 5147734a85..b518df2ea2 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -2,8 +2,8 @@ * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or @@ -30,6 +30,8 @@ void* indexQhandle = NULL; +static char JSON_COLUMN[] = "JSON"; + void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); @@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); static void indexMergeSameKey(SArray* result, TFileValue* tv); +static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); +int32_t indexSerialKey(ICacheKey* key, char* buf); + int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); @@ -147,9 +152,8 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - char buf[128] = {0}; - ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; - int32_t sz = indexSerialCacheKey(&key, buf); + char buf[128] = {0}; + int32_t sz = indexSerialTermKey(p, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); if (cache == NULL) { @@ -162,9 +166,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - char buf[128] = {0}; - ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; - int32_t sz = indexSerialCacheKey(&key, buf); + char buf[128] = {0}; + // ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; + int32_t sz = indexSerialTermKey(p, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); @@ -554,7 +558,24 @@ END: return -1; } -int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { +int32_t indexSerialTermKeyOfTag(SIndexTerm* p, char* buf) { + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; + return indexSerialKey(&key, buf); +} +int32_t indexSerilaTermKeyOfJson(SIndexTerm* p, char* buf) { + ICacheKey key = {.suid = p->suid, .colName = JSON_COLUMN, .nColName = strlen(JSON_COLUMN)}; + return indexSerialKey(&key, buf); +} + +int32_t indexSerialTermKey(SIndexTerm* itm, char* buf) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(itm->colType, TSDB_DATA_TYPE_JSON); + if (hasJson) { + return indexSerilaTermKeyOfJson(itm, buf); + } else { + return indexSerialTermKeyOfTag(itm, buf); + } +} +int32_t indexSerialKey(ICacheKey* key, char* buf) { char* p = buf; SERIALIZE_MEM_TO_BUF(buf, key, suid); SERIALIZE_VAR_TO_BUF(buf, '_', char); @@ -563,3 +584,13 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); return buf - p; } + +// int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { +// char* p = buf; +// SERIALIZE_MEM_TO_BUF(buf, key, suid); +// SERIALIZE_VAR_TO_BUF(buf, '_', char); +// // SERIALIZE_MEM_TO_BUF(buf, key, colType); +// // SERIALIZE_VAR_TO_BUF(buf, '_', char); +// SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); +// return buf - p; +//} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index d6a7141825..bb29a78cb1 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -24,6 +24,8 @@ #define MEM_THRESHOLD 1024 * 1024 #define MEM_ESTIMATE_RADIO 1.5 +static char JSON_COLUMN[] = "JSON"; + static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); @@ -45,7 +47,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in return NULL; }; cache->mem = indexInternalCacheCreate(type); - cache->colName = tstrdup(colName); + cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName); cache->type = type; cache->index = idx; cache->version = 0; diff --git a/source/libs/index/src/index_json.c b/source/libs/index/src/index_json.c new file mode 100644 index 0000000000..f4d28b45b0 --- /dev/null +++ b/source/libs/index/src/index_json.c @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "indeInt.h" +#include "index.h" + +int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { + // handle + return tIndexOpen(opts, path, index); +} +// k +int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { + for (int i = 0; i < taosArrayGetSize(terms); i++) { + SIndexJsonTerm *p = taosArrayGetP(terms, i); + INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON); + } + return indexPut(index, terms, uid); + // handle put +} + +int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *query, SArray *result) { + for (int i = 0; i < taosArrayGetSize(terms); i++) { + SIndexJsonTerm *p = taosArrayGetP(terms, i); + INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON); + } + return indexSearch(index, query, result); + // handle search +} + +int tIndexJsonClose(SIndexJson *index) { + return tIndexClose(index); + // handle close +} diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index e44f8fc1c3..a05884b790 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -205,7 +205,11 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul } else if (qtype == QUERY_PREFIX) { // handle later // - } else { + } else if (qtype == QUERY_SUFFIX) { + // handle later + } else if (qtype == QUERY_REGEX) { + // handle later + } else if (qtype == QUERY_RANGE) { // handle later } tfileReaderUnRef(reader); @@ -586,11 +590,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) { int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); if (nread == -1) { - indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), - errno, reader->ctx->file.buf); + indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno, + reader->ctx->file.buf); } else { - indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), - reader->ctx->file.buf); + indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf); } // assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); diff --git a/source/libs/index/test/fstUT.cc b/source/libs/index/test/fstUT.cc index d59a3428da..b34fd71e9c 100644 --- a/source/libs/index/test/fstUT.cc +++ b/source/libs/index/test/fstUT.cc @@ -216,21 +216,21 @@ class FstEnv : public ::testing::Test { TEST_F(FstEnv, writeNormal) { fst->CreateWriter(); - std::string str("aa"); + std::string str("11"); for (int i = 0; i < 10; i++) { - str[0] = 'a' + i; + str[0] = '1' + i; str.resize(2); assert(fst->Put(str, i) == true); } // order failed - assert(fst->Put("aa", 1) == false); + assert(fst->Put("11", 1) == false); fst->DestroyWriter(); fst->CreateReader(); uint64_t val; - assert(fst->Get("a", &val) == false); - assert(fst->Get("aa", &val) == true); + assert(fst->Get("1", &val) == false); + assert(fst->Get("11", &val) == true); assert(val == 0); std::vector rlt; diff --git a/source/libs/index/test/jsonDemo.cc b/source/libs/index/test/jsonDemo.cc new file mode 100644 index 0000000000..e69de29bb2 From d7c3415ae0a5806f3a8710dc6fb13f4816197c25 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 2 Mar 2022 09:24:17 +0800 Subject: [PATCH 2/6] support json --- source/libs/index/src/index_json.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/src/index_json.c b/source/libs/index/src/index_json.c index f4d28b45b0..9ffab3fad1 100644 --- a/source/libs/index/src/index_json.c +++ b/source/libs/index/src/index_json.c @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "indeInt.h" #include "index.h" +#include "indexInt.h" int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { // handle From 4be79789ca9d044df45cd465b85fad63146b647e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 2 Mar 2022 15:05:16 +0800 Subject: [PATCH 3/6] support json --- include/libs/index/index.h | 4 +-- source/libs/index/inc/indexInt.h | 2 ++ source/libs/index/src/index.c | 55 ++++++++++------------------- source/libs/index/src/index_cache.c | 46 +++++++++++++++++++----- source/libs/index/src/index_json.c | 12 +++---- 5 files changed, 65 insertions(+), 54 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 9424eae1b7..453b49e4c6 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -125,10 +125,10 @@ int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index); /* * close index * @param index (input, index to be closed) - * @return error code + * @return void */ -int tIndexJsonClose(SIndexJson* index); +void tIndexJsonClose(SIndexJson* index); /* * insert terms into index diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 57d587d297..928e8b6102 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -119,6 +119,8 @@ typedef struct TFileCacheKey { int indexFlushCacheToTFile(SIndex* sIdx, void*); int32_t indexSerialCacheKey(ICacheKey* key, char* buf); +// int32_t indexSerialKey(ICacheKey* key, char* buf); +// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); #define indexFatal(...) \ do { \ diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index b518df2ea2..168e819073 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -65,8 +65,8 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); static void indexMergeSameKey(SArray* result, TFileValue* tv); -static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); -int32_t indexSerialKey(ICacheKey* key, char* buf); +// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); +// int32_t indexSerialKey(ICacheKey* key, char* buf); int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { pthread_once(&isInit, indexInit); @@ -152,8 +152,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - char buf[128] = {0}; - int32_t sz = indexSerialTermKey(p, buf); + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); if (cache == NULL) { @@ -166,9 +167,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - char buf[128] = {0}; - // ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; - int32_t sz = indexSerialTermKey(p, buf); + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); @@ -334,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result IndexCache* cache = NULL; char buf[128] = {0}; - ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; - int32_t sz = indexSerialCacheKey(&key, buf); + ICacheKey key = { + .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); pthread_mutex_lock(&sIdx->mtx); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); @@ -558,39 +560,18 @@ END: return -1; } -int32_t indexSerialTermKeyOfTag(SIndexTerm* p, char* buf) { - ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; - return indexSerialKey(&key, buf); -} -int32_t indexSerilaTermKeyOfJson(SIndexTerm* p, char* buf) { - ICacheKey key = {.suid = p->suid, .colName = JSON_COLUMN, .nColName = strlen(JSON_COLUMN)}; - return indexSerialKey(&key, buf); -} +int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON); -int32_t indexSerialTermKey(SIndexTerm* itm, char* buf) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(itm->colType, TSDB_DATA_TYPE_JSON); - if (hasJson) { - return indexSerilaTermKeyOfJson(itm, buf); - } else { - return indexSerialTermKeyOfTag(itm, buf); - } -} -int32_t indexSerialKey(ICacheKey* key, char* buf) { char* p = buf; SERIALIZE_MEM_TO_BUF(buf, key, suid); SERIALIZE_VAR_TO_BUF(buf, '_', char); // SERIALIZE_MEM_TO_BUF(buf, key, colType); // SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); + if (hasJson) { + SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN)); + } else { + SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); + } return buf - p; } - -// int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { -// char* p = buf; -// SERIALIZE_MEM_TO_BUF(buf, key, suid); -// SERIALIZE_VAR_TO_BUF(buf, '_', char); -// // SERIALIZE_MEM_TO_BUF(buf, key, colType); -// // SERIALIZE_VAR_TO_BUF(buf, '_', char); -// SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); -// return buf - p; -//} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index bb29a78cb1..a1f074feae 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -25,6 +25,7 @@ #define MEM_ESTIMATE_RADIO 1.5 static char JSON_COLUMN[] = "JSON"; +static char JSON_VALUE_DELIM = '&'; static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); @@ -46,6 +47,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in indexError("failed to create index cache"); return NULL; }; + cache->mem = indexInternalCacheCreate(type); cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName); cache->type = type; @@ -209,11 +211,38 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } } } +static char* indexCachePackJsonData(SIndexTerm* itm) { + /* + * |<-----colname---->|<-----dataType---->|<--------colVal---------->| + * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| + */ + uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); + int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; + char* buf = (char*)calloc(1, sz); + char* p = buf; + + memcpy(p, itm->colVal, itm->nColName); + p += itm->nColName; + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, &ty, sizeof(ty)); + p += sizeof(ty); + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, itm->colVal, itm->nColVal); + + return buf; +} int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; } + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); IndexCache* pCache = cache; indexCacheRef(pCache); @@ -224,8 +253,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { } // set up key ct->colType = term->colType; - ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); - memcpy(ct->colVal, term->colVal, term->nColVal); + if (hasJson) { + ct->colVal = indexCachePackJsonData(term); + } else { + ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); + memcpy(ct->colVal, term->colVal, term->nColVal); + } ct->version = atomic_add_fetch_32(&pCache->version, 1); // set value ct->uid = uid; @@ -369,6 +402,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) { } static MemTable* indexInternalCacheCreate(int8_t type) { + type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; + MemTable* tbl = calloc(1, sizeof(MemTable)); indexMemRef(tbl); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { @@ -391,9 +426,6 @@ static bool indexCacheIteratorNext(Iterate* itera) { IterateValue* iv = &itera->val; iterateValueDestroy(iv, false); - // IterateValue* iv = &itera->val; - // IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))}; - bool next = tSkipListIterNext(iter); if (next) { SSkipListNode* node = tSkipListIterGet(iter); @@ -413,10 +445,6 @@ static bool indexCacheIteratorNext(Iterate* itera) { taosArrayPush(iv->val, &ct->uid); } - // IterateValue* iv = &itera->val; - // iterateValueDestroy(iv, true); - //*iv = tIterVal; - return next; } diff --git a/source/libs/index/src/index_json.c b/source/libs/index/src/index_json.c index 9ffab3fad1..de88ff3c8a 100644 --- a/source/libs/index/src/index_json.c +++ b/source/libs/index/src/index_json.c @@ -17,9 +17,8 @@ int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { // handle - return tIndexOpen(opts, path, index); + return indexOpen(opts, path, index); } -// k int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(terms); i++) { SIndexJsonTerm *p = taosArrayGetP(terms, i); @@ -29,16 +28,17 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { // handle put } -int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *query, SArray *result) { +int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) { + SArray *terms = tq->query; for (int i = 0; i < taosArrayGetSize(terms); i++) { SIndexJsonTerm *p = taosArrayGetP(terms, i); INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON); } - return indexSearch(index, query, result); + return indexSearch(index, tq, result); // handle search } -int tIndexJsonClose(SIndexJson *index) { - return tIndexClose(index); +void tIndexJsonClose(SIndexJson *index) { + return indexClose(index); // handle close } From 6a6f31c4a75dd30d916b7b31512f7620b6af1d02 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 2 Mar 2022 17:09:22 +0800 Subject: [PATCH 4/6] support json --- source/libs/index/src/index_cache.c | 13 +++- source/libs/index/test/CMakeLists.txt | 18 ++++++ source/libs/index/test/jsonUT.cc | 92 +++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 source/libs/index/test/jsonUT.cc diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index a1f074feae..2742830d3b 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -222,7 +222,7 @@ static char* indexCachePackJsonData(SIndexTerm* itm) { char* buf = (char*)calloc(1, sz); char* p = buf; - memcpy(p, itm->colVal, itm->nColName); + memcpy(p, itm->colName, itm->nColName); p += itm->nColName; memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); @@ -329,13 +329,22 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; - CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; + + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); + char* p = term->colVal; + if (hasJson) { + p = indexCachePackJsonData(term); + } + CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)}; int ret = indexQueryMem(mem, &ct, qtype, result, s); if (ret == 0 && *s != kTypeDeletion) { // continue search in imm ret = indexQueryMem(imm, &ct, qtype, result, s); } + if (hasJson) { + tfree(p); + } indexMemUnRef(mem); indexMemUnRef(imm); diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index bed42be3e5..1ebf85368b 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(indexTest "") add_executable(fstTest "") add_executable(fstUT "") add_executable(UtilUT "") +add_executable(jsonUT "") target_sources(indexTest PRIVATE @@ -21,6 +22,10 @@ target_sources(UtilUT "utilUT.cc" ) +target_sources(jsonUT + PRIVATE + "jsonUT.cc" +) target_include_directories ( indexTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" @@ -43,6 +48,12 @@ target_include_directories ( UtilUT "${CMAKE_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) + +target_include_directories (jsonUT + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/index" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries (indexTest os util @@ -73,6 +84,13 @@ target_link_libraries (UtilUT index ) +target_link_libraries (jsonUT + os + util + common + gtest_main + index +) #add_test( # NAME index_test diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc new file mode 100644 index 0000000000..20b59add9f --- /dev/null +++ b/source/libs/index/test/jsonUT.cc @@ -0,0 +1,92 @@ +#include +#include +#include +#include +#include +#include +#include "index.h" +#include "indexInt.h" +#include "index_cache.h" +#include "index_fst.h" +#include "index_fst_counting_writer.h" +#include "index_fst_util.h" +#include "index_tfile.h" +#include "index_util.h" +#include "tglobal.h" +#include "tskiplist.h" +#include "tutil.h" + +static std::string dir = "/tmp/json"; +class JsonEnv : public ::testing::Test { + protected: + virtual void SetUp() { + taosRemoveDir(dir.c_str()); + opts = indexOptsCreate(); + int ret = tIndexJsonOpen(opts, dir.c_str(), &index); + assert(ret == 0); + } + virtual void TearDown() { + tIndexJsonClose(index); + indexOptsDestroy(opts); + } + SIndexJsonOpts* opts; + SIndexJson* index; +}; + +TEST_F(JsonEnv, testWrite) { + { + std::string colName("voltage"); + std::string colVal("ab"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 100; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("voltage"); + std::string colVal("ab1"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 100; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("voltage"); + std::string colVal("123"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 100; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("voltage"); + std::string colVal("ab"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_TERM); + tIndexJsonSearch(index, mq, result); + assert(100 == taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + + // SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; +} From 7ae87c10103c801d06092ea3ddea00b9e9b671bd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 2 Mar 2022 22:06:02 +0800 Subject: [PATCH 5/6] add UT --- source/libs/index/inc/index_comm.h | 32 +++++++++++++++++++ source/libs/index/src/index_cache.c | 35 ++------------------ source/libs/index/src/index_comm.c | 48 ++++++++++++++++++++++++++++ source/libs/index/src/index_tfile.c | 13 +++++++- source/libs/index/test/fstUT.cc | 16 ++++++++++ source/libs/index/test/jsonDemo.cc | 0 source/libs/index/test/jsonUT.cc | 33 +++++++++++++++++-- source/libs/transport/src/transSrv.c | 20 ++++++------ 8 files changed, 153 insertions(+), 44 deletions(-) create mode 100644 source/libs/index/inc/index_comm.h create mode 100644 source/libs/index/src/index_comm.c delete mode 100644 source/libs/index/test/jsonDemo.cc diff --git a/source/libs/index/inc/index_comm.h b/source/libs/index/inc/index_comm.h new file mode 100644 index 0000000000..0d8418ba65 --- /dev/null +++ b/source/libs/index/inc/index_comm.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_INDEX_COMM_H_ +#define _TD_INDEX_COMM_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern char JSON_COLUMN[]; +extern char JSON_VALUE_DELIM; + +char* indexPackJsonData(SIndexTerm* itm); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 2742830d3b..599bac3fe6 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -14,6 +14,7 @@ */ #include "index_cache.h" +#include "index_comm.h" #include "index_util.h" #include "tcompare.h" #include "tsched.h" @@ -24,9 +25,6 @@ #define MEM_THRESHOLD 1024 * 1024 #define MEM_ESTIMATE_RADIO 1.5 -static char JSON_COLUMN[] = "JSON"; -static char JSON_VALUE_DELIM = '&'; - static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); @@ -211,33 +209,6 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } } } -static char* indexCachePackJsonData(SIndexTerm* itm) { - /* - * |<-----colname---->|<-----dataType---->|<--------colVal---------->| - * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| - */ - uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); - - int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; - char* buf = (char*)calloc(1, sz); - char* p = buf; - - memcpy(p, itm->colName, itm->nColName); - p += itm->nColName; - - memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); - p += sizeof(JSON_VALUE_DELIM); - - memcpy(p, &ty, sizeof(ty)); - p += sizeof(ty); - - memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); - p += sizeof(JSON_VALUE_DELIM); - - memcpy(p, itm->colVal, itm->nColVal); - - return buf; -} int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; @@ -254,7 +225,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { // set up key ct->colType = term->colType; if (hasJson) { - ct->colVal = indexCachePackJsonData(term); + ct->colVal = indexPackJsonData(term); } else { ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); memcpy(ct->colVal, term->colVal, term->nColVal); @@ -333,7 +304,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); char* p = term->colVal; if (hasJson) { - p = indexCachePackJsonData(term); + p = indexPackJsonData(term); } CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)}; diff --git a/source/libs/index/src/index_comm.c b/source/libs/index/src/index_comm.c new file mode 100644 index 0000000000..4f3cbaa4da --- /dev/null +++ b/source/libs/index/src/index_comm.c @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "index.h" +#include "indexInt.h" + +char JSON_COLUMN[] = "JSON"; +char JSON_VALUE_DELIM = '&'; + +char* indexPackJsonData(SIndexTerm* itm) { + /* + * |<-----colname---->|<-----dataType---->|<--------colVal---------->| + * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| + */ + uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); + + int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; + char* buf = (char*)calloc(1, sz); + char* p = buf; + + memcpy(p, itm->colName, itm->nColName); + p += itm->nColName; + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, &ty, sizeof(ty)); + p += sizeof(ty); + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, itm->colVal, itm->nColVal); + + return buf; +} diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index a05884b790..a2089e8eee 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -15,6 +15,7 @@ p * #include "index_tfile.h" #include "index.h" +#include "index_comm.h" #include "index_fst.h" #include "index_fst_counting_writer.h" #include "index_util.h" @@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { SIndexTerm* term = query->term; + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; int ret = -1; // refactor to callback later if (qtype == QUERY_TERM) { uint64_t offset; - FstSlice key = fstSliceCreate(term->colVal, term->nColVal); + char* p = term->colVal; + uint64_t sz = term->nColVal; + if (hasJson) { + p = indexPackJsonData(term); + sz = strlen(p); + } + FstSlice key = fstSliceCreate(p, sz); if (fstGet(reader->fst, &key, &offset)) { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal); @@ -202,6 +210,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul term->colVal); } fstSliceDestroy(&key); + if (hasJson) { + free(p); + } } else if (qtype == QUERY_PREFIX) { // handle later // diff --git a/source/libs/index/test/fstUT.cc b/source/libs/index/test/fstUT.cc index b34fd71e9c..3bc3b6da6a 100644 --- a/source/libs/index/test/fstUT.cc +++ b/source/libs/index/test/fstUT.cc @@ -238,3 +238,19 @@ TEST_F(FstEnv, writeNormal) { assert(fst->Search(ctx, rlt) == true); } TEST_F(FstEnv, WriteMillonrRecord) {} +TEST_F(FstEnv, writeAbNormal) { + fst->CreateWriter(); + std::string str1("voltage&\b&ab"); + std::string str2("voltbge&\b&ab"); + + fst->Put(str1, 1); + fst->Put(str2, 2); + + fst->DestroyWriter(); + + fst->CreateReader(); + uint64_t val; + assert(fst->Get("1", &val) == false); + assert(fst->Get("voltage&\b&ab", &val) == true); + assert(val == 1); +} diff --git a/source/libs/index/test/jsonDemo.cc b/source/libs/index/test/jsonDemo.cc deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 20b59add9f..e184bc49ae 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -21,6 +21,8 @@ class JsonEnv : public ::testing::Test { protected: virtual void SetUp() { taosRemoveDir(dir.c_str()); + taosMkDir(dir.c_str()); + opts = indexOptsCreate(); int ret = tIndexJsonOpen(opts, dir.c_str(), &index); assert(ret == 0); @@ -87,6 +89,33 @@ TEST_F(JsonEnv, testWrite) { assert(100 == taosArrayGetSize(result)); indexMultiTermQueryDestroy(mq); } - - // SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; +} +TEST_F(JsonEnv, testWriteMillonData) { + { + std::string colName("voltagefdadfa"); + std::string colVal("abxxxxxxxxxxxx"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("voltage"); + std::string colVal("ab"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_TERM); + tIndexJsonSearch(index, mq, result); + assert(100 == taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f0db054797..c7b6ca2a2c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) { transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); - assert(taosArrayGetSize(conn->srvMsgs) >= 1); - SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); - taosArrayRemove(conn->srvMsgs, 0); - destroySmsg(msg); + if (conn->srvMsgs != NULL) { + assert(taosArrayGetSize(conn->srvMsgs) >= 1); + SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); + taosArrayRemove(conn->srvMsgs, 0); + destroySmsg(msg); - // send second data, just use for push - if (taosArrayGetSize(conn->srvMsgs) > 0) { - msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); - uvStartSendRespInternal(msg); + // send second data, just use for push + if (taosArrayGetSize(conn->srvMsgs) > 0) { + msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); + uvStartSendRespInternal(msg); + } } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); @@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); destroySmsg(msg); } - taosArrayDestroy(conn->srvMsgs); + conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); QUEUE_REMOVE(&conn->queue); if (clear) { From aec7bef25b05afb69696533bafe00e7b53210c0c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 3 Mar 2022 12:29:54 +0800 Subject: [PATCH 6/6] add UT --- source/libs/index/src/index_tfile.c | 3 +++ source/libs/index/test/fstTest.cc | 23 +++++++++++++---------- source/libs/index/test/jsonUT.cc | 22 ++++++++++++++++++---- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index a2089e8eee..0947c796b2 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -275,6 +275,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { __compar_fn_t fn; int8_t colType = tw->header.colType; + colType = INDEX_TYPE_GET_TYPE(colType); if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { fn = tfileStrCompare; } else { @@ -572,6 +573,8 @@ static int tfileWriteHeader(TFileWriter* writer) { static int tfileWriteData(TFileWriter* write, TFileValue* tval) { TFileHeader* header = &write->header; uint8_t colType = header->colType; + + colType = INDEX_TYPE_GET_TYPE(colType); if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); if (fstBuilderInsert(write->fb, key, tval->offset)) { diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index 65118a2bce..74dcec4490 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -312,15 +312,18 @@ void validateTFile(char* arg) { tfCleanup(); } -void iterTFileReader(char* path, char* ver) { +void iterTFileReader(char* path, char* uid, char* colName, char* ver) { tfInit(); - int version = atoi(ver); - TFileReader* reader = tfileReaderOpen(path, 0, version, "tag1"); - Iterate* iter = tfileIteratorCreate(reader); - bool tn = iter ? iter->next(iter) : false; - int count = 0; - int termCount = 0; + uint64_t suid = atoi(uid); + int version = atoi(ver); + + TFileReader* reader = tfileReaderOpen(path, suid, version, colName); + + Iterate* iter = tfileIteratorCreate(reader); + bool tn = iter ? iter->next(iter) : false; + int count = 0; + int termCount = 0; while (tn == true) { count++; IterateValue* cv = iter->getValue(iter); @@ -337,9 +340,9 @@ void iterTFileReader(char* path, char* ver) { int main(int argc, char* argv[]) { // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } - if (argc > 2) { - // opt - iterTFileReader(argv[1], argv[2]); + if (argc > 4) { + // path suid colName ver + iterTFileReader(argv[1], argv[2], argv[3], argv[4]); } // checkFstCheckIterator(); // checkFstLongTerm(); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e184bc49ae..e5c79d137f 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -22,7 +22,7 @@ class JsonEnv : public ::testing::Test { virtual void SetUp() { taosRemoveDir(dir.c_str()); taosMkDir(dir.c_str()); - + printf("set up\n"); opts = indexOptsCreate(); int ret = tIndexJsonOpen(opts, dir.c_str(), &index); assert(ret == 0); @@ -30,6 +30,7 @@ class JsonEnv : public ::testing::Test { virtual void TearDown() { tIndexJsonClose(index); indexOptsDestroy(opts); + printf("destory\n"); } SIndexJsonOpts* opts; SIndexJson* index; @@ -37,7 +38,7 @@ class JsonEnv : public ::testing::Test { TEST_F(JsonEnv, testWrite) { { - std::string colName("voltage"); + std::string colName("test"); std::string colVal("ab"); SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); @@ -76,7 +77,7 @@ TEST_F(JsonEnv, testWrite) { indexMultiTermDestroy(terms); } { - std::string colName("voltage"); + std::string colName("test"); std::string colVal("ab"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); @@ -91,6 +92,19 @@ TEST_F(JsonEnv, testWrite) { } } TEST_F(JsonEnv, testWriteMillonData) { + { + std::string colName("test"); + std::string colVal("ab"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 100; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx"); @@ -105,7 +119,7 @@ TEST_F(JsonEnv, testWriteMillonData) { indexMultiTermDestroy(terms); } { - std::string colName("voltage"); + std::string colName("test"); std::string colVal("ab"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);