diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 9424eae1b7..453b49e4c6 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -125,10 +125,10 @@ int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index); /* * close index * @param index (input, index to be closed) - * @return error code + * @return void */ -int tIndexJsonClose(SIndexJson* index); +void tIndexJsonClose(SIndexJson* index); /* * insert terms into index diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 57d587d297..928e8b6102 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -119,6 +119,8 @@ typedef struct TFileCacheKey { int indexFlushCacheToTFile(SIndex* sIdx, void*); int32_t indexSerialCacheKey(ICacheKey* key, char* buf); +// int32_t indexSerialKey(ICacheKey* key, char* buf); +// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); #define indexFatal(...) \ do { \ diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index b518df2ea2..168e819073 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -65,8 +65,8 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); static void indexMergeSameKey(SArray* result, TFileValue* tv); -static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); -int32_t indexSerialKey(ICacheKey* key, char* buf); +// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); +// int32_t indexSerialKey(ICacheKey* key, char* buf); int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { pthread_once(&isInit, indexInit); @@ -152,8 +152,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - char buf[128] = {0}; - int32_t sz = indexSerialTermKey(p, buf); + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); if (cache == NULL) { @@ -166,9 +167,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - char buf[128] = {0}; - // ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; - int32_t sz = indexSerialTermKey(p, buf); + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); @@ -334,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result IndexCache* cache = NULL; char buf[128] = {0}; - ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; - int32_t sz = indexSerialCacheKey(&key, buf); + ICacheKey key = { + .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); pthread_mutex_lock(&sIdx->mtx); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); @@ -558,39 +560,18 @@ END: return -1; } -int32_t indexSerialTermKeyOfTag(SIndexTerm* p, char* buf) { - ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; - return indexSerialKey(&key, buf); -} -int32_t indexSerilaTermKeyOfJson(SIndexTerm* p, char* buf) { - ICacheKey key = {.suid = p->suid, .colName = JSON_COLUMN, .nColName = strlen(JSON_COLUMN)}; - return indexSerialKey(&key, buf); -} +int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON); -int32_t indexSerialTermKey(SIndexTerm* itm, char* buf) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(itm->colType, TSDB_DATA_TYPE_JSON); - if (hasJson) { - return indexSerilaTermKeyOfJson(itm, buf); - } else { - return indexSerialTermKeyOfTag(itm, buf); - } -} -int32_t indexSerialKey(ICacheKey* key, char* buf) { char* p = buf; SERIALIZE_MEM_TO_BUF(buf, key, suid); SERIALIZE_VAR_TO_BUF(buf, '_', char); // SERIALIZE_MEM_TO_BUF(buf, key, colType); // SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); + if (hasJson) { + SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN)); + } else { + SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); + } return buf - p; } - -// int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { -// char* p = buf; -// SERIALIZE_MEM_TO_BUF(buf, key, suid); -// SERIALIZE_VAR_TO_BUF(buf, '_', char); -// // SERIALIZE_MEM_TO_BUF(buf, key, colType); -// // SERIALIZE_VAR_TO_BUF(buf, '_', char); -// SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); -// return buf - p; -//} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index bb29a78cb1..a1f074feae 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -25,6 +25,7 @@ #define MEM_ESTIMATE_RADIO 1.5 static char JSON_COLUMN[] = "JSON"; +static char JSON_VALUE_DELIM = '&'; static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); @@ -46,6 +47,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in indexError("failed to create index cache"); return NULL; }; + cache->mem = indexInternalCacheCreate(type); cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName); cache->type = type; @@ -209,11 +211,38 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } } } +static char* indexCachePackJsonData(SIndexTerm* itm) { + /* + * |<-----colname---->|<-----dataType---->|<--------colVal---------->| + * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| + */ + uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); + int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; + char* buf = (char*)calloc(1, sz); + char* p = buf; + + memcpy(p, itm->colVal, itm->nColName); + p += itm->nColName; + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, &ty, sizeof(ty)); + p += sizeof(ty); + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, itm->colVal, itm->nColVal); + + return buf; +} int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; } + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); IndexCache* pCache = cache; indexCacheRef(pCache); @@ -224,8 +253,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { } // set up key ct->colType = term->colType; - ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); - memcpy(ct->colVal, term->colVal, term->nColVal); + if (hasJson) { + ct->colVal = indexCachePackJsonData(term); + } else { + ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); + memcpy(ct->colVal, term->colVal, term->nColVal); + } ct->version = atomic_add_fetch_32(&pCache->version, 1); // set value ct->uid = uid; @@ -369,6 +402,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) { } static MemTable* indexInternalCacheCreate(int8_t type) { + type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; + MemTable* tbl = calloc(1, sizeof(MemTable)); indexMemRef(tbl); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { @@ -391,9 +426,6 @@ static bool indexCacheIteratorNext(Iterate* itera) { IterateValue* iv = &itera->val; iterateValueDestroy(iv, false); - // IterateValue* iv = &itera->val; - // IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))}; - bool next = tSkipListIterNext(iter); if (next) { SSkipListNode* node = tSkipListIterGet(iter); @@ -413,10 +445,6 @@ static bool indexCacheIteratorNext(Iterate* itera) { taosArrayPush(iv->val, &ct->uid); } - // IterateValue* iv = &itera->val; - // iterateValueDestroy(iv, true); - //*iv = tIterVal; - return next; } diff --git a/source/libs/index/src/index_json.c b/source/libs/index/src/index_json.c index 9ffab3fad1..de88ff3c8a 100644 --- a/source/libs/index/src/index_json.c +++ b/source/libs/index/src/index_json.c @@ -17,9 +17,8 @@ int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { // handle - return tIndexOpen(opts, path, index); + return indexOpen(opts, path, index); } -// k int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(terms); i++) { SIndexJsonTerm *p = taosArrayGetP(terms, i); @@ -29,16 +28,17 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { // handle put } -int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *query, SArray *result) { +int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) { + SArray *terms = tq->query; for (int i = 0; i < taosArrayGetSize(terms); i++) { SIndexJsonTerm *p = taosArrayGetP(terms, i); INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON); } - return indexSearch(index, query, result); + return indexSearch(index, tq, result); // handle search } -int tIndexJsonClose(SIndexJson *index) { - return tIndexClose(index); +void tIndexJsonClose(SIndexJson *index) { + return indexClose(index); // handle close }