enh: add json index
This commit is contained in:
parent
0ef1a19b23
commit
7c9d76a6a6
|
@ -103,10 +103,10 @@ typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
|
||||||
|
|
||||||
typedef struct TAOS_MULTI_BIND {
|
typedef struct TAOS_MULTI_BIND {
|
||||||
int buffer_type;
|
int buffer_type;
|
||||||
void *buffer;
|
void * buffer;
|
||||||
uintptr_t buffer_length;
|
uintptr_t buffer_length;
|
||||||
int32_t *length;
|
int32_t * length;
|
||||||
char *is_null;
|
char * is_null;
|
||||||
int num;
|
int num;
|
||||||
} TAOS_MULTI_BIND;
|
} TAOS_MULTI_BIND;
|
||||||
|
|
||||||
|
@ -130,7 +130,7 @@ DLL_EXPORT void taos_cleanup(void);
|
||||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||||
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
||||||
DLL_EXPORT int taos_init(void);
|
DLL_EXPORT int taos_init(void);
|
||||||
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
||||||
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
|
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
|
||||||
const char *db, int dbLen, uint16_t port);
|
const char *db, int dbLen, uint16_t port);
|
||||||
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
||||||
|
@ -147,17 +147,17 @@ DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name
|
||||||
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||||
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
|
||||||
|
|
||||||
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
||||||
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
||||||
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
|
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
|
||||||
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
||||||
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
|
||||||
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
|
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
|
||||||
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
|
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
|
DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
||||||
|
|
||||||
|
@ -179,11 +179,11 @@ DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
|
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
|
||||||
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
|
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
|
||||||
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
|
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
|
||||||
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
|
DLL_EXPORT int * taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
|
||||||
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
||||||
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
||||||
|
|
||||||
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
|
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
|
||||||
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
|
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
|
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
|
||||||
|
@ -204,7 +204,7 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
|
||||||
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
|
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
||||||
|
|
||||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||||
|
@ -229,7 +229,7 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
|
||||||
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
|
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
|
||||||
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
|
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
|
||||||
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
|
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
|
||||||
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
|
DLL_EXPORT char ** tmq_list_to_c_array(const tmq_list_t *);
|
||||||
|
|
||||||
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
DLL_EXPORT TAOS_RES * tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
|
||||||
|
@ -260,7 +260,7 @@ enum tmq_conf_res_t {
|
||||||
|
|
||||||
typedef enum tmq_conf_res_t tmq_conf_res_t;
|
typedef enum tmq_conf_res_t tmq_conf_res_t;
|
||||||
|
|
||||||
DLL_EXPORT tmq_conf_t *tmq_conf_new();
|
DLL_EXPORT tmq_conf_t * tmq_conf_new();
|
||||||
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
|
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
|
||||||
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
|
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
|
||||||
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
|
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
|
||||||
|
|
|
@ -603,6 +603,9 @@ typedef struct {
|
||||||
} SIdxCursor;
|
} SIdxCursor;
|
||||||
|
|
||||||
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
|
#ifdef USE_INVERTED_INDEX
|
||||||
|
return -1;
|
||||||
|
#else
|
||||||
SIdxCursor *pCursor = NULL;
|
SIdxCursor *pCursor = NULL;
|
||||||
|
|
||||||
int32_t ret = 0, valid = 0;
|
int32_t ret = 0, valid = 0;
|
||||||
|
@ -678,4 +681,5 @@ END:
|
||||||
taosMemoryFree(pCursor);
|
taosMemoryFree(pCursor);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,9 +28,9 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
const void *pKey = NULL;
|
const void *pKey = NULL;
|
||||||
const void *pVal = NULL;
|
const void *pVal = NULL;
|
||||||
void *pBuf = NULL;
|
void * pBuf = NULL;
|
||||||
int32_t szBuf = 0;
|
int32_t szBuf = 0;
|
||||||
void *p = NULL;
|
void * p = NULL;
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
// validate req
|
// validate req
|
||||||
|
@ -83,8 +83,8 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
|
||||||
|
|
||||||
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
STbDbKey tbDbKey;
|
STbDbKey tbDbKey;
|
||||||
void *pKey = NULL;
|
void * pKey = NULL;
|
||||||
void *pVal = NULL;
|
void * pVal = NULL;
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
SEncoder coder = {0};
|
SEncoder coder = {0};
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
|
|
||||||
|
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||||
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
@ -25,7 +26,7 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
||||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
||||||
|
|
||||||
static int metaUpdateMetaRsp(tb_uid_t uid, char* tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
|
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
|
||||||
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
|
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
|
||||||
if (NULL == pMetaRsp->pSchemas) {
|
if (NULL == pMetaRsp->pSchemas) {
|
||||||
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||||
|
@ -43,6 +44,75 @@ static int metaUpdateMetaRsp(tb_uid_t uid, char* tbName, SSchemaWrapper *pSchema
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
|
||||||
|
#ifdef USE_INVERTED_INDEX
|
||||||
|
if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void * data = pCtbEntry->ctbEntry.pTags;
|
||||||
|
const char *tagName = pSchema->name;
|
||||||
|
|
||||||
|
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
|
||||||
|
tb_uid_t tuid = pCtbEntry->uid;
|
||||||
|
const void *pTagData = pCtbEntry->ctbEntry.pTags;
|
||||||
|
int32_t nTagData = 0;
|
||||||
|
|
||||||
|
SArray *pTagVals = NULL;
|
||||||
|
if (tTagToValArray((const STag *)data, &pTagVals) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
SIndexMultiTerm *terms = indexMultiTermCreate();
|
||||||
|
int16_t nCols = taosArrayGetSize(pTagVals);
|
||||||
|
for (int i = 0; i < nCols; i++) {
|
||||||
|
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
|
||||||
|
char type = pTagVal->type;
|
||||||
|
|
||||||
|
char *Key = pTagVal->pKey;
|
||||||
|
char *key = taosMemoryCalloc(1, strlen(Key) + 2 + strlen(tagName));
|
||||||
|
sprintf(key, "%s_%s", tagName, Key);
|
||||||
|
|
||||||
|
int32_t nKey = strlen(key);
|
||||||
|
SIndexTerm *term = NULL;
|
||||||
|
|
||||||
|
if (type == TSDB_DATA_TYPE_NULL) {
|
||||||
|
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
if (pTagVal->nData > 0) {
|
||||||
|
char * val = taosMemoryCalloc(1, pTagVal->nData);
|
||||||
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)pTagVal->pData, pTagVal->nData, val);
|
||||||
|
// printf("val: %s, len: %d", val, len);
|
||||||
|
|
||||||
|
char *tval = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||||
|
memcpy(tval, (uint16_t *)&len, VARSTR_HEADER_SIZE);
|
||||||
|
memcpy(tval + VARSTR_HEADER_SIZE, val, len);
|
||||||
|
type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, tval, len + 2);
|
||||||
|
} else if (pTagVal->nData == 0) {
|
||||||
|
char * val = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
// handle NULL key
|
||||||
|
}
|
||||||
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
double val = *(double *)(&pTagVal->i64);
|
||||||
|
int len = 0;
|
||||||
|
term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, (const char *)&val, len);
|
||||||
|
} else if (type == TSDB_DATA_TYPE_BOOL) {
|
||||||
|
int val = *(int *)(&pTagVal->i64);
|
||||||
|
int len = 0;
|
||||||
|
term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, (const char *)&val, len);
|
||||||
|
}
|
||||||
|
if (term == NULL) {
|
||||||
|
// handle except later
|
||||||
|
} else {
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
}
|
||||||
|
taosMemoryFree(key);
|
||||||
|
}
|
||||||
|
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
#endif
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
|
@ -341,7 +411,6 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
|
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
|
||||||
void * pVal = NULL;
|
void * pVal = NULL;
|
||||||
int nVal = 0;
|
int nVal = 0;
|
||||||
|
@ -824,6 +893,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||||
} else {
|
} else {
|
||||||
// pTagData = pCtbEntry->ctbEntry.pTags;
|
// pTagData = pCtbEntry->ctbEntry.pTags;
|
||||||
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
||||||
|
pTagData = pCtbEntry->ctbEntry.pTags;
|
||||||
|
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
||||||
|
return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update tag index
|
// update tag index
|
||||||
|
|
|
@ -202,7 +202,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
|
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
|
||||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
|
indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
|
||||||
|
|
||||||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||||
assert(*cache != NULL);
|
assert(*cache != NULL);
|
||||||
|
@ -330,7 +330,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
ICacheKey key = {
|
ICacheKey key = {
|
||||||
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
|
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
|
||||||
indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
|
indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
|
||||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
taosThreadMutexLock(&sIdx->mtx);
|
taosThreadMutexLock(&sIdx->mtx);
|
||||||
|
|
|
@ -402,16 +402,16 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||||
tlen = taosEncodeBinary(NULL, src, strlen(src));
|
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
|
||||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||||
tlen = taosEncodeBinary(dst, src, strlen(src));
|
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
|
||||||
*dst = (char*)*dst - tlen;
|
*dst = (char*)*dst - tlen;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_VARBINARY:
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
tlen = taosEncodeBinary(NULL, src, strlen(src));
|
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
|
||||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||||
tlen = taosEncodeBinary(dst, src, strlen(src));
|
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
|
||||||
*dst = (char*)*dst - tlen;
|
*dst = (char*)*dst - tlen;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -162,12 +162,27 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
|
SOperatorNode *nd = (SOperatorNode *)node;
|
||||||
|
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
||||||
|
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
||||||
|
SValueNode * r = (SValueNode *)nd->pRight;
|
||||||
|
|
||||||
|
param->colId = l->colId;
|
||||||
|
param->colValType = l->node.resType.type;
|
||||||
|
memcpy(param->dbName, l->dbName, sizeof(l->dbName));
|
||||||
|
sprintf(param->colName, "%s_%s", l->colName, r->literal);
|
||||||
|
param->colValType = r->typeData;
|
||||||
|
return 0;
|
||||||
|
// memcpy(param->colName, l->colName, sizeof(l->colName));
|
||||||
|
}
|
||||||
static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
switch (nodeType(node)) {
|
switch (nodeType(node)) {
|
||||||
case QUERY_NODE_VALUE: {
|
case QUERY_NODE_VALUE: {
|
||||||
SValueNode *vn = (SValueNode *)node;
|
SValueNode *vn = (SValueNode *)node;
|
||||||
SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue));
|
SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue));
|
||||||
param->colId = -1;
|
param->colId = -1;
|
||||||
|
param->colValType = (uint8_t)(vn->node.resType.type);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_COLUMN: {
|
case QUERY_NODE_COLUMN: {
|
||||||
|
@ -219,17 +234,31 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
|
||||||
indexError("invalid operation node, left: %p, rigth: %p", node->pLeft, node->pRight);
|
indexError("invalid operation node, left: %p, rigth: %p", node->pLeft, node->pRight);
|
||||||
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
if (node->opType == OP_TYPE_JSON_GET_VALUE || node->opType == OP_TYPE_JSON_CONTAINS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam));
|
SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam));
|
||||||
if (NULL == paramList) {
|
if (NULL == paramList) {
|
||||||
SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
SIF_ERR_JRET(sifInitParam(node->pLeft, ¶mList[0], ctx));
|
if (nodeType(node->pLeft) == QUERY_NODE_OPERATOR) {
|
||||||
if (nParam > 1) {
|
SNode *interNode = (node->pLeft);
|
||||||
SIF_ERR_JRET(sifInitParam(node->pRight, ¶mList[1], ctx));
|
SIF_ERR_JRET(sifInitJsonParam(interNode, ¶mList[0], ctx));
|
||||||
|
if (nParam > 1) {
|
||||||
|
SIF_ERR_JRET(sifInitParam(node->pRight, ¶mList[1], ctx));
|
||||||
|
}
|
||||||
|
paramList[0].colValType = TSDB_DATA_TYPE_JSON;
|
||||||
|
*params = paramList;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
SIF_ERR_JRET(sifInitParam(node->pLeft, ¶mList[0], ctx));
|
||||||
|
if (nParam > 1) {
|
||||||
|
SIF_ERR_JRET(sifInitParam(node->pRight, ¶mList[1], ctx));
|
||||||
|
}
|
||||||
|
*params = paramList;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
*params = paramList;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
_return:
|
_return:
|
||||||
taosMemoryFree(paramList);
|
taosMemoryFree(paramList);
|
||||||
SIF_RET(code);
|
SIF_RET(code);
|
||||||
|
@ -307,18 +336,23 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
||||||
SIndexMetaArg *arg = &output->arg;
|
SIndexMetaArg *arg = &output->arg;
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
|
SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, right->colValType, left->colName, strlen(left->colName),
|
||||||
right->condValue, strlen(right->condValue));
|
right->condValue, strlen(right->condValue));
|
||||||
if (tm == NULL) {
|
if (tm == NULL) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ret = 0;
|
||||||
EIndexQueryType qtype = 0;
|
EIndexQueryType qtype = 0;
|
||||||
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
||||||
|
|
||||||
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
|
||||||
indexMultiTermQueryAdd(mtm, tm, qtype);
|
indexMultiTermQueryAdd(mtm, tm, qtype);
|
||||||
int ret = indexSearch(arg->metaHandle, mtm, output->result);
|
if (left->colValType == TSDB_DATA_TYPE_JSON) {
|
||||||
|
ret = tIndexJsonSearch(arg->metaHandle, mtm, output->result);
|
||||||
|
} else {
|
||||||
|
ret = indexSearch(arg->metaHandle, mtm, output->result);
|
||||||
|
}
|
||||||
indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result));
|
indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result));
|
||||||
indexMultiTermQueryDestroy(mtm);
|
indexMultiTermQueryDestroy(mtm);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -392,6 +426,14 @@ static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output
|
||||||
int id = OP_TYPE_NMATCH;
|
int id = OP_TYPE_NMATCH;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
|
// return 0
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) {
|
||||||
|
// return 0
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
// add more except
|
// add more except
|
||||||
|
@ -445,6 +487,14 @@ static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *sta
|
||||||
*status = SFLT_NOT_INDEX;
|
*status = SFLT_NOT_INDEX;
|
||||||
*func = sifNotMatchFunc;
|
*func = sifNotMatchFunc;
|
||||||
return 0;
|
return 0;
|
||||||
|
case OP_TYPE_JSON_CONTAINS:
|
||||||
|
*status = SFLT_ACCURATE_INDEX;
|
||||||
|
*func = sifJsonContains;
|
||||||
|
return 0;
|
||||||
|
case OP_TYPE_JSON_GET_VALUE:
|
||||||
|
*status = SFLT_ACCURATE_INDEX;
|
||||||
|
*func = sifJsonGetValue;
|
||||||
|
return 0;
|
||||||
default:
|
default:
|
||||||
*status = SFLT_NOT_INDEX;
|
*status = SFLT_NOT_INDEX;
|
||||||
*func = sifNullFunc;
|
*func = sifNullFunc;
|
||||||
|
@ -460,13 +510,15 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
if (nParam <= 1) {
|
if (nParam <= 1) {
|
||||||
SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
if (node->opType == OP_TYPE_JSON_GET_VALUE || node->opType == OP_TYPE_JSON_CONTAINS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
SIFParam *params = NULL;
|
SIFParam *params = NULL;
|
||||||
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
|
||||||
|
|
||||||
|
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
output->arg = ctx->arg;
|
output->arg = ctx->arg;
|
||||||
|
|
||||||
sif_func_t operFn = sifNullFunc;
|
sif_func_t operFn = sifNullFunc;
|
||||||
code = sifGetOperFn(node->opType, &operFn, &output->status);
|
code = sifGetOperFn(node->opType, &operFn, &output->status);
|
||||||
if (ctx->noExec) {
|
if (ctx->noExec) {
|
||||||
|
@ -573,7 +625,9 @@ EDealRes sifCalcWalker(SNode *node, void *context) {
|
||||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(node)) {
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(node)) {
|
||||||
return sifWalkLogic(node, ctx);
|
return sifWalkLogic(node, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_OPERATOR == nodeType(node)) {
|
if (QUERY_NODE_OPERATOR == nodeType(node)) {
|
||||||
|
indexInfo("node type for index filter, type: %d", nodeType(node));
|
||||||
return sifWalkOper(node, ctx);
|
return sifWalkOper(node, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue