support json
This commit is contained in:
parent
d7c3415ae0
commit
4be79789ca
|
@ -125,10 +125,10 @@ int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
|
||||||
/*
|
/*
|
||||||
* close index
|
* close index
|
||||||
* @param index (input, index to be closed)
|
* @param index (input, index to be closed)
|
||||||
* @return error code
|
* @return void
|
||||||
*/
|
*/
|
||||||
|
|
||||||
int tIndexJsonClose(SIndexJson* index);
|
void tIndexJsonClose(SIndexJson* index);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* insert terms into index
|
* insert terms into index
|
||||||
|
|
|
@ -119,6 +119,8 @@ typedef struct TFileCacheKey {
|
||||||
int indexFlushCacheToTFile(SIndex* sIdx, void*);
|
int indexFlushCacheToTFile(SIndex* sIdx, void*);
|
||||||
|
|
||||||
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
|
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
|
||||||
|
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||||
|
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||||
|
|
||||||
#define indexFatal(...) \
|
#define indexFatal(...) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -65,8 +65,8 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
||||||
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv);
|
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv);
|
||||||
static void indexMergeSameKey(SArray* result, TFileValue* tv);
|
static void indexMergeSameKey(SArray* result, TFileValue* tv);
|
||||||
|
|
||||||
static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||||
int32_t indexSerialKey(ICacheKey* key, char* buf);
|
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||||
|
|
||||||
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
pthread_once(&isInit, indexInit);
|
pthread_once(&isInit, indexInit);
|
||||||
|
@ -152,8 +152,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t sz = indexSerialTermKey(p, buf);
|
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
|
||||||
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
|
@ -166,9 +167,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
// ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
|
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
|
||||||
int32_t sz = indexSerialTermKey(p, buf);
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||||
assert(*cache != NULL);
|
assert(*cache != NULL);
|
||||||
|
@ -334,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
IndexCache* cache = NULL;
|
IndexCache* cache = NULL;
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
|
ICacheKey key = {
|
||||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
|
||||||
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
pthread_mutex_lock(&sIdx->mtx);
|
pthread_mutex_lock(&sIdx->mtx);
|
||||||
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
||||||
|
@ -558,39 +560,18 @@ END:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t indexSerialTermKeyOfTag(SIndexTerm* p, char* buf) {
|
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
|
||||||
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
|
||||||
return indexSerialKey(&key, buf);
|
|
||||||
}
|
|
||||||
int32_t indexSerilaTermKeyOfJson(SIndexTerm* p, char* buf) {
|
|
||||||
ICacheKey key = {.suid = p->suid, .colName = JSON_COLUMN, .nColName = strlen(JSON_COLUMN)};
|
|
||||||
return indexSerialKey(&key, buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t indexSerialTermKey(SIndexTerm* itm, char* buf) {
|
|
||||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(itm->colType, TSDB_DATA_TYPE_JSON);
|
|
||||||
if (hasJson) {
|
|
||||||
return indexSerilaTermKeyOfJson(itm, buf);
|
|
||||||
} else {
|
|
||||||
return indexSerialTermKeyOfTag(itm, buf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int32_t indexSerialKey(ICacheKey* key, char* buf) {
|
|
||||||
char* p = buf;
|
char* p = buf;
|
||||||
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||||
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
||||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
if (hasJson) {
|
||||||
|
SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
|
||||||
|
} else {
|
||||||
|
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||||
|
}
|
||||||
return buf - p;
|
return buf - p;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
|
|
||||||
// char* p = buf;
|
|
||||||
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
|
||||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
|
||||||
// // SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
|
||||||
// // SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
|
||||||
// SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
|
||||||
// return buf - p;
|
|
||||||
//}
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#define MEM_ESTIMATE_RADIO 1.5
|
#define MEM_ESTIMATE_RADIO 1.5
|
||||||
|
|
||||||
static char JSON_COLUMN[] = "JSON";
|
static char JSON_COLUMN[] = "JSON";
|
||||||
|
static char JSON_VALUE_DELIM = '&';
|
||||||
|
|
||||||
static void indexMemRef(MemTable* tbl);
|
static void indexMemRef(MemTable* tbl);
|
||||||
static void indexMemUnRef(MemTable* tbl);
|
static void indexMemUnRef(MemTable* tbl);
|
||||||
|
@ -46,6 +47,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
|
||||||
indexError("failed to create index cache");
|
indexError("failed to create index cache");
|
||||||
return NULL;
|
return NULL;
|
||||||
};
|
};
|
||||||
|
|
||||||
cache->mem = indexInternalCacheCreate(type);
|
cache->mem = indexInternalCacheCreate(type);
|
||||||
cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
|
cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
|
||||||
cache->type = type;
|
cache->type = type;
|
||||||
|
@ -209,11 +211,38 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static char* indexCachePackJsonData(SIndexTerm* itm) {
|
||||||
|
/*
|
||||||
|
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||||
|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
|
||||||
|
*/
|
||||||
|
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
|
||||||
|
|
||||||
|
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
|
||||||
|
char* buf = (char*)calloc(1, sz);
|
||||||
|
char* p = buf;
|
||||||
|
|
||||||
|
memcpy(p, itm->colVal, itm->nColName);
|
||||||
|
p += itm->nColName;
|
||||||
|
|
||||||
|
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
|
||||||
|
p += sizeof(JSON_VALUE_DELIM);
|
||||||
|
|
||||||
|
memcpy(p, &ty, sizeof(ty));
|
||||||
|
p += sizeof(ty);
|
||||||
|
|
||||||
|
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
|
||||||
|
p += sizeof(JSON_VALUE_DELIM);
|
||||||
|
|
||||||
|
memcpy(p, itm->colVal, itm->nColVal);
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
indexCacheRef(pCache);
|
indexCacheRef(pCache);
|
||||||
|
@ -224,8 +253,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
}
|
}
|
||||||
// set up key
|
// set up key
|
||||||
ct->colType = term->colType;
|
ct->colType = term->colType;
|
||||||
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
|
if (hasJson) {
|
||||||
memcpy(ct->colVal, term->colVal, term->nColVal);
|
ct->colVal = indexCachePackJsonData(term);
|
||||||
|
} else {
|
||||||
|
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||||
|
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||||
|
}
|
||||||
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
||||||
// set value
|
// set value
|
||||||
ct->uid = uid;
|
ct->uid = uid;
|
||||||
|
@ -369,6 +402,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static MemTable* indexInternalCacheCreate(int8_t type) {
|
static MemTable* indexInternalCacheCreate(int8_t type) {
|
||||||
|
type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
|
||||||
|
|
||||||
MemTable* tbl = calloc(1, sizeof(MemTable));
|
MemTable* tbl = calloc(1, sizeof(MemTable));
|
||||||
indexMemRef(tbl);
|
indexMemRef(tbl);
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
@ -391,9 +426,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
IterateValue* iv = &itera->val;
|
IterateValue* iv = &itera->val;
|
||||||
iterateValueDestroy(iv, false);
|
iterateValueDestroy(iv, false);
|
||||||
|
|
||||||
// IterateValue* iv = &itera->val;
|
|
||||||
// IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))};
|
|
||||||
|
|
||||||
bool next = tSkipListIterNext(iter);
|
bool next = tSkipListIterNext(iter);
|
||||||
if (next) {
|
if (next) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
|
@ -413,10 +445,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
|
|
||||||
taosArrayPush(iv->val, &ct->uid);
|
taosArrayPush(iv->val, &ct->uid);
|
||||||
}
|
}
|
||||||
// IterateValue* iv = &itera->val;
|
|
||||||
// iterateValueDestroy(iv, true);
|
|
||||||
//*iv = tIterVal;
|
|
||||||
|
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,9 +17,8 @@
|
||||||
|
|
||||||
int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
|
int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
|
||||||
// handle
|
// handle
|
||||||
return tIndexOpen(opts, path, index);
|
return indexOpen(opts, path, index);
|
||||||
}
|
}
|
||||||
// k
|
|
||||||
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
|
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
|
||||||
for (int i = 0; i < taosArrayGetSize(terms); i++) {
|
for (int i = 0; i < taosArrayGetSize(terms); i++) {
|
||||||
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
||||||
|
@ -29,16 +28,17 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
|
||||||
// handle put
|
// handle put
|
||||||
}
|
}
|
||||||
|
|
||||||
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *query, SArray *result) {
|
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
|
||||||
|
SArray *terms = tq->query;
|
||||||
for (int i = 0; i < taosArrayGetSize(terms); i++) {
|
for (int i = 0; i < taosArrayGetSize(terms); i++) {
|
||||||
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
||||||
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||||
}
|
}
|
||||||
return indexSearch(index, query, result);
|
return indexSearch(index, tq, result);
|
||||||
// handle search
|
// handle search
|
||||||
}
|
}
|
||||||
|
|
||||||
int tIndexJsonClose(SIndexJson *index) {
|
void tIndexJsonClose(SIndexJson *index) {
|
||||||
return tIndexClose(index);
|
return indexClose(index);
|
||||||
// handle close
|
// handle close
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue