531 lines
16 KiB
C
531 lines
16 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "indexCache.h"
|
|
#include "indexComm.h"
|
|
#include "indexUtil.h"
|
|
#include "tcompare.h"
|
|
#include "tsched.h"
|
|
|
|
#define MAX_INDEX_KEY_LEN 256  // test only, change later

/*
 * Memtable sizing knobs. Compound values are parenthesized so the macros
 * expand correctly inside larger expressions (e.g. `x / MEM_THRESHOLD`).
 */
#define MEM_TERM_LIMIT (10 * 10000)
#define MEM_THRESHOLD (1024 * 1024)  // estimated bytes in the mutable memtable before it is rotated
#define MEM_ESTIMATE_RADIO 1.5       // safety multiplier applied to occupiedMem before comparing with MEM_THRESHOLD
|
|
|
|
static void indexMemRef(MemTable* tbl);
|
|
static void indexMemUnRef(MemTable* tbl);
|
|
|
|
static void indexCacheTermDestroy(CacheTerm* ct);
|
|
static int32_t indexCacheTermCompare(const void* l, const void* r);
|
|
static char* indexCacheTermGet(const void* pData);
|
|
|
|
static MemTable* indexInternalCacheCreate(int8_t type);
|
|
|
|
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
|
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
|
|
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
|
|
RangeType type);
|
|
|
|
/* Verdict a range comparator gives for one cached entry during a scan. */
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;

/* Range comparator signature: (cached value, query value, column type) -> verdict. */
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);

/*
 * NOTE(review): placeholder comparators. Each ignores its arguments and
 * reports MATCH, so a range scan treats every cached entry as matching.
 */
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
  (void)a;
  (void)b;
  (void)type;
  return MATCH;
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
  (void)a;
  (void)b;
  (void)type;
  return MATCH;
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
  (void)a;
  (void)b;
  (void)type;
  return MATCH;
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
  (void)a;
  (void)b;
  (void)type;
  return MATCH;
}

/*
 * Indexed by RangeType in cacheSearchCompareFunc.
 * NOTE(review): positional order LT, LE, GT, GE is assumed to match the
 * RangeType enumerator values — confirm against its definition.
 */
static _cache_range_compare rangeCompare[] = {
    tCompareLessThan,
    tCompareLessEqual,
    tCompareGreaterThan,
    tCompareGreaterEqual,
};
|
|
|
|
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
|
|
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan,
|
|
cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange};
|
|
|
|
static void doMergeWork(SSchedMsg* msg);
|
|
static bool indexCacheIteratorNext(Iterate* itera);
|
|
|
|
/*
 * Exact-match search inside one memtable.
 *
 * Positions an iterator at `ct` (query colVal + current cache version) and
 * walks forward while entries carry the same colVal. For ADD_VALUE entries the
 * uid is merged via INDEX_MERGE_ADD_DEL(tr->deled, tr->added, uid) and *s is set
 * to kTypeValue; for DEL_VALUE entries via INDEX_MERGE_ADD_DEL(tr->added,
 * tr->deled, uid). A NULL cache contributes nothing. Always returns 0.
 */
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  if (cache == NULL) {
    return 0;
  }

  MemTable*          tbl = (MemTable*)cache;
  char*              probe = indexCacheTermGet(ct);
  SSkipListIterator* it = tSkipListCreateIterFromVal(tbl->mem, probe, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);

  while (tSkipListIterNext(it)) {
    SSkipListNode* node = tSkipListIterGet(it);
    if (node == NULL) {
      break;
    }
    CacheTerm* entry = (CacheTerm*)SL_GET_NODE_DATA(node);
    // Entries are ordered by colVal first, so the first different key ends the run.
    if (strcmp(entry->colVal, ct->colVal) != 0) {
      break;
    }
    if (entry->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, entry->uid)
      *s = kTypeValue;
    } else if (entry->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, entry->uid)
    }
  }

  tSkipListDestroyIter(it);
  return 0;
}
|
|
/* Prefix search over the memtable: not implemented yet, contributes no results. */
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)ct;
  (void)tr;
  (void)s;
  return 0;  // TODO: implement
}
|
|
/* Suffix search over the memtable: not implemented yet, contributes no results. */
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)ct;
  (void)tr;
  (void)s;
  return 0;  // TODO: implement
}
|
|
/* Regex search over the memtable: not implemented yet, contributes no results. */
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)ct;
  (void)tr;
  (void)s;
  return 0;  // TODO: implement
}
|
|
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
|
|
RangeType type) {
|
|
if (cache == NULL) {
|
|
return 0;
|
|
}
|
|
_cache_range_compare cmpFn = rangeCompare[type];
|
|
|
|
MemTable* mem = cache;
|
|
char* key = indexCacheTermGet(ct);
|
|
|
|
SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
|
|
while (tSkipListIterNext(iter)) {
|
|
SSkipListNode* node = tSkipListIterGet(iter);
|
|
if (node == NULL) {
|
|
break;
|
|
}
|
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
|
TExeCond cond = cmpFn(c->colVal, ct->colVal, ct->colType);
|
|
if (cond == MATCH) {
|
|
if (c->operaType == ADD_VALUE) {
|
|
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
|
// taosArrayPush(result, &c->uid);
|
|
*s = kTypeValue;
|
|
} else if (c->operaType == DEL_VALUE) {
|
|
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
|
}
|
|
} else if (cond == CONTINUE) {
|
|
} else if (cond == BREAK) {
|
|
break;
|
|
}
|
|
}
|
|
tSkipListDestroyIter(iter);
|
|
return TSDB_CODE_SUCCESS;
|
|
}
|
|
/* `column < value` search: thin wrapper over the shared range scan. */
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = LT;
  int32_t         code = cacheSearchCompareFunc(cache, ct, tr, s, op);
  return code;
}
|
|
/* `column <= value` search: thin wrapper over the shared range scan. */
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = LE;
  int32_t         code = cacheSearchCompareFunc(cache, ct, tr, s, op);
  return code;
}
|
|
/* `column > value` search: thin wrapper over the shared range scan. */
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = GT;
  int32_t         code = cacheSearchCompareFunc(cache, ct, tr, s, op);
  return code;
}
|
|
/* `column >= value` search: thin wrapper over the shared range scan. */
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = GE;
  int32_t         code = cacheSearchCompareFunc(cache, ct, tr, s, op);
  return code;
}
|
|
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
|
// impl later
|
|
return 0;
|
|
}
|
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
|
|
|
/*
 * Create an index cache for one column of super table `suid`.
 *
 * The cache owns a fresh mutable memtable, a copy of the column name (JSON
 * columns always use JSON_COLUMN), a mutex and a condition variable. The
 * returned cache already holds one reference (indexCacheRef).
 *
 * Returns NULL if the cache or its memtable cannot be allocated.
 */
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
  IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  cache->mem = indexInternalCacheCreate(type);
  if (cache->mem == NULL) {
    // Do not hand out a cache without a memtable: every put/search would dereference it.
    indexError("failed to create index cache");
    taosMemoryFree(cache);
    return NULL;
  }
  cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
  cache->suid = suid;
  cache->occupiedMem = 0;

  taosThreadMutexInit(&cache->mtx, NULL);
  taosThreadCondInit(&cache->finished, NULL);

  indexCacheRef(cache);
  return cache;
}
|
|
void indexCacheDebug(IndexCache* cache) {
|
|
MemTable* tbl = NULL;
|
|
|
|
taosThreadMutexLock(&cache->mtx);
|
|
tbl = cache->mem;
|
|
indexMemRef(tbl);
|
|
taosThreadMutexUnlock(&cache->mtx);
|
|
|
|
{
|
|
SSkipList* slt = tbl->mem;
|
|
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
|
while (tSkipListIterNext(iter)) {
|
|
SSkipListNode* node = tSkipListIterGet(iter);
|
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
|
if (ct != NULL) {
|
|
// TODO, add more debug info
|
|
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
|
|
}
|
|
}
|
|
tSkipListDestroyIter(iter);
|
|
|
|
indexMemUnRef(tbl);
|
|
}
|
|
|
|
{
|
|
taosThreadMutexLock(&cache->mtx);
|
|
tbl = cache->imm;
|
|
indexMemRef(tbl);
|
|
taosThreadMutexUnlock(&cache->mtx);
|
|
if (tbl != NULL) {
|
|
SSkipList* slt = tbl->mem;
|
|
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
|
while (tSkipListIterNext(iter)) {
|
|
SSkipListNode* node = tSkipListIterGet(iter);
|
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
|
if (ct != NULL) {
|
|
// TODO, add more debug info
|
|
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
|
|
}
|
|
}
|
|
tSkipListDestroyIter(iter);
|
|
}
|
|
|
|
indexMemUnRef(tbl);
|
|
}
|
|
}
|
|
|
|
/*
 * Free every CacheTerm stored in `slt` (its colVal and the term itself), then
 * destroy the skip list.
 */
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* it = tSkipListCreateIter(slt);
  while (tSkipListIterNext(it)) {
    SSkipListNode* node = tSkipListIterGet(it);
    // indexCacheTermDestroy ignores NULL and frees colVal before the term.
    indexCacheTermDestroy((CacheTerm*)SL_GET_NODE_DATA(node));
  }
  tSkipListDestroyIter(it);

  tSkipListDestroy(slt);
}
|
|
/*
 * Detach the immutable memtable from `cache`, wake every writer blocked on
 * cache->finished, and drop two references to the detached table.
 *
 * NOTE(review): the second unref appears to balance the reference taken by
 * indexCacheIteratorCreate (the iterator never releases it itself) — keep the
 * two in sync if either changes.
 */
void indexCacheDestroyImm(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }

  taosThreadMutexLock(&cache->mtx);
  MemTable* detached = cache->imm;
  cache->imm = NULL;  // or throw int bg thread
  taosThreadCondBroadcast(&cache->finished);
  taosThreadMutexUnlock(&cache->mtx);

  indexMemUnRef(detached);  // reference held by the cache as its imm
  indexMemUnRef(detached);  // reference taken when the iterator was created
}
|
|
void indexCacheDestroy(void* cache) {
|
|
IndexCache* pCache = cache;
|
|
if (pCache == NULL) {
|
|
return;
|
|
}
|
|
indexMemUnRef(pCache->mem);
|
|
indexMemUnRef(pCache->imm);
|
|
taosMemoryFree(pCache->colName);
|
|
|
|
taosThreadMutexDestroy(&pCache->mtx);
|
|
taosThreadCondDestroy(&pCache->finished);
|
|
|
|
taosMemoryFree(pCache);
|
|
}
|
|
|
|
/*
 * Create an iterator over the cache's immutable memtable.
 *
 * Under cache->mtx the current imm is pinned with indexMemRef and a skip-list
 * iterator is opened on it (the iterator is NULL when there is no imm).
 * Returns NULL if the Iterate itself cannot be allocated.
 * NOTE(review): the imm reference taken here is not released by
 * indexCacheIteratorDestroy; indexCacheDestroyImm drops it.
 */
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  Iterate* it = taosMemoryCalloc(1, sizeof(Iterate));
  if (it == NULL) {
    return NULL;
  }

  taosThreadMutexLock(&cache->mtx);

  MemTable* imm = cache->imm;
  indexMemRef(imm);

  it->val.val = taosArrayInit(1, sizeof(uint64_t));
  it->val.colVal = NULL;
  it->iter = (imm == NULL) ? NULL : tSkipListCreateIter(imm->mem);
  it->next = indexCacheIteratorNext;
  it->getValue = indexCacheIteratorGetValue;

  taosThreadMutexUnlock(&cache->mtx);
  return it;
}
|
|
/* Free an iterator created by indexCacheIteratorCreate: skip-list iterator, current value, then the struct. NULL is ignored. */
void indexCacheIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    tSkipListDestroyIter(iter->iter);
    iterateValueDestroy(&iter->val, true);
    taosMemoryFree(iter);
  }
}
|
|
|
|
/*
 * Queue doMergeWork(pCache) on indexQhandle. The scheduler's return value is
 * not inspected; this always returns 0.
 */
int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg task = {
      .fp = doMergeWork,
      .ahandle = pCache,
      .thandle = NULL,
      .msg = NULL,
  };
  taosScheduleTask(indexQhandle, &task);
  return 0;
}
|
|
|
|
/*
 * Ensure the mutable memtable has room before a write. Caller must hold
 * cache->mtx (it is passed to taosThreadCondWait).
 *
 * While the scaled estimate (occupiedMem * MEM_ESTIMATE_RADIO) has reached
 * MEM_THRESHOLD:
 *   - if an immutable table is still pending, wait on cache->finished;
 *   - otherwise rotate mem -> imm, start a new empty mem, reset occupiedMem and
 *     schedule the background merge. The extra cache reference taken here is
 *     meant to be released by the background work (per the original design
 *     note "unref cache in bgwork").
 */
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  for (;;) {
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
      return;
    }
    if (cache->imm != NULL) {
      // previous imm not flushed yet; indexCacheDestroyImm broadcasts `finished`
      taosThreadCondWait(&cache->finished, &cache->mtx);
      continue;
    }
    indexCacheRef(cache);
    cache->imm = cache->mem;
    cache->mem = indexInternalCacheCreate(cache->type);
    cache->occupiedMem = 0;
    indexCacheSchedToMerge(cache);
  }
}
|
|
/*
 * Append one (term, uid) operation to the cache's mutable memtable.
 *
 * The term value is copied into a new CacheTerm (JSON terms are packed with
 * indexPackJsonData), stamped with the next cache version, and inserted under
 * cache->mtx after indexCacheMakeRoomForWrite has possibly rotated the table.
 *
 * Returns 0 on success, -1 if `cache` is NULL or an allocation fails. On
 * failure nothing is inserted and the temporary cache reference is released.
 */
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
  if (cache == NULL) {
    return -1;
  }
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);

  IndexCache* pCache = cache;
  indexCacheRef(pCache);

  // encode data
  CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
  if (ct == NULL) {
    indexCacheUnRef(pCache);  // balance the reference taken above
    return -1;
  }

  // set up key
  ct->colType = term->colType;
  if (hasJson) {
    ct->colVal = indexPackJsonData(term);
  } else {
    // +1 keeps the copy NUL-terminated (calloc zero-fills)
    ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
    if (ct->colVal != NULL) {
      memcpy(ct->colVal, term->colVal, term->nColVal);
    }
  }
  if (ct->colVal == NULL) {
    indexCacheTermDestroy(ct);
    indexCacheUnRef(pCache);
    return -1;
  }
  ct->version = atomic_add_fetch_32(&pCache->version, 1);

  // set value
  ct->uid = uid;
  ct->operaType = term->operType;

  // Rough footprint of this entry: the CacheTerm struct plus its key string.
  int64_t estimate = (int64_t)(sizeof(*ct) + strlen(ct->colVal));

  taosThreadMutexLock(&pCache->mtx);
  pCache->occupiedMem += estimate;
  indexCacheMakeRoomForWrite(pCache);
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);
  taosThreadMutexUnlock(&pCache->mtx);

  indexCacheUnRef(pCache);
  return 0;
}
|
|
/*
 * Delete entry point for the cache.
 * NOTE(review): not implemented — the arguments are ignored, nothing is
 * removed, and 0 is always returned. Deletions currently travel through
 * indexCachePut as DEL_VALUE operations.
 */
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
  (void)cache;
  (void)fieldValue;
  (void)fvlen;
  (void)uid;
  (void)operType;
  return 0;
}
|
|
|
|
/* Run the search routine for `qtype` against one memtable; a NULL table yields 0 without touching tr or s. */
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
  return (mem == NULL) ? 0 : cacheSearch[qtype](mem, ct, tr, s);
}
|
|
/*
 * Search both memtables of `cache` for `query`, merging hits into `result`.
 *
 * The mutable table is searched first; the immutable table is searched only if
 * that succeeded and *s is not kTypeDeletion. Both tables are pinned with
 * indexMemRef under cache->mtx so they stay alive while the lock is dropped.
 * JSON terms are packed with indexPackJsonData before lookup.
 *
 * Returns the last search routine's result, or 0 for a NULL cache.
 */
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
  int64_t st = taosGetTimestampUs();
  if (cache == NULL) {
    return 0;
  }
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
  taosThreadMutexLock(&pCache->mtx);
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
  taosThreadMutexUnlock(&pCache->mtx);

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  bool  hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
  char* p = term->colVal;
  if (hasJson) {
    p = indexPackJsonData(term);
  }
  // colType must be filled in: the range searches pass ct->colType to their comparator.
  CacheTerm ct = {.colVal = p, .colType = term->colType, .version = atomic_load_32(&pCache->version)};

  int ret = indexQueryMem(mem, &ct, qtype, result, s);
  if (ret == 0 && *s != kTypeDeletion) {
    // continue search in imm
    ret = indexQueryMem(imm, &ct, qtype, result, s);
  }

  if (hasJson) {
    taosMemoryFreeClear(p);
  }

  indexMemUnRef(mem);
  indexMemUnRef(imm);
  indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);

  return ret;
}
|
|
|
|
/* Take one reference on `cache`; NULL is ignored. */
void indexCacheRef(IndexCache* cache) {
  if (cache != NULL) {
    int newRef = T_REF_INC(cache);
    UNUSED(newRef);
  }
}
|
|
/* Drop one reference on `cache`, destroying it when the count reaches zero; NULL is ignored. */
void indexCacheUnRef(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }
  int remaining = T_REF_DEC(cache);
  if (remaining != 0) {
    return;
  }
  indexCacheDestroy(cache);
}
|
|
|
|
/* Take one reference on memtable `tbl`; NULL is ignored. */
void indexMemRef(MemTable* tbl) {
  if (tbl != NULL) {
    int newRef = T_REF_INC(tbl);
    UNUSED(newRef);
  }
}
|
|
/*
 * Drop one reference on memtable `tbl`. When the count reaches zero, its skip
 * list and all stored terms are freed, followed by the table. NULL is ignored.
 */
void indexMemUnRef(MemTable* tbl) {
  if (tbl == NULL) {
    return;
  }
  int remaining = T_REF_DEC(tbl);
  if (remaining != 0) {
    return;
  }
  indexCacheDestroySkiplist(tbl->mem);
  taosMemoryFree(tbl);
}
|
|
|
|
/* Free a CacheTerm and the key string it owns; NULL is ignored. */
static void indexCacheTermDestroy(CacheTerm* ct) {
  if (ct != NULL) {
    taosMemoryFree(ct->colVal);
    taosMemoryFree(ct);
  }
}
|
|
/*
 * Skip-list key accessor: the key of a stored CacheTerm is the CacheTerm
 * pointer itself (indexCacheTermCompare reads colVal/version through it).
 */
static char* indexCacheTermGet(const void* pData) {
  return (char*)pData;
}
|
|
static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
|
CacheTerm* lt = (CacheTerm*)l;
|
|
CacheTerm* rt = (CacheTerm*)r;
|
|
// compare colVal
|
|
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
|
if (cmp == 0) {
|
|
return rt->version - lt->version;
|
|
}
|
|
return cmp;
|
|
}
|
|
|
|
static MemTable* indexInternalCacheCreate(int8_t type) {
|
|
type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
|
|
|
|
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
|
indexMemRef(tbl);
|
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
|
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
|
|
indexCacheTermGet);
|
|
}
|
|
return tbl;
|
|
}
|
|
|
|
/* Scheduler task body: flush the cache carried in msg->ahandle via its owning index. */
static void doMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = (IndexCache*)msg->ahandle;
  indexFlushCacheToTFile((SIndex*)pCache->index, pCache);
}
|
|
/*
 * Advance the cache iterator by one entry.
 *
 * The previous value is cleared first (iterateValueDestroy(iv, false)). On
 * success the current CacheTerm's operation type, version, a copy of its
 * colVal and its uid are stored in itera->val and true is returned. Returns
 * false when there is no underlying iterator or the skip list is exhausted.
 */
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* it = itera->iter;
  if (it == NULL) {
    return false;
  }

  IterateValue* cur = &itera->val;
  iterateValueDestroy(cur, false);

  if (!tSkipListIterNext(it)) {
    return false;
  }

  CacheTerm* term = (CacheTerm*)SL_GET_NODE_DATA(tSkipListIterGet(it));
  cur->type = term->operaType;
  cur->ver = term->version;
  cur->colVal = tstrdup(term->colVal);
  taosArrayPush(cur->val, &term->uid);
  return true;
}
|
|
|
|
/* Return the iterator's current value slot (filled by indexCacheIteratorNext); no copy is made. */
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;  // opt later
  return current;
}
|