Merge branch '3.0' into feature/vnode
This commit is contained in:
commit
fc7bb3b5b1
|
@ -45,7 +45,6 @@ typedef struct SMetaData {
|
||||||
} SMetaData;
|
} SMetaData;
|
||||||
|
|
||||||
typedef struct SCatalogCfg {
|
typedef struct SCatalogCfg {
|
||||||
bool enableVgroupCache;
|
|
||||||
uint32_t maxTblCacheNum;
|
uint32_t maxTblCacheNum;
|
||||||
uint32_t maxDBCacheNum;
|
uint32_t maxDBCacheNum;
|
||||||
} SCatalogCfg;
|
} SCatalogCfg;
|
||||||
|
@ -61,8 +60,8 @@ int32_t catalogInit(SCatalogCfg *cfg);
|
||||||
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
|
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
|
||||||
|
|
||||||
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
||||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
|
|
||||||
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a table's meta data.
|
* Get a table's meta data.
|
||||||
|
|
|
@ -24,7 +24,6 @@ extern "C" {
|
||||||
#include "catalog.h"
|
#include "catalog.h"
|
||||||
|
|
||||||
typedef struct SSchedulerCfg {
|
typedef struct SSchedulerCfg {
|
||||||
int32_t clusterType;
|
|
||||||
int32_t maxJobNum;
|
int32_t maxJobNum;
|
||||||
} SSchedulerCfg;
|
} SSchedulerCfg;
|
||||||
|
|
||||||
|
|
|
@ -66,8 +66,6 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
||||||
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0)
|
|
||||||
|
|
||||||
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||||
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||||
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
||||||
|
|
|
@ -370,6 +370,41 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
||||||
|
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t exist = 0;
|
||||||
|
|
||||||
|
if (0 == forceUpdate) {
|
||||||
|
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
||||||
|
|
||||||
|
if (exist) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SUseDbOutput DbOut = {0};
|
||||||
|
SBuildUseDBInput input = {0};
|
||||||
|
|
||||||
|
strncpy(input.db, dbName, sizeof(input.db));
|
||||||
|
input.db[sizeof(input.db) - 1] = 0;
|
||||||
|
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
||||||
|
|
||||||
|
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
|
||||||
|
|
||||||
|
if (dbInfo) {
|
||||||
|
*dbInfo = DbOut.dbVgroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||||
if (ctgMgmt.pCluster) {
|
if (ctgMgmt.pCluster) {
|
||||||
ctgError("catalog already init");
|
ctgError("catalog already init");
|
||||||
|
@ -378,16 +413,22 @@ int32_t catalogInit(SCatalogCfg *cfg) {
|
||||||
|
|
||||||
if (cfg) {
|
if (cfg) {
|
||||||
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
|
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
|
||||||
|
|
||||||
|
if (ctgMgmt.cfg.maxDBCacheNum == 0) {
|
||||||
|
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctgMgmt.cfg.maxTblCacheNum == 0) {
|
||||||
|
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||||
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
|
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CTG_CACHE_ENABLED()) {
|
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
if (NULL == ctgMgmt.pCluster) {
|
||||||
if (NULL == ctgMgmt.pCluster) {
|
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||||
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -449,13 +490,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||||
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
|
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbInfo->vgVersion < 0) {
|
if (dbInfo->vgVersion < 0) {
|
||||||
if (pCatalog->dbCache.cache) {
|
if (pCatalog->dbCache.cache) {
|
||||||
|
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
|
if (oldInfo && oldInfo->vgInfo) {
|
||||||
|
taosHashCleanup(oldInfo->vgInfo);
|
||||||
|
oldInfo->vgInfo = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -485,42 +532,6 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
|
||||||
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t exist = 0;
|
|
||||||
|
|
||||||
if (0 == forceUpdate) {
|
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
|
||||||
|
|
||||||
if (exist) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SUseDbOutput DbOut = {0};
|
|
||||||
SBuildUseDBInput input = {0};
|
|
||||||
|
|
||||||
strncpy(input.db, dbName, sizeof(input.db));
|
|
||||||
input.db[sizeof(input.db) - 1] = 0;
|
|
||||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
|
||||||
|
|
||||||
CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup));
|
|
||||||
|
|
||||||
if (dbInfo) {
|
|
||||||
*dbInfo = DbOut.dbVgroup;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||||
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
||||||
}
|
}
|
||||||
|
@ -531,6 +542,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupInfo vgroupInfo = {0};
|
SVgroupInfo vgroupInfo = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
|
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
|
||||||
|
|
||||||
|
@ -540,11 +552,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
|
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
|
||||||
|
|
||||||
CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
|
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
tfree(output.tbMeta);
|
tfree(output.tbMeta);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||||
|
@ -563,7 +577,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
|
||||||
|
|
||||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
|
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
|
||||||
|
|
||||||
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
||||||
|
|
||||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
||||||
|
@ -594,6 +608,7 @@ _return:
|
||||||
tfree(tbMeta);
|
tfree(tbMeta);
|
||||||
|
|
||||||
taosArrayDestroy(*pVgroupList);
|
taosArrayDestroy(*pVgroupList);
|
||||||
|
*pVgroupList = NULL;
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -604,7 +619,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = 0;
|
int32_t vgId = 0;
|
||||||
|
|
||||||
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
|
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
|
||||||
|
|
||||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
||||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
||||||
|
@ -627,12 +642,15 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
|
||||||
if (pReq->pTableName) {
|
if (pReq->pTableName) {
|
||||||
char dbName[TSDB_DB_FNAME_LEN];
|
char dbName[TSDB_DB_FNAME_LEN];
|
||||||
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
||||||
if (tbNum > 0) {
|
if (tbNum <= 0) {
|
||||||
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
ctgError("empty table name list");
|
||||||
if (NULL == pRsp->pTableMeta) {
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
ctgError("taosArrayInit num[%d] failed", tbNum);
|
}
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
|
||||||
}
|
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
||||||
|
if (NULL == pRsp->pTableMeta) {
|
||||||
|
ctgError("taosArrayInit num[%d] failed", tbNum);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
|
@ -663,6 +681,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pRsp->pTableMeta);
|
taosArrayDestroy(pRsp->pTableMeta);
|
||||||
|
pRsp->pTableMeta = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
|
|
|
@ -30,14 +30,18 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct MemTable {
|
||||||
|
T_REF_DECLARE()
|
||||||
|
SSkipList* mem;
|
||||||
|
} MemTable;
|
||||||
typedef struct IndexCache {
|
typedef struct IndexCache {
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
SSkipList *mem, *imm;
|
MemTable *mem, *imm;
|
||||||
SIndex* index;
|
SIndex* index;
|
||||||
char* colName;
|
char* colName;
|
||||||
int32_t version;
|
int32_t version;
|
||||||
int32_t nTerm;
|
int32_t nTerm;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
|
||||||
pthread_mutex_t mtx;
|
pthread_mutex_t mtx;
|
||||||
} IndexCache;
|
} IndexCache;
|
||||||
|
@ -45,7 +49,6 @@ typedef struct IndexCache {
|
||||||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||||
typedef struct CacheTerm {
|
typedef struct CacheTerm {
|
||||||
// key
|
// key
|
||||||
int32_t nColVal;
|
|
||||||
char* colVal;
|
char* colVal;
|
||||||
int32_t version;
|
int32_t version;
|
||||||
// value
|
// value
|
||||||
|
|
|
@ -34,9 +34,7 @@ int32_t indexInit() {
|
||||||
return indexQhandle == NULL ? -1 : 0;
|
return indexQhandle == NULL ? -1 : 0;
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
void indexCleanUp() {
|
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }
|
||||||
taosCleanUpScheduler(indexQhandle);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int uidCompare(const void* a, const void* b) {
|
static int uidCompare(const void* a, const void* b) {
|
||||||
uint64_t u1 = *(uint64_t*)a;
|
uint64_t u1 = *(uint64_t*)a;
|
||||||
|
@ -63,7 +61,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
|
||||||
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
// pthread_once(&isInit, indexInit);
|
// pthread_once(&isInit, indexInit);
|
||||||
SIndex* sIdx = calloc(1, sizeof(SIndex));
|
SIndex* sIdx = calloc(1, sizeof(SIndex));
|
||||||
if (sIdx == NULL) { return -1; }
|
if (sIdx == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef USE_LUCENE
|
#ifdef USE_LUCENE
|
||||||
index_t* index = index_open(path);
|
index_t* index = index_open(path);
|
||||||
|
@ -99,7 +99,9 @@ void indexClose(SIndex* sIdx) {
|
||||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||||
while (iter) {
|
while (iter) {
|
||||||
IndexCache** pCache = iter;
|
IndexCache** pCache = iter;
|
||||||
if (*pCache) { indexCacheUnRef(*pCache); }
|
if (*pCache) {
|
||||||
|
indexCacheUnRef(*pCache);
|
||||||
|
}
|
||||||
iter = taosHashIterate(sIdx->colObj, iter);
|
iter = taosHashIterate(sIdx->colObj, iter);
|
||||||
}
|
}
|
||||||
taosHashCleanup(sIdx->colObj);
|
taosHashCleanup(sIdx->colObj);
|
||||||
|
@ -133,7 +135,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
||||||
if (*cache == NULL) {
|
if (cache == NULL) {
|
||||||
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
|
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
|
||||||
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
|
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
|
||||||
}
|
}
|
||||||
|
@ -143,10 +145,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
||||||
|
|
||||||
assert(*cache != NULL);
|
assert(*cache != NULL);
|
||||||
int ret = indexCachePut(*cache, p, uid);
|
int ret = indexCachePut(*cache, p, uid);
|
||||||
if (ret != 0) { return ret; }
|
if (ret != 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -224,17 +227,20 @@ SIndexOpts* indexOptsCreate() {
|
||||||
#endif
|
#endif
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void indexOptsDestroy(SIndexOpts* opts){
|
void indexOptsDestroy(SIndexOpts* opts) {
|
||||||
#ifdef USE_LUCENE
|
#ifdef USE_LUCENE
|
||||||
#endif
|
#endif
|
||||||
} /*
|
return;
|
||||||
* @param: oper
|
}
|
||||||
*
|
/*
|
||||||
*/
|
* @param: oper
|
||||||
|
*
|
||||||
|
*/
|
||||||
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||||
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
|
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
|
||||||
if (p == NULL) { return NULL; }
|
if (p == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
p->opera = opera;
|
p->opera = opera;
|
||||||
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||||
return p;
|
return p;
|
||||||
|
@ -253,15 +259,12 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SIndexTerm* indexTermCreate(int64_t suid,
|
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
|
||||||
SIndexOperOnColumn oper,
|
int32_t nColName, const char* colVal, int32_t nColVal) {
|
||||||
uint8_t colType,
|
|
||||||
const char* colName,
|
|
||||||
int32_t nColName,
|
|
||||||
const char* colVal,
|
|
||||||
int32_t nColVal) {
|
|
||||||
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
|
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
|
||||||
if (t == NULL) { return NULL; }
|
if (t == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
t->suid = suid;
|
t->suid = suid;
|
||||||
t->operType = oper;
|
t->operType = oper;
|
||||||
|
@ -282,9 +285,7 @@ void indexTermDestroy(SIndexTerm* p) {
|
||||||
free(p);
|
free(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
SIndexMultiTerm* indexMultiTermCreate() {
|
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
|
||||||
return taosArrayInit(4, sizeof(SIndexTerm*));
|
|
||||||
}
|
|
||||||
|
|
||||||
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
|
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
|
||||||
taosArrayPush(terms, &term);
|
taosArrayPush(terms, &term);
|
||||||
|
@ -307,7 +308,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
IndexCache* cache = NULL;
|
IndexCache* cache = NULL;
|
||||||
pthread_mutex_lock(&sIdx->mtx);
|
pthread_mutex_lock(&sIdx->mtx);
|
||||||
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
|
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
|
||||||
if (*pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
pthread_mutex_unlock(&sIdx->mtx);
|
pthread_mutex_unlock(&sIdx->mtx);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -335,7 +336,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static void indexInterResultsDestroy(SArray* results) {
|
static void indexInterResultsDestroy(SArray* results) {
|
||||||
if (results == NULL) { return; }
|
if (results == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
size_t sz = taosArrayGetSize(results);
|
size_t sz = taosArrayGetSize(results);
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
|
@ -366,7 +369,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
if (sIdx == NULL) { return -1; }
|
if (sIdx == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||||
|
|
||||||
IndexCache* pCache = (IndexCache*)cache;
|
IndexCache* pCache = (IndexCache*)cache;
|
||||||
|
@ -399,7 +404,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||||
taosArrayAddAll(tfv->tableId, cv->val);
|
taosArrayAddAll(tfv->tableId, cv->val);
|
||||||
taosArrayPush(result, &tfv);
|
taosArrayPush(result, &tfv);
|
||||||
|
|
||||||
// copy to final Result;
|
// copy to final Result;
|
||||||
cn = cacheIter->next(cacheIter);
|
cn = cacheIter->next(cacheIter);
|
||||||
} else {
|
} else {
|
||||||
|
@ -433,7 +437,9 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
indexError("faile to open file to write");
|
indexError("faile to open file to write");
|
||||||
} else {
|
} else {
|
||||||
int ret = tfileWriterPut(tw, result);
|
int ret = tfileWriterPut(tw, result);
|
||||||
if (ret != 0) { indexError("faile to write into tindex "); }
|
if (ret != 0) {
|
||||||
|
indexError("faile to write into tindex ");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// not free later, just put int table cache
|
// not free later, just put int table cache
|
||||||
indexCacheDestroyImm(pCache);
|
indexCacheDestroyImm(pCache);
|
||||||
|
|
|
@ -23,46 +23,22 @@
|
||||||
#define MEM_TERM_LIMIT 1000000
|
#define MEM_TERM_LIMIT 1000000
|
||||||
// ref index_cache.h:22
|
// ref index_cache.h:22
|
||||||
//#define CACHE_KEY_LEN(p) \
|
//#define CACHE_KEY_LEN(p) \
|
||||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
|
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||||
|
// sizeof(p->operType))
|
||||||
|
|
||||||
static void cacheTermDestroy(CacheTerm* ct) {
|
static void indexMemRef(MemTable* tbl);
|
||||||
if (ct == NULL) { return; }
|
static void indexMemUnRef(MemTable* tbl);
|
||||||
|
|
||||||
free(ct->colVal);
|
static void cacheTermDestroy(CacheTerm* ct);
|
||||||
free(ct);
|
static char* getIndexKey(const void* pData);
|
||||||
}
|
static int32_t compareKey(const void* l, const void* r);
|
||||||
static char* getIndexKey(const void* pData) {
|
|
||||||
CacheTerm* p = (CacheTerm*)pData;
|
|
||||||
return (char*)p;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t compareKey(const void* l, const void* r) {
|
static MemTable* indexInternalCacheCreate(int8_t type);
|
||||||
CacheTerm* lt = (CacheTerm*)l;
|
|
||||||
CacheTerm* rt = (CacheTerm*)r;
|
|
||||||
|
|
||||||
// compare colVal
|
static void doMergeWork(SSchedMsg* msg);
|
||||||
int i, j;
|
static bool indexCacheIteratorNext(Iterate* itera);
|
||||||
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
|
|
||||||
if (lt->colVal[i] == rt->colVal[j]) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
return lt->colVal[i] < rt->colVal[j] ? -1 : 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (i < lt->nColVal) {
|
|
||||||
return 1;
|
|
||||||
} else if (j < rt->nColVal) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// compare version
|
|
||||||
return rt->version - lt->version;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSkipList* indexInternalCacheCreate(int8_t type) {
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
||||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
|
||||||
return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||||
IndexCache* cache = calloc(1, sizeof(IndexCache));
|
IndexCache* cache = calloc(1, sizeof(IndexCache));
|
||||||
|
@ -83,7 +59,15 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||||
return cache;
|
return cache;
|
||||||
}
|
}
|
||||||
void indexCacheDebug(IndexCache* cache) {
|
void indexCacheDebug(IndexCache* cache) {
|
||||||
SSkipListIterator* iter = tSkipListCreateIter(cache->mem);
|
MemTable* tbl = NULL;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&cache->mtx);
|
||||||
|
tbl = cache->mem;
|
||||||
|
indexMemRef(tbl);
|
||||||
|
pthread_mutex_unlock(&cache->mtx);
|
||||||
|
|
||||||
|
SSkipList* slt = tbl->mem;
|
||||||
|
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||||
while (tSkipListIterNext(iter)) {
|
while (tSkipListIterNext(iter)) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
@ -93,6 +77,8 @@ void indexCacheDebug(IndexCache* cache) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
|
|
||||||
|
indexMemUnRef(tbl);
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexCacheDestroySkiplist(SSkipList* slt) {
|
void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||||
|
@ -100,71 +86,50 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||||
while (tSkipListIterNext(iter)) {
|
while (tSkipListIterNext(iter)) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
if (ct != NULL) {}
|
if (ct != NULL) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
|
tSkipListDestroy(slt);
|
||||||
}
|
}
|
||||||
void indexCacheDestroyImm(IndexCache* cache) {
|
void indexCacheDestroyImm(IndexCache* cache) {
|
||||||
|
MemTable* tbl = NULL;
|
||||||
pthread_mutex_lock(&cache->mtx);
|
pthread_mutex_lock(&cache->mtx);
|
||||||
SSkipList* timm = (SSkipList*)cache->imm;
|
tbl = cache->imm;
|
||||||
cache->imm = NULL; // or throw int bg thread
|
cache->imm = NULL; // or throw int bg thread
|
||||||
pthread_mutex_unlock(&cache->mtx);
|
pthread_mutex_unlock(&cache->mtx);
|
||||||
|
indexMemUnRef(tbl);
|
||||||
indexCacheDestroySkiplist(timm);
|
|
||||||
}
|
}
|
||||||
void indexCacheDestroy(void* cache) {
|
void indexCacheDestroy(void* cache) {
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
if (pCache == NULL) { return; }
|
if (pCache == NULL) {
|
||||||
tSkipListDestroy(pCache->mem);
|
return;
|
||||||
tSkipListDestroy(pCache->imm);
|
}
|
||||||
|
indexMemUnRef(pCache->mem);
|
||||||
|
indexMemUnRef(pCache->imm);
|
||||||
free(pCache->colName);
|
free(pCache->colName);
|
||||||
|
|
||||||
free(pCache);
|
free(pCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doMergeWork(SSchedMsg* msg) {
|
|
||||||
IndexCache* pCache = msg->ahandle;
|
|
||||||
SIndex* sidx = (SIndex*)pCache->index;
|
|
||||||
indexFlushCacheTFile(sidx, pCache);
|
|
||||||
}
|
|
||||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
|
||||||
SSkipListIterator* iter = itera->iter;
|
|
||||||
if (iter == NULL) { return false; }
|
|
||||||
|
|
||||||
IterateValue* iv = &itera->val;
|
|
||||||
iterateValueDestroy(iv, false);
|
|
||||||
|
|
||||||
bool next = tSkipListIterNext(iter);
|
|
||||||
if (next) {
|
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
|
||||||
|
|
||||||
iv->type = ct->operaType;
|
|
||||||
iv->colVal = ct->colVal;
|
|
||||||
|
|
||||||
taosArrayPush(iv->val, &ct->uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
return next;
|
|
||||||
}
|
|
||||||
|
|
||||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
|
|
||||||
return &iter->val;
|
|
||||||
}
|
|
||||||
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||||
Iterate* iiter = calloc(1, sizeof(Iterate));
|
Iterate* iiter = calloc(1, sizeof(Iterate));
|
||||||
if (iiter == NULL) { return NULL; }
|
if (iiter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
MemTable* tbl = cache->imm;
|
||||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||||
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
|
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
|
||||||
iiter->next = indexCacheIteratorNext;
|
iiter->next = indexCacheIteratorNext;
|
||||||
iiter->getValue = indexCacheIteratorGetValue;
|
iiter->getValue = indexCacheIteratorGetValue;
|
||||||
|
|
||||||
return iiter;
|
return iiter;
|
||||||
}
|
}
|
||||||
void indexCacheIteratorDestroy(Iterate* iter) {
|
void indexCacheIteratorDestroy(Iterate* iter) {
|
||||||
if (iter == NULL) { return; }
|
if (iter == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
tSkipListDestroyIter(iter->iter);
|
tSkipListDestroyIter(iter->iter);
|
||||||
iterateValueDestroy(&iter->val, true);
|
iterateValueDestroy(&iter->val, true);
|
||||||
free(iter);
|
free(iter);
|
||||||
|
@ -201,18 +166,21 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
indexCacheRef(pCache);
|
indexCacheRef(pCache);
|
||||||
// encode data
|
// encode data
|
||||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
// set up key
|
// set up key
|
||||||
ct->colType = term->colType;
|
ct->colType = term->colType;
|
||||||
ct->nColVal = term->nColVal;
|
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||||
ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1));
|
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||||
memcpy(ct->colVal, term->colVal, ct->nColVal);
|
|
||||||
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
||||||
// set value
|
// set value
|
||||||
ct->uid = uid;
|
ct->uid = uid;
|
||||||
|
@ -220,8 +188,13 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
|
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
pthread_mutex_lock(&pCache->mtx);
|
pthread_mutex_lock(&pCache->mtx);
|
||||||
|
|
||||||
indexCacheMakeRoomForWrite(pCache);
|
indexCacheMakeRoomForWrite(pCache);
|
||||||
tSkipListPut(pCache->mem, (char*)ct);
|
MemTable* tbl = pCache->mem;
|
||||||
|
indexMemRef(tbl);
|
||||||
|
tSkipListPut(tbl->mem, (char*)ct);
|
||||||
|
indexMemUnRef(tbl);
|
||||||
|
|
||||||
pthread_mutex_unlock(&pCache->mtx);
|
pthread_mutex_unlock(&pCache->mtx);
|
||||||
|
|
||||||
indexCacheUnRef(pCache);
|
indexCacheUnRef(pCache);
|
||||||
|
@ -233,27 +206,38 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
EIndexQueryType qtype = query->qType;
|
EIndexQueryType qtype = query->qType;
|
||||||
|
|
||||||
|
MemTable *mem = NULL, *imm = NULL;
|
||||||
|
pthread_mutex_lock(&pCache->mtx);
|
||||||
|
mem = pCache->mem;
|
||||||
|
imm = pCache->imm;
|
||||||
|
indexMemRef(mem);
|
||||||
|
indexMemRef(imm);
|
||||||
|
pthread_mutex_unlock(&pCache->mtx);
|
||||||
|
|
||||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||||
if (ct == NULL) { return -1; }
|
if (ct == NULL) {
|
||||||
ct->nColVal = term->nColVal;
|
return -1;
|
||||||
ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1));
|
}
|
||||||
memcpy(ct->colVal, term->colVal, ct->nColVal);
|
ct->colVal = calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||||
|
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||||
ct->version = atomic_load_32(&pCache->version);
|
ct->version = atomic_load_32(&pCache->version);
|
||||||
|
|
||||||
char* key = getIndexKey(ct);
|
char* key = getIndexKey(ct);
|
||||||
// TODO handle multi situation later, and refactor
|
// TODO handle multi situation later, and refactor
|
||||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||||
while (tSkipListIterNext(iter)) {
|
while (tSkipListIterNext(iter)) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
|
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
|
||||||
if (c->nColVal == ct->nColVal && strncmp(c->colVal, ct->colVal, c->nColVal) == 0) {
|
if (strcmp(c->colVal, ct->colVal) == 0) {
|
||||||
taosArrayPush(result, &c->uid);
|
taosArrayPush(result, &c->uid);
|
||||||
*s = kTypeValue;
|
*s = kTypeValue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -279,14 +263,104 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
||||||
} else if (qtype == QUERY_REGEX) {
|
} else if (qtype == QUERY_REGEX) {
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
indexMemUnRef(mem);
|
||||||
|
indexMemUnRef(imm);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexCacheRef(IndexCache* cache) {
|
void indexCacheRef(IndexCache* cache) {
|
||||||
|
if (cache == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int ref = T_REF_INC(cache);
|
int ref = T_REF_INC(cache);
|
||||||
UNUSED(ref);
|
UNUSED(ref);
|
||||||
}
|
}
|
||||||
void indexCacheUnRef(IndexCache* cache) {
|
void indexCacheUnRef(IndexCache* cache) {
|
||||||
|
if (cache == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int ref = T_REF_DEC(cache);
|
int ref = T_REF_DEC(cache);
|
||||||
if (ref == 0) { indexCacheDestroy(cache); }
|
if (ref == 0) {
|
||||||
|
indexCacheDestroy(cache);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void indexMemRef(MemTable* tbl) {
|
||||||
|
if (tbl == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ref = T_REF_INC(tbl);
|
||||||
|
UNUSED(ref);
|
||||||
|
}
|
||||||
|
void indexMemUnRef(MemTable* tbl) {
|
||||||
|
if (tbl == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ref = T_REF_DEC(tbl);
|
||||||
|
if (ref == 0) {
|
||||||
|
SSkipList* slt = tbl->mem;
|
||||||
|
indexCacheDestroySkiplist(slt);
|
||||||
|
free(tbl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cacheTermDestroy(CacheTerm* ct) {
|
||||||
|
if (ct == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
free(ct->colVal);
|
||||||
|
free(ct);
|
||||||
|
}
|
||||||
|
static char* getIndexKey(const void* pData) {
|
||||||
|
CacheTerm* p = (CacheTerm*)pData;
|
||||||
|
return (char*)p;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareKey(const void* l, const void* r) {
|
||||||
|
CacheTerm* lt = (CacheTerm*)l;
|
||||||
|
CacheTerm* rt = (CacheTerm*)r;
|
||||||
|
|
||||||
|
// compare colVal
|
||||||
|
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
||||||
|
if (cmp == 0) {
|
||||||
|
return rt->version - lt->version;
|
||||||
|
}
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
static MemTable* indexInternalCacheCreate(int8_t type) {
|
||||||
|
MemTable* tbl = calloc(1, sizeof(MemTable));
|
||||||
|
indexMemRef(tbl);
|
||||||
|
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
||||||
|
}
|
||||||
|
return tbl;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doMergeWork(SSchedMsg* msg) {
|
||||||
|
IndexCache* pCache = msg->ahandle;
|
||||||
|
SIndex* sidx = (SIndex*)pCache->index;
|
||||||
|
indexFlushCacheTFile(sidx, pCache);
|
||||||
|
}
|
||||||
|
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
|
SSkipListIterator* iter = itera->iter;
|
||||||
|
if (iter == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
IterateValue* iv = &itera->val;
|
||||||
|
iterateValueDestroy(iv, false);
|
||||||
|
|
||||||
|
bool next = tSkipListIterNext(iter);
|
||||||
|
if (next) {
|
||||||
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
|
||||||
|
iv->type = ct->operaType;
|
||||||
|
iv->colVal = ct->colVal;
|
||||||
|
|
||||||
|
taosArrayPush(iv->val, &ct->uid);
|
||||||
|
}
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }
|
||||||
|
|
|
@ -54,7 +54,9 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
||||||
|
|
||||||
TFileCache* tfileCacheCreate(const char* path) {
|
TFileCache* tfileCacheCreate(const char* path) {
|
||||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||||
if (tcache == NULL) { return NULL; }
|
if (tcache == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
tcache->capacity = 64;
|
tcache->capacity = 64;
|
||||||
|
@ -83,7 +85,10 @@ TFileCache* tfileCacheCreate(const char* path) {
|
||||||
tfileReaderRef(reader);
|
tfileReaderRef(reader);
|
||||||
// loader fst and validate it
|
// loader fst and validate it
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
|
TFileCacheKey key = {.suid = header->suid,
|
||||||
|
.colName = header->colName,
|
||||||
|
.nColName = strlen(header->colName),
|
||||||
|
.colType = header->colType};
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
tfileSerialCacheKey(&key, buf);
|
tfileSerialCacheKey(&key, buf);
|
||||||
|
@ -97,13 +102,16 @@ End:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void tfileCacheDestroy(TFileCache* tcache) {
|
void tfileCacheDestroy(TFileCache* tcache) {
|
||||||
if (tcache == NULL) { return; }
|
if (tcache == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// free table cache
|
// free table cache
|
||||||
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
|
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
|
||||||
while (reader) {
|
while (reader) {
|
||||||
TFileReader* p = *reader;
|
TFileReader* p = *reader;
|
||||||
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType);
|
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName,
|
||||||
|
p->header.colType);
|
||||||
|
|
||||||
tfileReaderUnRef(p);
|
tfileReaderUnRef(p);
|
||||||
reader = taosHashIterate(tcache->tableCache, reader);
|
reader = taosHashIterate(tcache->tableCache, reader);
|
||||||
|
@ -116,10 +124,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
tfileSerialCacheKey(key, buf);
|
tfileSerialCacheKey(key, buf);
|
||||||
|
|
||||||
TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
||||||
tfileReaderRef(reader);
|
if (reader == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
tfileReaderRef(*reader);
|
||||||
|
|
||||||
return reader;
|
return *reader;
|
||||||
}
|
}
|
||||||
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
|
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
|
@ -138,14 +149,17 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
|
||||||
}
|
}
|
||||||
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
TFileReader* reader = calloc(1, sizeof(TFileReader));
|
TFileReader* reader = calloc(1, sizeof(TFileReader));
|
||||||
if (reader == NULL) { return NULL; }
|
if (reader == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
reader->ctx = ctx;
|
reader->ctx = ctx;
|
||||||
|
|
||||||
if (0 != tfileReaderLoadHeader(reader)) {
|
if (0 != tfileReaderLoadHeader(reader)) {
|
||||||
tfileReaderDestroy(reader);
|
tfileReaderDestroy(reader);
|
||||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
|
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||||
|
reader->header.colName);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,7 +172,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
void tfileReaderDestroy(TFileReader* reader) {
|
void tfileReaderDestroy(TFileReader* reader) {
|
||||||
if (reader == NULL) { return; }
|
if (reader == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
fstDestroy(reader->fst);
|
fstDestroy(reader->fst);
|
||||||
writerCtxDestroy(reader->ctx);
|
writerCtxDestroy(reader->ctx);
|
||||||
|
@ -175,10 +191,12 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
|
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
|
||||||
if (fstGet(reader->fst, &key, &offset)) {
|
if (fstGet(reader->fst, &key, &offset)) {
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName,
|
||||||
|
term->colVal);
|
||||||
ret = tfileReaderLoadTableIds(reader, offset, result);
|
ret = tfileReaderLoadTableIds(reader, offset, result);
|
||||||
} else {
|
} else {
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
||||||
|
term->colVal);
|
||||||
}
|
}
|
||||||
fstSliceDestroy(&key);
|
fstSliceDestroy(&key);
|
||||||
} else if (qtype == QUERY_PREFIX) {
|
} else if (qtype == QUERY_PREFIX) {
|
||||||
|
@ -304,12 +322,16 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void tfileWriteClose(TFileWriter* tw) {
|
void tfileWriteClose(TFileWriter* tw) {
|
||||||
if (tw == NULL) { return; }
|
if (tw == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
writerCtxDestroy(tw->ctx);
|
writerCtxDestroy(tw->ctx);
|
||||||
free(tw);
|
free(tw);
|
||||||
}
|
}
|
||||||
void tfileWriterDestroy(TFileWriter* tw) {
|
void tfileWriterDestroy(TFileWriter* tw) {
|
||||||
if (tw == NULL) { return; }
|
if (tw == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
writerCtxDestroy(tw->ctx);
|
writerCtxDestroy(tw->ctx);
|
||||||
free(tw);
|
free(tw);
|
||||||
|
@ -317,29 +339,35 @@ void tfileWriterDestroy(TFileWriter* tw) {
|
||||||
|
|
||||||
IndexTFile* indexTFileCreate(const char* path) {
|
IndexTFile* indexTFileCreate(const char* path) {
|
||||||
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
|
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
|
||||||
if (tfile == NULL) { return NULL; }
|
if (tfile == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
tfile->cache = tfileCacheCreate(path);
|
tfile->cache = tfileCacheCreate(path);
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
void IndexTFileDestroy(IndexTFile* tfile) {
|
void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); }
|
||||||
free(tfile);
|
|
||||||
}
|
|
||||||
|
|
||||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
if (tfile == NULL) { return ret; }
|
if (tfile == NULL) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
IndexTFile* pTfile = (IndexTFile*)tfile;
|
IndexTFile* pTfile = (IndexTFile*)tfile;
|
||||||
|
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
TFileCacheKey key = {
|
||||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||||
|
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||||
|
if (reader == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
return tfileReaderSearch(reader, query, result);
|
return tfileReaderSearch(reader, query, result);
|
||||||
}
|
}
|
||||||
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
||||||
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version =
|
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName =
|
||||||
// 1};
|
// term->nColName, .version = 1};
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -353,7 +381,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
||||||
|
|
||||||
TFileFstIter* tIter = iiter->iter;
|
TFileFstIter* tIter = iiter->iter;
|
||||||
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
|
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
|
||||||
if (rt == NULL) { return false; }
|
if (rt == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t sz = 0;
|
int32_t sz = 0;
|
||||||
char* ch = (char*)fstSliceData(&rt->data, &sz);
|
char* ch = (char*)fstSliceData(&rt->data, &sz);
|
||||||
|
@ -364,20 +394,22 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
||||||
|
|
||||||
swsResultDestroy(rt);
|
swsResultDestroy(rt);
|
||||||
// set up iterate value
|
// set up iterate value
|
||||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
iv->colVal = colVal;
|
iv->colVal = colVal;
|
||||||
|
|
||||||
// std::string key(ch, sz);
|
// std::string key(ch, sz);
|
||||||
}
|
}
|
||||||
|
|
||||||
static IterateValue* tifileIterateGetValue(Iterate* iter) {
|
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
|
||||||
return &iter->val;
|
|
||||||
}
|
|
||||||
|
|
||||||
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
||||||
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
|
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
|
||||||
if (tIter == NULL) { return NULL; }
|
if (tIter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
||||||
tIter->fb = fstSearch(reader->fst, tIter->ctx);
|
tIter->fb = fstSearch(reader->fst, tIter->ctx);
|
||||||
tIter->st = streamBuilderIntoStream(tIter->fb);
|
tIter->st = streamBuilderIntoStream(tIter->fb);
|
||||||
|
@ -389,14 +421,18 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
|
||||||
Iterate* iter = calloc(1, sizeof(Iterate));
|
Iterate* iter = calloc(1, sizeof(Iterate));
|
||||||
|
|
||||||
iter->iter = tfileFstIteratorCreate(reader);
|
iter->iter = tfileFstIteratorCreate(reader);
|
||||||
if (iter->iter == NULL) { return NULL; }
|
if (iter->iter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
iter->next = tfileIteratorNext;
|
iter->next = tfileIteratorNext;
|
||||||
iter->getValue = tifileIterateGetValue;
|
iter->getValue = tifileIterateGetValue;
|
||||||
return iter;
|
return iter;
|
||||||
}
|
}
|
||||||
void tfileIteratorDestroy(Iterate* iter) {
|
void tfileIteratorDestroy(Iterate* iter) {
|
||||||
if (iter == NULL) { return; }
|
if (iter == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
IterateValue* iv = &iter->val;
|
IterateValue* iv = &iter->val;
|
||||||
iterateValueDestroy(iv, true);
|
iterateValueDestroy(iv, true);
|
||||||
|
|
||||||
|
@ -409,14 +445,18 @@ void tfileIteratorDestroy(Iterate* iter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
||||||
if (tf == NULL) { return NULL; }
|
if (tf == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||||
return tfileCacheGet(tf->cache, &key);
|
return tfileCacheGet(tf->cache, &key);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfileStrCompare(const void* a, const void* b) {
|
static int tfileStrCompare(const void* a, const void* b) {
|
||||||
int ret = strcmp((char*)a, (char*)b);
|
int ret = strcmp((char*)a, (char*)b);
|
||||||
if (ret == 0) { return ret; }
|
if (ret == 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
return ret < 0 ? -1 : 1;
|
return ret < 0 ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -431,13 +471,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
||||||
|
|
||||||
TFileValue* tfileValueCreate(char* val) {
|
TFileValue* tfileValueCreate(char* val) {
|
||||||
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
||||||
if (tf == NULL) { return NULL; }
|
if (tf == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||||
return tf;
|
return tf;
|
||||||
}
|
}
|
||||||
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||||
if (tf == NULL) { return -1; }
|
if (tf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
taosArrayPush(tf->tableId, &val);
|
taosArrayPush(tf->tableId, &val);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -457,7 +501,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
||||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
||||||
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
||||||
tw->header.fstOffset = fstOffset;
|
tw->header.fstOffset = fstOffset;
|
||||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
|
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
tw->offset += sizeof(fstOffset);
|
tw->offset += sizeof(fstOffset);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -468,7 +514,9 @@ static int tfileWriteHeader(TFileWriter* writer) {
|
||||||
memcpy(buf, (char*)header, sizeof(buf));
|
memcpy(buf, (char*)header, sizeof(buf));
|
||||||
|
|
||||||
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
||||||
if (sizeof(buf) != nwrite) { return -1; }
|
if (sizeof(buf) != nwrite) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
writer->offset = nwrite;
|
writer->offset = nwrite;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -502,7 +550,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
static int FST_MAX_SIZE = 16 * 1024;
|
static int FST_MAX_SIZE = 16 * 1024;
|
||||||
|
|
||||||
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
|
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
|
||||||
if (buf == NULL) { return -1; }
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
WriterCtx* ctx = reader->ctx;
|
WriterCtx* ctx = reader->ctx;
|
||||||
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
||||||
|
@ -525,7 +575,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
||||||
|
|
||||||
int32_t total = sizeof(uint64_t) * nid;
|
int32_t total = sizeof(uint64_t) * nid;
|
||||||
char* buf = calloc(1, total);
|
char* buf = calloc(1, total);
|
||||||
if (buf == NULL) { return -1; }
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
nread = ctx->read(ctx, buf, total);
|
nread = ctx->read(ctx, buf, total);
|
||||||
assert(total == nread);
|
assert(total == nread);
|
||||||
|
@ -543,12 +595,16 @@ void tfileReaderRef(TFileReader* reader) {
|
||||||
|
|
||||||
void tfileReaderUnRef(TFileReader* reader) {
|
void tfileReaderUnRef(TFileReader* reader) {
|
||||||
int ref = T_REF_DEC(reader);
|
int ref = T_REF_DEC(reader);
|
||||||
if (ref == 0) { tfileReaderDestroy(reader); }
|
if (ref == 0) {
|
||||||
|
tfileReaderDestroy(reader);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfileGetFileList(const char* path, SArray* result) {
|
static int tfileGetFileList(const char* path, SArray* result) {
|
||||||
DIR* dir = opendir(path);
|
DIR* dir = opendir(path);
|
||||||
if (NULL == dir) { return -1; }
|
if (NULL == dir) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
struct dirent* entry;
|
struct dirent* entry;
|
||||||
while ((entry = readdir(dir)) != NULL) {
|
while ((entry = readdir(dir)) != NULL) {
|
||||||
|
@ -576,7 +632,9 @@ static int tfileCompare(const void* a, const void* b) {
|
||||||
size_t bLen = strlen(bName);
|
size_t bLen = strlen(bName);
|
||||||
|
|
||||||
int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||||
if (ret == 0) { return ret; }
|
if (ret == 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
return ret < 0 ? -1 : 1;
|
return ret < 0 ? -1 : 1;
|
||||||
}
|
}
|
||||||
// tfile name suid-colId-version.tindex
|
// tfile name suid-colId-version.tindex
|
||||||
|
|
|
@ -2,7 +2,8 @@
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
*
|
*
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation.
|
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
|
||||||
|
* Software Foundation.
|
||||||
*
|
*
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
@ -75,7 +76,9 @@ class FstReadMemory {
|
||||||
bool init() {
|
bool init() {
|
||||||
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
||||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||||
if (nRead <= 0) { return false; }
|
if (nRead <= 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
_size = nRead;
|
_size = nRead;
|
||||||
_s = fstSliceCreate((uint8_t*)buf, _size);
|
_s = fstSliceCreate((uint8_t*)buf, _size);
|
||||||
_fst = fstCreate(&_s);
|
_fst = fstCreate(&_s);
|
||||||
|
@ -179,7 +182,9 @@ void checkFstPerf() {
|
||||||
delete fw;
|
delete fw;
|
||||||
|
|
||||||
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
||||||
if (m->init()) { printf("success to init fst read"); }
|
if (m->init()) {
|
||||||
|
printf("success to init fst read");
|
||||||
|
}
|
||||||
Performance_fstReadRecords(m);
|
Performance_fstReadRecords(m);
|
||||||
delete m;
|
delete m;
|
||||||
}
|
}
|
||||||
|
@ -283,7 +288,8 @@ class IndexEnv : public ::testing::Test {
|
||||||
// / {
|
// / {
|
||||||
// / std::string colName("tag1"), colVal("Hello world");
|
// / std::string colName("tag1"), colVal("Hello world");
|
||||||
// / SIndexTerm* term =
|
// / SIndexTerm* term =
|
||||||
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), / colVal.size());
|
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), /
|
||||||
|
// colVal.size());
|
||||||
// SIndexMultiTerm* terms = indexMultiTermCreate();
|
// SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
// indexMultiTermAdd(terms, term);
|
// indexMultiTermAdd(terms, term);
|
||||||
// / / for (size_t i = 0; i < 100; i++) {
|
// / / for (size_t i = 0; i < 100; i++) {
|
||||||
|
@ -301,14 +307,16 @@ class IndexEnv : public ::testing::Test {
|
||||||
// / {
|
// / {
|
||||||
// / std::string colName("tag1"), colVal("Hello world");
|
// / std::string colName("tag1"), colVal("Hello world");
|
||||||
// / SIndexTerm* term =
|
// / SIndexTerm* term =
|
||||||
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
|
||||||
|
// colVal.size());
|
||||||
// / indexMultiTermAdd(terms, term);
|
// / indexMultiTermAdd(terms, term);
|
||||||
// /
|
// /
|
||||||
// }
|
// }
|
||||||
// / {
|
// / {
|
||||||
// / std::string colName("tag2"), colVal("Hello world");
|
// / std::string colName("tag2"), colVal("Hello world");
|
||||||
// / SIndexTerm* term =
|
// / SIndexTerm* term =
|
||||||
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
|
||||||
|
// colVal.size());
|
||||||
// / indexMultiTermAdd(terms, term);
|
// / indexMultiTermAdd(terms, term);
|
||||||
// /
|
// /
|
||||||
// }
|
// }
|
||||||
|
@ -327,7 +335,8 @@ class IndexEnv : public ::testing::Test {
|
||||||
|
|
||||||
class TFileObj {
|
class TFileObj {
|
||||||
public:
|
public:
|
||||||
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) {
|
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
|
||||||
|
: path_(path), colName_(colName) {
|
||||||
colId_ = 10;
|
colId_ = 10;
|
||||||
// Do Nothing
|
// Do Nothing
|
||||||
//
|
//
|
||||||
|
@ -337,7 +346,9 @@ class TFileObj {
|
||||||
tfileReaderDestroy(reader_);
|
tfileReaderDestroy(reader_);
|
||||||
reader_ = NULL;
|
reader_ = NULL;
|
||||||
}
|
}
|
||||||
if (writer_ == NULL) { InitWriter(); }
|
if (writer_ == NULL) {
|
||||||
|
InitWriter();
|
||||||
|
}
|
||||||
return tfileWriterPut(writer_, tv);
|
return tfileWriterPut(writer_, tv);
|
||||||
}
|
}
|
||||||
bool InitWriter() {
|
bool InitWriter() {
|
||||||
|
@ -377,8 +388,12 @@ class TFileObj {
|
||||||
return tfileReaderSearch(reader_, query, result);
|
return tfileReaderSearch(reader_, query, result);
|
||||||
}
|
}
|
||||||
~TFileObj() {
|
~TFileObj() {
|
||||||
if (writer_) { tfileWriterDestroy(writer_); }
|
if (writer_) {
|
||||||
if (reader_) { tfileReaderDestroy(reader_); }
|
tfileWriterDestroy(writer_);
|
||||||
|
}
|
||||||
|
if (reader_) {
|
||||||
|
tfileReaderDestroy(reader_);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -455,9 +470,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(data);
|
taosArrayDestroy(data);
|
||||||
|
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
||||||
|
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
@ -525,54 +541,62 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
{
|
{
|
||||||
std::string colVal("v1");
|
std::string colVal("v1");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, othColId, version++, suid++);
|
coj->Put(term, othColId, version++, suid++);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v4");
|
std::string colVal("v4");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, othColId, version++, suid++);
|
coj->Put(term, othColId, version++, suid++);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v4");
|
std::string colVal("v4");
|
||||||
for (size_t i = 0; i < 10; i++) {
|
for (size_t i = 0; i < 10; i++) {
|
||||||
colVal[colVal.size() - 1] = 'a' + i;
|
colVal[colVal.size() - 1] = 'a' + i;
|
||||||
SIndexTerm* term =
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
coj->Debug();
|
coj->Debug();
|
||||||
// begin query
|
// begin query
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
||||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||||
STermValueType valType;
|
STermValueType valType;
|
||||||
|
@ -582,8 +606,9 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
assert(taosArrayGetSize(ret) == 4);
|
assert(taosArrayGetSize(ret) == 4);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
||||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||||
STermValueType valType;
|
STermValueType valType;
|
||||||
|
@ -592,3 +617,132 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
assert(taosArrayGetSize(ret) == 1);
|
assert(taosArrayGetSize(ret) == 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
class IndexObj {
|
||||||
|
public:
|
||||||
|
IndexObj() {
|
||||||
|
// opt
|
||||||
|
numOfWrite = 0;
|
||||||
|
numOfRead = 0;
|
||||||
|
indexInit();
|
||||||
|
}
|
||||||
|
int Init(const std::string& dir) {
|
||||||
|
taosRemoveDir(dir.c_str());
|
||||||
|
taosMkDir(dir.c_str());
|
||||||
|
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
||||||
|
if (ret != 0) {
|
||||||
|
// opt
|
||||||
|
std::cout << "failed to open index: %s" << dir << std::endl;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
|
||||||
|
numOfWrite += taosArrayGetSize(fvs);
|
||||||
|
return indexPut(idx, fvs, uid);
|
||||||
|
}
|
||||||
|
int Search(SIndexMultiTermQuery* multiQ, SArray* result) {
|
||||||
|
SArray* query = multiQ->query;
|
||||||
|
numOfRead = taosArrayGetSize(query);
|
||||||
|
return indexSearch(idx, multiQ, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Debug() {
|
||||||
|
std::cout << "numOfWrite:" << numOfWrite << std::endl;
|
||||||
|
std::cout << "numOfRead:" << numOfRead << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
~IndexObj() {
|
||||||
|
indexClose(idx);
|
||||||
|
indexCleanUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SIndexOpts opts;
|
||||||
|
SIndex* idx;
|
||||||
|
int numOfWrite;
|
||||||
|
int numOfRead;
|
||||||
|
};
|
||||||
|
|
||||||
|
class IndexEnv2 : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
virtual void SetUp() {
|
||||||
|
index = new IndexObj();
|
||||||
|
//
|
||||||
|
}
|
||||||
|
virtual void TearDown() {
|
||||||
|
// r
|
||||||
|
delete index;
|
||||||
|
}
|
||||||
|
IndexObj* index;
|
||||||
|
};
|
||||||
|
TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
|
std::string path = "/tmp";
|
||||||
|
if (index->Init(path) != 0) {
|
||||||
|
std::cout << "failed to init index" << std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int targetSize = 100;
|
||||||
|
{
|
||||||
|
std::string colName("tag1"), colVal("Hello world");
|
||||||
|
|
||||||
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
for (size_t i = 0; i < targetSize; i++) {
|
||||||
|
int tableId = i;
|
||||||
|
int ret = index->Put(terms, tableId);
|
||||||
|
assert(ret == 0);
|
||||||
|
}
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
size_t size = 100;
|
||||||
|
std::string colName("tag1"), colVal("hello world");
|
||||||
|
|
||||||
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
for (size_t i = 0; i < size; i++) {
|
||||||
|
int tableId = i;
|
||||||
|
int ret = index->Put(terms, tableId);
|
||||||
|
assert(ret == 0);
|
||||||
|
}
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::string colName("tag1"), colVal("Hello world");
|
||||||
|
|
||||||
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
index->Search(mq, result);
|
||||||
|
assert(taosArrayGetSize(result) == targetSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TEST_F(IndexEnv2, testIndex_CachePut) {
|
||||||
|
std::string path = "/tmp";
|
||||||
|
if (index->Init(path) != 0) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(IndexEnv2, testIndexr_TFilePut) {
|
||||||
|
std::string path = "/tmp";
|
||||||
|
if (index->Init(path) != 0) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TEST_F(IndexEnv2, testIndex_CacheSearch) {
|
||||||
|
std::string path = "/tmp";
|
||||||
|
if (index->Init(path) != 0) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TEST_F(IndexEnv2, testIndex_TFileSearch) {
|
||||||
|
std::string path = "/tmp";
|
||||||
|
if (index->Init(path) != 0) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -791,9 +791,29 @@ void schDropJobAllTasks(SSchJob *job) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t schGenSchId(void) {
|
||||||
|
uint64_t sId = 0;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
qDebug("Gen sId:0x%"PRIx64, sId);
|
||||||
|
|
||||||
|
return sId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t schedulerInit(SSchedulerCfg *cfg) {
|
int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||||
|
if (schMgmt.jobs) {
|
||||||
|
qError("scheduler already init");
|
||||||
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
if (cfg) {
|
if (cfg) {
|
||||||
schMgmt.cfg = *cfg;
|
schMgmt.cfg = *cfg;
|
||||||
|
|
||||||
|
if (schMgmt.cfg.maxJobNum <= 0) {
|
||||||
|
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
|
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
|
||||||
}
|
}
|
||||||
|
@ -803,18 +823,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||||
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
|
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
schMgmt.sId = 1; //TODO GENERATE A UUID
|
schMgmt.sId = schGenSchId();
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
|
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
|
||||||
if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
if (qnodeList && taosArrayGetSize(qnodeList) <= 0) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayGetSize(qnodeList) <= 0) {
|
|
||||||
qInfo("qnodeList is empty");
|
qInfo("qnodeList is empty");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -882,6 +898,10 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
|
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
|
||||||
|
if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) {
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
*numOfRows = 0;
|
*numOfRows = 0;
|
||||||
|
|
||||||
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
|
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
|
||||||
|
@ -894,6 +914,10 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
|
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
|
||||||
|
if (NULL == transport || NULL == qnodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
|
return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue