opti:schemaless logic

This commit is contained in:
wangmm0220 2023-12-07 14:07:09 +08:00
parent f4b0e7baf5
commit 26855848f6
5 changed files with 522 additions and 1054 deletions

View File

@ -65,14 +65,32 @@ extern "C" {
#define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN) #define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN) #define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)
//#define TS "_ts"
//#define TS_LEN 3
#define VALUE "_value" #define VALUE "_value"
#define VALUE_LEN 6 #define VALUE_LEN (sizeof(VALUE)-1)
#define OTD_JSON_FIELDS_NUM 4 #define OTD_JSON_FIELDS_NUM 4
#define MAX_RETRY_TIMES 10 #define MAX_RETRY_TIMES 10
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0)
#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
#define IS_SAME_KEY (maxKV->type == kv->type && maxKV->keyLen == kv->keyLen && memcmp(maxKV->key, kv->key, kv->keyLen) == 0)
#define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE))
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
#define PROCESS_SLASH_IN_MEASUREMENT(key, keyLen) \
for (int i = 1; i < keyLen; ++i) { \
if (IS_SLASH_LETTER_IN_MEASUREMENT(key + i)) { \
MOVE_FORWARD_ONE(key + i, keyLen - i); \
keyLen--; \
} \
}
typedef enum { typedef enum {
SCHEMA_ACTION_NULL, SCHEMA_ACTION_NULL,
@ -83,18 +101,6 @@ typedef enum {
SCHEMA_ACTION_CHANGE_TAG_SIZE, SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction; } ESchemaAction;
typedef struct {
const void *key;
int32_t keyLen;
void *value;
bool used;
}Node;
typedef struct NodeList{
Node data;
struct NodeList* next;
}NodeList;
typedef struct { typedef struct {
char *measure; char *measure;
char *tags; char *tags;
@ -117,7 +123,6 @@ typedef struct {
int32_t sTableNameLen; int32_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN]; char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid; uint64_t uid;
// void *key; // for openTsdb
SArray *tags; SArray *tags;
@ -161,7 +166,8 @@ typedef struct {
typedef struct { typedef struct {
int64_t id; int64_t id;
SMLProtocolType protocol; TSDB_SML_PROTOCOL_TYPE protocol;
int8_t precision; int8_t precision;
bool reRun; bool reRun;
bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol) bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol)
@ -201,29 +207,8 @@ typedef struct {
bool needModifySchema; bool needModifySchema;
} SSmlHandle; } SSmlHandle;
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \ extern int64_t smlFactorNS[];
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0) extern int64_t smlFactorS[];
#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
#define IS_SAME_KEY (maxKV->keyLen == kv.keyLen && memcmp(maxKV->key, kv.key, kv.keyLen) == 0)
#define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE))
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
#define PROCESS_SLASH_IN_MEASUREMENT(key, keyLen) \
for (int i = 1; i < keyLen; ++i) { \
if (IS_SLASH_LETTER_IN_MEASUREMENT(key + i)) { \
MOVE_FORWARD_ONE(key + i, keyLen - i); \
keyLen--; \
} \
}
extern int64_t smlFactorNS[3];
extern int64_t smlFactorS[3];
typedef int32_t (*_equal_fn_sml)(const void *, const void *); typedef int32_t (*_equal_fn_sml)(const void *, const void *);
@ -231,11 +216,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos);
void smlDestroyInfo(SSmlHandle *info); void smlDestroyInfo(SSmlHandle *info);
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset); int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset);
int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset); int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset);
//SArray *smlJsonParseTags(char *start, char *end);
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg); bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg);
//void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn);
//int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn);
//int nodeListSize(NodeList* list);
bool smlDoubleToInt64OverFlow(double num); bool smlDoubleToInt64OverFlow(double num);
int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2); int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2);
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg); bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg);
@ -253,12 +234,22 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg);
uint8_t smlGetTimestampLen(int64_t num); uint8_t smlGetTimestampLen(int64_t num);
void smlDestroyTableInfo(void *para); void smlDestroyTableInfo(void *para);
void freeSSmlKv(void* data); void freeSSmlKv(void* data);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements); int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements); int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseJSON(SSmlHandle *info, char *payload); int32_t smlParseJSON(SSmlHandle *info, char *payload);
void smlStrReplace(char* src, int32_t len); void smlStrReplace(char* src, int32_t len);
SSmlSTableMeta* smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement);
bool isSmlTagAligned(SSmlHandle *info, int cnt, SSmlKv *kv);
bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv);
int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements);
int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements);
int32_t smlJoinMeasureTag(SSmlLineInfo *elements);
void smlBuildTsKv(SSmlKv *kv, int64_t ts);
int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -20,75 +20,93 @@
#include "clientSml.h" #include "clientSml.h"
int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL}; #define RETURN_FALSE \
int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1}; smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL}; return false;
//void *nodeListGet(NodeList *list, const void *key, int32_t len, _equal_fn_sml fn) { #define SET_DOUBLE \
// NodeList *tmp = list; kvVal->type = TSDB_DATA_TYPE_DOUBLE; \
// while (tmp) { kvVal->d = result;
// if (fn == NULL) {
// if (tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) { #define SET_FLOAT \
// return tmp->data.value; if (!IS_VALID_FLOAT(result)) { \
// } smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal); \
// } else { return false; \
// if (tmp->data.used && fn(tmp->data.key, key) == 0) { } \
// return tmp->data.value; kvVal->type = TSDB_DATA_TYPE_FLOAT; \
// } kvVal->f = (float)result;
// }
// #define SET_BIGINT \
// tmp = tmp->next; errno = 0; \
// } int64_t tmp = taosStr2Int64(pVal, &endptr, 10); \
// return NULL; if (errno == ERANGE) { \
//} smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal); \
// return false; \
//int nodeListSet(NodeList **list, const void *key, int32_t len, void *value, _equal_fn_sml fn) { } \
// NodeList *tmp = *list; kvVal->type = TSDB_DATA_TYPE_BIGINT; \
// while (tmp) { kvVal->i = tmp;
// if (!tmp->data.used) break;
// if (fn == NULL) { #define SET_INT \
// if (tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) { if (!IS_VALID_INT(result)) { \
// return -1; smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal); \
// } return false; \
// } else { } \
// if (tmp->data.keyLen == len && fn(tmp->data.key, key) == 0) { kvVal->type = TSDB_DATA_TYPE_INT; \
// return -1; kvVal->i = result;
// }
// } #define SET_SMALL_INT \
// if (!IS_VALID_SMALLINT(result)) { \
// tmp = tmp->next; smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal); \
// } return false; \
// if (tmp) { } \
// tmp->data.key = key; kvVal->type = TSDB_DATA_TYPE_SMALLINT; \
// tmp->data.keyLen = len; kvVal->i = result;
// tmp->data.value = value;
// tmp->data.used = true; #define SET_UBIGINT \
// } else { errno = 0; \
// NodeList *newNode = (NodeList *)taosMemoryCalloc(1, sizeof(NodeList)); uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10); \
// if (newNode == NULL) { if (errno == ERANGE || result < 0) { \
// return -1; smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal); \
// } return false; \
// newNode->data.key = key; } \
// newNode->data.keyLen = len; kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
// newNode->data.value = value; kvVal->u = tmp;
// newNode->data.used = true;
// newNode->next = *list; #define SET_UINT \
// *list = newNode; if (!IS_VALID_UINT(result)) { \
// } smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal); \
// return 0; return false; \
//} } \
// kvVal->type = TSDB_DATA_TYPE_UINT; \
//int nodeListSize(NodeList *list) { kvVal->u = result;
// int cnt = 0;
// while (list) { #define SET_USMALL_INT \
// if (list->data.used) if (!IS_VALID_USMALLINT(result)) { \
// cnt++; smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal); \
// else return false; \
// break; } \
// list = list->next; kvVal->type = TSDB_DATA_TYPE_USMALLINT; \
// } kvVal->u = result;
// return cnt;
//} #define SET_TINYINT \
if (!IS_VALID_TINYINT(result)) { \
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_TINYINT; \
kvVal->i = result;
#define SET_UTINYINT \
if (!IS_VALID_UTINYINT(result)) { \
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UTINYINT; \
kvVal->u = result;
int64_t smlToMilli[] = {3600000LL, 60000LL, 1000LL};
int64_t smlFactorNS[] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
int64_t smlFactorS[] = {1000LL, 1000000LL, 1000000000LL};
static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo* conn, const char* pTabName, AUTH_TYPE type){ static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo* conn, const char* pTabName, AUTH_TYPE type){
SUserAuthInfo pAuth = {0}; SUserAuthInfo pAuth = {0};
@ -114,7 +132,7 @@ inline bool smlDoubleToInt64OverFlow(double num) {
return false; return false;
} }
void smlStrReplace(char* src, int32_t len){ inline void smlStrReplace(char* src, int32_t len){
if (!tsSmlDot2Underline) return; if (!tsSmlDot2Underline) return;
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
if(src[i] == '.'){ if(src[i] == '.'){
@ -155,7 +173,7 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
return convertTimePrecision(tsInt64, fromPrecision, toPrecision); return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
} }
int8_t smlGetTsTypeByLen(int32_t len) { inline int8_t smlGetTsTypeByLen(int32_t len) {
if (len == TSDB_TIME_PRECISION_SEC_DIGITS) { if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
return TSDB_TIME_PRECISION_SECONDS; return TSDB_TIME_PRECISION_SECONDS;
} else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) { } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
@ -180,11 +198,6 @@ SSmlTableInfo *smlBuildTableInfo(int numRows, const char *measure, int32_t measu
goto cleanup; goto cleanup;
} }
// tag->tags = taosArrayInit(16, sizeof(SSmlKv));
// if (tag->tags == NULL) {
// uError("SML:smlBuildTableInfo failed to allocate memory");
// goto cleanup;
// }
return tag; return tag;
cleanup: cleanup:
@ -192,6 +205,242 @@ cleanup:
return NULL; return NULL;
} }
void smlBuildTsKv(SSmlKv *kv, int64_t ts){
kv->key = tsSmlTsDefaultName;
kv->keyLen = strlen(tsSmlTsDefaultName);
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->i = ts;
kv->length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
}
SSmlSTableMeta* smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement){
SSmlSTableMeta* sMeta = NULL;
char *measure = currElement->measure;
int measureLen = currElement->measureLen;
if (currElement->measureEscaped) {
measure = (char *)taosMemoryMalloc(measureLen);
memcpy(measure, currElement->measure, measureLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
}
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
if (currElement->measureEscaped) {
taosMemoryFree(measure);
}
if (pTableMeta == NULL) {
info->dataFormat = false;
info->reRun = true;
terrno = TSDB_CODE_SUCCESS;
return sMeta;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return sMeta;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
for (int i = 1; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++) {
SSchema *col = pTableMeta->schema + i;
SSmlKv kv = {.key = col->name, .keyLen = strlen(col->name), .type = col->type};
if (col->type == TSDB_DATA_TYPE_NCHAR) {
kv.length = (col->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} else if (col->type == TSDB_DATA_TYPE_BINARY || col->type == TSDB_DATA_TYPE_GEOMETRY || col->type == TSDB_DATA_TYPE_VARBINARY) {
kv.length = col->bytes - VARSTR_HEADER_SIZE;
} else{
kv.length = col->bytes;
}
if(i < pTableMeta->tableInfo.numOfColumns){
taosArrayPush(sMeta->cols, &kv);
}else{
taosArrayPush(sMeta->tags, &kv);
}
}
return sMeta;
}
bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv){
// cnt begin 0, add ts so + 2
if (unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)) {
info->dataFormat = false;
info->reRun = true;
return false;
}
// bind data
int32_t ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kv, cnt + 1);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uDebug("smlBuildCol error, retry");
info->dataFormat = false;
info->reRun = true;
return false;
}
if (cnt >= taosArrayGetSize(info->maxColKVs)) {
info->dataFormat = false;
info->reRun = true;
return false;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return false;
}
if (unlikely(IS_VAR_DATA_TYPE(kv->type) && kv->length > maxKV->length)) {
maxKV->length = kv->length;
info->needModifySchema = true;
}
return true;
}
bool isSmlTagAligned(SSmlHandle *info, int cnt, SSmlKv *kv){
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
goto END;
}
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
goto END;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
goto END;
}
if (unlikely(kv->length > maxKV->length)) {
maxKV->length = kv->length;
info->needModifySchema = true;
}
return true;
END:
info->dataFormat = false;
info->reRun = true;
return false;
}
int32_t smlJoinMeasureTag(SSmlLineInfo *elements){
elements->measureTag = (char *)taosMemoryMalloc(elements->measureLen + elements->tagsLen);
if(elements->measureTag == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(elements->measureTag, elements->measure, elements->measureLen);
memcpy(elements->measureTag + elements->measureLen, elements->tags, elements->tagsLen);
elements->measureTagsLen = elements->measureLen + elements->tagsLen;
return TSDB_CODE_SUCCESS;
}
int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
bool isSameMeasure = IS_SAME_SUPER_TABLE;
if(isSameMeasure) {
return 0;
}
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
sMeta = smlBuildSuperTableInfo(info, elements);
if(sMeta == NULL) return -1;
}else{
sMeta = *tmp;
}
ASSERT(sMeta != NULL);
info->currSTableMeta = sMeta->tableMeta;
info->maxTagKVs = sMeta->tags;
info->maxColKVs = sMeta->cols;
return 0;
}
int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements){
SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(oneTable == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashPut(info->childTables, elements->measureTag, elements->measureTagsLen, &tinfo, POINTER_BYTES);
tinfo->tags = taosArrayDup(info->preLineTagKV, NULL);
for (size_t i = 0; i < taosArrayGetSize(info->preLineTagKV); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(info->preLineTagKV, i);
if (kv->keyEscaped) kv->key = NULL;
if (kv->valueEscaped) kv->value = NULL;
}
smlSetCTableName(tinfo);
getTableUid(info, elements, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA;
}
}
}else{
tinfo = *oneTable;
}
ASSERT(tinfo != NULL);
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
return TSDB_CODE_SUCCESS;
}
int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv){
if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseEndTelnetJson format true, ts:%" PRId64, info->id, kvTs->i);
int32_t ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kvTs, 0);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kv, 1);
}
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArraySml(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
} else {
uDebug("SML:0x%" PRIx64 " smlParseEndTelnetJson format false, ts:%" PRId64, info->id, kvTs->i);
if (elements->colArray == NULL) {
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, kvTs);
taosArrayPush(elements->colArray, kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs){
if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseEndLine format true, ts:%" PRId64, info->id, kvTs->i);
int32_t ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArraySml(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
} else {
uDebug("SML:0x%" PRIx64 " smlParseEndLine format false, ts:%" PRId64, info->id, kvTs->i);
taosArraySet(elements->colArray, 0, &kvTs);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTableName(SArray *tags, char *childTableName) { static int32_t smlParseTableName(SArray *tags, char *childTableName) {
bool autoChildName = false; bool autoChildName = false;
size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter); size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter);
@ -328,98 +577,6 @@ cleanup:
return NULL; return NULL;
} }
// uint16_t smlCalTypeSum(char* endptr, int32_t left){
// uint16_t sum = 0;
// for(int i = 0; i < left; i++){
// sum += endptr[i];
// }
// return sum;
// }
#define RETURN_FALSE \
smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
return false;
#define SET_DOUBLE \
kvVal->type = TSDB_DATA_TYPE_DOUBLE; \
kvVal->d = result;
#define SET_FLOAT \
if (!IS_VALID_FLOAT(result)) { \
smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_FLOAT; \
kvVal->f = (float)result;
#define SET_BIGINT \
errno = 0; \
int64_t tmp = taosStr2Int64(pVal, &endptr, 10); \
if (errno == ERANGE) { \
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_BIGINT; \
kvVal->i = tmp;
#define SET_INT \
if (!IS_VALID_INT(result)) { \
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_INT; \
kvVal->i = result;
#define SET_SMALL_INT \
if (!IS_VALID_SMALLINT(result)) { \
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_SMALLINT; \
kvVal->i = result;
#define SET_UBIGINT \
errno = 0; \
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10); \
if (errno == ERANGE || result < 0) { \
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
kvVal->u = tmp;
#define SET_UINT \
if (!IS_VALID_UINT(result)) { \
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UINT; \
kvVal->u = result;
#define SET_USMALL_INT \
if (!IS_VALID_USMALLINT(result)) { \
smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_USMALLINT; \
kvVal->u = result;
#define SET_TINYINT \
if (!IS_VALID_TINYINT(result)) { \
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_TINYINT; \
kvVal->i = result;
#define SET_UTINYINT \
if (!IS_VALID_UTINYINT(result)) { \
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UTINYINT; \
kvVal->u = result;
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) { bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value; const char *pVal = kvVal->value;
int32_t len = kvVal->length; int32_t len = kvVal->length;
@ -765,8 +922,6 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
// int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta, static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
ESchemaAction action) { ESchemaAction action) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
@ -1121,35 +1276,6 @@ end:
return code; return code;
} }
/*
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
for(int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = taosArrayGet(tags, i);
if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
return TSDB_CODE_TSC_DUP_NAMES;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
if(ret != TSDB_CODE_SUCCESS){
goto end;
}
ret = smlCheckDupUnit(dumplicateKey, tags, msg);
if(ret != TSDB_CODE_SUCCESS){
goto end;
}
end:
taosHashCleanup(dumplicateKey);
return ret;
}
*/
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) { static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
@ -1206,11 +1332,6 @@ void smlDestroyTableInfo(void *para) {
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
} }
// if (info->parseJsonByLib) {
// SSmlLineInfo *key = (SSmlLineInfo *)(tag->key);
// if (key != NULL) taosMemoryFree(key->tags);
// }
// taosMemoryFree(tag->key);
taosArrayDestroy(tag->cols); taosArrayDestroy(tag->cols);
taosArrayDestroyEx(tag->tags, freeSSmlKv); taosArrayDestroyEx(tag->tags, freeSSmlKv);
taosMemoryFree(tag); taosMemoryFree(tag);
@ -1228,21 +1349,6 @@ void smlDestroyInfo(SSmlHandle *info) {
if (!info) return; if (!info) return;
qDestroyQuery(info->pQuery); qDestroyQuery(info->pQuery);
// destroy info->childTables
// SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
// while (oneTable) {
// smlDestroyTableInfo(oneTable);
// oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
// }
// destroy info->superTables
// SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
// while (oneSTable) {
// smlDestroySTableMeta(*oneSTable);
// oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
// }
// destroy info->pVgHash
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables); taosHashCleanup(info->childTables);
taosHashCleanup(info->superTables); taosHashCleanup(info->superTables);
@ -1399,11 +1505,6 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
return ret; return ret;
} }
} else { } else {
// ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
// if (ret != TSDB_CODE_SUCCESS) {
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
// return ret;
// }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat,
info->lineNum); info->lineNum);
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
@ -1537,19 +1638,6 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
int32_t smlClearForRerun(SSmlHandle *info) { int32_t smlClearForRerun(SSmlHandle *info) {
info->reRun = false; info->reRun = false;
// clear info->childTables
// SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
// while (oneTable) {
// smlDestroyTableInfo(info, *oneTable);
// oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
// }
// clear info->superTables
// SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
// while (oneSTable) {
// smlDestroySTableMeta(*oneSTable);
// oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
// }
taosHashClear(info->childTables); taosHashClear(info->childTables);
taosHashClear(info->superTables); taosHashClear(info->superTables);

View File

@ -29,199 +29,6 @@
(start)++; \ (start)++; \
} }
// SArray *smlJsonParseTags(char *start, char *end){
// SArray *tags = taosArrayInit(4, sizeof(SSmlKv));
// while(start < end){
// SSmlKv kv = {0};
// kv.type = TSDB_DATA_TYPE_NCHAR;
// bool isInQuote = false;
// while(start < end){
// if(unlikely(!isInQuote && *start == '"')){
// start++;
// kv.key = start;
// isInQuote = true;
// continue;
// }
// if(unlikely(isInQuote && *start == '"')){
// kv.keyLen = start - kv.key;
// start++;
// break;
// }
// start++;
// }
// bool hasColon = false;
// while(start < end){
// if(unlikely(!hasColon && *start == ':')){
// start++;
// hasColon = true;
// continue;
// }
// if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){
// kv.value = start;
// start++;
// continue;
// }
//
// if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){
// kv.length = start - kv.value;
// taosArrayPush(tags, &kv);
// start++;
// break;
// }
// start++;
// }
// }
// return tags;
// }
// static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) {
// int32_t ret = TSDB_CODE_SUCCESS;
//
// if(is_same_child_table_telnet(elements, &info->preLine) == 0){
// return TSDB_CODE_SUCCESS;
// }
//
// bool isSameMeasure = IS_SAME_SUPER_TABLE;
//
// int cnt = 0;
// SArray *preLineKV = info->preLineTagKV;
// bool isSuperKVInit = true;
// SArray *superKV = NULL;
// if(info->dataFormat){
// if(unlikely(!isSameMeasure)){
// SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure,
// elements->measureLen, NULL);
//
// if(unlikely(sMeta == NULL)){
// sMeta = smlBuildSTableMeta(info->dataFormat);
// STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
// sMeta->tableMeta = pTableMeta;
// if(pTableMeta == NULL){
// info->dataFormat = false;
// info->reRun = true;
// return TSDB_CODE_SUCCESS;
// }
// nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
// }
// info->currSTableMeta = sMeta->tableMeta;
// superKV = sMeta->tags;
//
// if(unlikely(taosArrayGetSize(superKV) == 0)){
// isSuperKVInit = false;
// }
// taosArraySetSize(preLineKV, 0);
// }
// }else{
// taosArraySetSize(preLineKV, 0);
// }
//
// SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen);
// int32_t tagNum = taosArrayGetSize(tags);
// if (tagNum == 0) {
// uError("SML:tag is empty:%s", elements->tags)
// taosArrayDestroy(tags);
// return TSDB_CODE_SML_INVALID_DATA;
// }
// for (int32_t i = 0; i < tagNum; ++i) {
// SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i);
//
// if(info->dataFormat){
// if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
//
// if(isSameMeasure){
// if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
// if(unlikely(kv.length > preKV->length)){
// preKV->length = kv.length;
// SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure,
// elements->measureLen, NULL);
// if(unlikely(NULL == tableMeta)){
// uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
// return TSDB_CODE_SML_INTERNAL_ERROR;
// }
//
// SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt);
// oldKV->length = kv.length;
// info->needModifySchema = true;
// }
// if(unlikely(!IS_SAME_KEY)){
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// }else{
// if(isSuperKVInit){
// if(unlikely(cnt >= taosArrayGetSize(superKV))) {
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt);
// if(unlikely(kv.length > preKV->length)) {
// preKV->length = kv.length;
// }else{
// kv.length = preKV->length;
// }
// info->needModifySchema = true;
//
// if(unlikely(!IS_SAME_KEY)){
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// }else{
// taosArrayPush(superKV, &kv);
// }
// taosArrayPush(preLineKV, &kv);
// }
// }else{
// taosArrayPush(preLineKV, &kv);
// }
// cnt++;
// }
// taosArrayDestroy(tags);
//
// SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES,
// is_same_child_table_telnet); if (unlikely(tinfo == NULL)) {
// tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
// if (unlikely(!tinfo)) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// tinfo->tags = taosArrayDup(preLineKV, NULL);
//
// smlSetCTableName(tinfo);
// if (info->dataFormat) {
// info->currSTableMeta->uid = tinfo->uid;
// tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
// if (tinfo->tableDataCtx == NULL) {
// smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
// return TSDB_CODE_SML_INVALID_DATA;
// }
// }
//
// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
// *key = *elements;
// tinfo->key = key;
// nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
// }
// if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
//
// return ret;
// }
static char *smlJsonGetObj(char *payload) { static char *smlJsonGetObj(char *payload) {
int leftBracketCnt = 0; int leftBracketCnt = 0;
bool isInQuote = false; bool isInQuote = false;
@ -659,12 +466,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
break; break;
} }
case cJSON_String: { case cJSON_String: {
/* set default JSON type to binary/nchar according to smlConvertJSONString(kv, "binary", root);
* user configured parameter tsDefaultJSONStrType
*/
char *tsDefaultJSONStrType = "binary"; // todo
smlConvertJSONString(kv, tsDefaultJSONStrType, root);
break; break;
} }
case cJSON_Object: { case cJSON_Object: {
@ -682,138 +484,71 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) { static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
int32_t ret = TSDB_CODE_SUCCESS;
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV; SArray *preLineKV = info->preLineTagKV;
if (info->dataFormat) { taosArrayClearEx(preLineKV, freeSSmlKv);
if (unlikely(!isSameMeasure)) { int cnt = 0;
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if (pTableMeta == NULL) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
}
tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
info->maxTagKVs = (*tmp)->tags;
}
}
taosArrayClear(preLineKV);
int32_t tagNum = cJSON_GetArraySize(tags); int32_t tagNum = cJSON_GetArraySize(tags);
if (unlikely(tagNum == 0)) { if (unlikely(tagNum == 0)) {
uError("SML:Tag should not be empty"); uError("SML:Tag should not be empty");
return TSDB_CODE_TSC_INVALID_JSON; terrno = TSDB_CODE_TSC_INVALID_JSON;
return -1;
} }
for (int32_t i = 0; i < tagNum; ++i) { for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i); cJSON *tag = cJSON_GetArrayItem(tags, i);
if (unlikely(tag == NULL)) { if (unlikely(tag == NULL)) {
return TSDB_CODE_TSC_INVALID_JSON; terrno = TSDB_CODE_TSC_INVALID_JSON;
return -1;
} }
// if(unlikely(tag == cMeasure)) continue;
size_t keyLen = strlen(tag->string); size_t keyLen = strlen(tag->string);
if (unlikely(IS_INVALID_COL_LEN(keyLen))) { if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64"); uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; terrno = TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
return -1;
} }
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv kv = {.key = tag->string, .keyLen = keyLen}; SSmlKv kv = {0};
// value kv.key = tag->string;
ret = smlParseValueFromJSON(tag, &kv); kv.keyLen = keyLen;
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
if (info->dataFormat) {
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
}
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
// value
int32_t ret = smlParseValueFromJSON(tag, &kv);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
terrno = ret;
return -1;
}
if (info->dataFormat && !isSmlTagAligned(info, cnt, &kv)) {
terrno = TSDB_CODE_SUCCESS;
return -1;
}
cnt++; cnt++;
} }
return 0;
}
elements->measureTag = (char *)taosMemoryMalloc(elements->measureLen + elements->tagsLen); static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
memcpy(elements->measureTag, elements->measure, elements->measureLen); int32_t ret = 0;
memcpy(elements->measureTag + elements->measureLen, elements->tags, elements->tagsLen); if(info->dataFormat){
elements->measureTagsLen = elements->measureLen + elements->tagsLen; ret = smlProcessSuperTable(info, elements);
if(ret != 0){
SSmlTableInfo **tmp = return terrno;
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(tmp == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY;
} }
tinfo->tags = taosArrayDup(preLineKV, NULL);
smlSetCTableName(tinfo);
getTableUid(info, elements, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA;
}
}
// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
// *key = *elements;
// if(info->parseJsonByLib){
// key->tags = taosMemoryMalloc(elements->tagsLen + 1);
// memcpy(key->tags, elements->tags, elements->tagsLen);
// key->tags[elements->tagsLen] = 0;
// }
// tinfo->key = key;
taosHashPut(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen, &tinfo,
POINTER_BYTES);
tmp = &tinfo;
} }
if (info->dataFormat) info->currTableDataCtx = (*tmp)->tableDataCtx; ret = smlProcessTagJson(info, tags);
if(ret != 0){
return ret; return terrno;
}
ret = smlJoinMeasureTag(elements);
if(ret != 0){
return ret;
}
return smlProcessChildTable(info, elements);
} }
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) { static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
@ -998,35 +733,10 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP; return TSDB_CODE_INVALID_TIMESTAMP;
} }
SSmlKv kvTs = {.key = tsSmlTsDefaultName, SSmlKv kvTs = {0};
.keyLen = strlen(tsSmlTsDefaultName), smlBuildTsKv(&kvTs, ts);
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if (info->dataFormat) { return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArraySml(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
} else {
if (elements->colArray == NULL) {
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) { static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
@ -1054,7 +764,6 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
if (unlikely(info->lines != NULL)) { if (unlikely(info->lines != NULL)) {
for (int i = 0; i < info->lineNum; i++) { for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv); taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
@ -1202,35 +911,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
return TSDB_CODE_INVALID_TIMESTAMP; return TSDB_CODE_INVALID_TIMESTAMP;
} }
} }
SSmlKv kvTs = {.key = tsSmlTsDefaultName, SSmlKv kvTs = {0};
.keyLen = strlen(tsSmlTsDefaultName), smlBuildTsKv(&kvTs, ts);
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if (info->dataFormat) { return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArraySml(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
} else {
if (elements->colArray == NULL) {
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
} }
int32_t smlParseJSON(SSmlHandle *info, char *payload) { int32_t smlParseJSON(SSmlHandle *info, char *payload) {

View File

@ -20,15 +20,9 @@
#include "clientSml.h" #include "clientSml.h"
// comma ,
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH) #define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
// space
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH) #define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
// equal =
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH) #define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
// quote "
// #define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)
// SLASH
#define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) (*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH)) #define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) (*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH))
@ -51,10 +45,10 @@
} \ } \
} }
#define BINARY_ADD_LEN 2 // "binary" 2 means " " #define BINARY_ADD_LEN (sizeof("\"\"")-1) // "binary" 2 means length of ("")
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define NCHAR_ADD_LEN (sizeof("L\"\"")-1) // L"nchar" 3 means length of (L"")
uint8_t smlPrecisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES, uint8_t smlPrecisionConvert[] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
TSDB_TIME_PRECISION_NANO}; TSDB_TIME_PRECISION_NANO};
@ -198,68 +192,10 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement, bool isSameMeasure, static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
bool isSameCTable) {
if (isSameCTable) {
return TSDB_CODE_SUCCESS;
}
int cnt = 0;
SArray *preLineKV = info->preLineTagKV; SArray *preLineKV = info->preLineTagKV;
if (info->dataFormat) {
if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
char *measure = currElement->measure;
int measureLen = currElement->measureLen;
if (currElement->measureEscaped) {
measure = (char *)taosMemoryMalloc(currElement->measureLen);
memcpy(measure, currElement->measure, currElement->measureLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
}
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
if (currElement->measureEscaped) {
taosMemoryFree(measure);
}
if (pTableMeta == NULL) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
for (int i = 1; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++) {
SSchema *tag = pTableMeta->schema + i;
if(i < pTableMeta->tableInfo.numOfColumns){
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type};
if (tag->type == TSDB_DATA_TYPE_NCHAR) {
kv.length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} else if (tag->type == TSDB_DATA_TYPE_BINARY || tag->type == TSDB_DATA_TYPE_GEOMETRY || tag->type == TSDB_DATA_TYPE_VARBINARY) {
kv.length = tag->bytes - VARSTR_HEADER_SIZE;
}
taosArrayPush(sMeta->cols, &kv);
}else{
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE};
taosArrayPush(sMeta->tags, &kv);
}
}
tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
info->maxTagKVs = (*tmp)->tags;
}
}
taosArrayClearEx(preLineKV, freeSSmlKv); taosArrayClearEx(preLineKV, freeSSmlKv);
int cnt = 0;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) { if (unlikely(IS_SPACE(*sql))) {
@ -274,7 +210,8 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
} }
if (unlikely(IS_EQUAL(*sql))) { if (unlikely(IS_EQUAL(*sql))) {
keyLen = *sql - key; keyLen = *sql - key;
@ -290,7 +227,8 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) { if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key); smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; terrno = TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
return -1;
} }
// parse value // parse value
@ -304,7 +242,8 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
break; break;
} else if (unlikely(IS_EQUAL(*sql))) { } else if (unlikely(IS_EQUAL(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
} }
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
@ -318,11 +257,13 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
if (unlikely(valueLen == 0)) { if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value); smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
} }
if (unlikely(valueLen - valueLenEscaped > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) { if (unlikely(valueLen - valueLenEscaped > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; terrno = TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
return -1;
} }
if (keyEscaped) { if (keyEscaped) {
@ -338,37 +279,17 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
value = tmp; value = tmp;
} }
SSmlKv kv = {.key = key, SSmlKv kv = {.key = key,
.keyLen = keyLen, .keyLen = keyLen,
.type = TSDB_DATA_TYPE_NCHAR, .type = TSDB_DATA_TYPE_NCHAR,
.value = value, .value = value,
.length = valueLen, .length = valueLen,
.keyEscaped = keyEscaped, .keyEscaped = keyEscaped,
.valueEscaped = valueEscaped}; .valueEscaped = valueEscaped};
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
if (info->dataFormat) {
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) { if (info->dataFormat && !isSmlTagAligned(info, cnt, &kv)) {
info->dataFormat = false; terrno = TSDB_CODE_SUCCESS;
info->reRun = true; return -1;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
} }
cnt++; cnt++;
@ -377,99 +298,33 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
} }
(*sql)++; (*sql)++;
} }
return 0;
}
void *oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *elements) {
if ((oneTable != NULL)) { bool isSameCTable = IS_SAME_CHILD_TABLE;
if(isSameCTable){
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen); int32_t ret = 0;
if (unlikely(!tinfo)) { if(info->dataFormat){
return TSDB_CODE_OUT_OF_MEMORY; ret = smlProcessSuperTable(info, elements);
} if(ret != 0){
tinfo->tags = taosArrayDup(preLineKV, NULL); return terrno;
for (size_t i = 0; i < taosArrayGetSize(preLineKV); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(preLineKV, i);
if (kv->keyEscaped) kv->key = NULL;
if (kv->valueEscaped) kv->value = NULL;
}
smlSetCTableName(tinfo);
getTableUid(info, currElement, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlDestroyTableInfo(&tinfo);
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
} }
} }
taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); ret = smlProcessTagLine(info, sql, sqlEnd);
if(ret != 0){
return terrno;
}
return TSDB_CODE_SUCCESS; return smlProcessChildTable(info, elements);
} }
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement, bool isSameMeasure, static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) {
bool isSameCTable) {
int cnt = 0; int cnt = 0;
if (info->dataFormat) {
if (unlikely(!isSameCTable)) {
SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
info->currTableDataCtx = (*oneTable)->tableDataCtx;
}
if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if (unlikely(tmp == NULL)) {
char *measure = currElement->measure;
int measureLen = currElement->measureLen;
if (currElement->measureEscaped) {
measure = (char *)taosMemoryMalloc(currElement->measureLen);
memcpy(measure, currElement->measure, currElement->measureLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
}
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
if (currElement->measureEscaped) {
taosMemoryFree(measure);
}
if (pTableMeta == NULL) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
*tmp = smlBuildSTableMeta(info->dataFormat);
if(*tmp == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*tmp)->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, tmp, POINTER_BYTES);
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type};
if (tag->type == TSDB_DATA_TYPE_NCHAR) {
kv.length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} else if (tag->type == TSDB_DATA_TYPE_BINARY || tag->type == TSDB_DATA_TYPE_GEOMETRY || tag->type == TSDB_DATA_TYPE_VARBINARY) {
kv.length = tag->bytes - VARSTR_HEADER_SIZE;
}
taosArrayPush((*tmp)->cols, &kv);
}
}
info->currSTableMeta = (*tmp)->tableMeta;
info->maxColKVs = (*tmp)->cols;
}
}
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) { if (unlikely(IS_SPACE(*sql))) {
break; break;
@ -569,47 +424,11 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
} }
if (info->dataFormat) { if (info->dataFormat) {
// cnt begin 0, add ts so + 2 bool isAligned = isSmlColAligned(info, cnt, &kv);
if (unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)) {
info->dataFormat = false;
info->reRun = true;
freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS;
}
// bind data
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uDebug("smlBuildCol error, retry");
info->dataFormat = false;
info->reRun = true;
freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS;
}
if (cnt >= taosArrayGetSize(info->maxColKVs)) {
info->dataFormat = false;
info->reRun = true;
freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt);
if (kv.type != maxKV->type) {
info->dataFormat = false;
info->reRun = true;
freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
freeSSmlKv(&kv);
return TSDB_CODE_SUCCESS;
}
if (unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
freeSSmlKv(&kv); freeSSmlKv(&kv);
if(!isAligned){
return TSDB_CODE_SUCCESS;
}
} else { } else {
if (currElement->colArray == NULL) { if (currElement->colArray == NULL) {
currElement->colArray = taosArrayInit_s(sizeof(SSmlKv), 1); currElement->colArray = taosArrayInit_s(sizeof(SSmlKv), 1);
@ -665,20 +484,12 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
tmp++; tmp++;
} }
elements->measureTagsLen = tmp - elements->measure; elements->measureTagsLen = tmp - elements->measure;
elements->measureTag = elements->measure;
bool isSameCTable = false;
bool isSameMeasure = false;
if (IS_SAME_CHILD_TABLE) {
isSameCTable = true;
isSameMeasure = true;
} else if (info->dataFormat) {
isSameMeasure = IS_SAME_SUPER_TABLE;
}
// parse tag // parse tag
if (*sql == COMMA) sql++; if (*sql == COMMA) sql++;
elements->tags = sql; elements->tags = sql;
int ret = smlParseTagKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable); int ret = smlParseTagLine(info, &sql, sqlEnd, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret; return ret;
} }
@ -693,7 +504,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
JUMP_SPACE(sql, sqlEnd) JUMP_SPACE(sql, sqlEnd)
elements->cols = sql; elements->cols = sql;
ret = smlParseColKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable); ret = smlParseColLine(info, &sql, sqlEnd, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret; return ret;
} }
@ -724,31 +535,9 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts); uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
return TSDB_CODE_INVALID_TIMESTAMP; return TSDB_CODE_INVALID_TIMESTAMP;
} }
// add ts to
SSmlKv kv = {.key = tsSmlTsDefaultName,
.keyLen = strlen(tsSmlTsDefaultName),
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes,
.keyEscaped = false,
.valueEscaped = false};
if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArraySml(info->currTableDataCtx->pValues); SSmlKv kvTs = {0};
if (unlikely(ret != TSDB_CODE_SUCCESS)) { smlBuildTsKv(&kvTs, ts);
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
} else {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
taosArraySet(elements->colArray, 0, &kv);
}
info->preLine = *elements;
return ret; return smlParseEndLine(info, elements, &kvTs);
} }

View File

@ -23,8 +23,6 @@
int32_t is_same_child_table_telnet(const void *a, const void *b) { int32_t is_same_child_table_telnet(const void *a, const void *b) {
SSmlLineInfo *t1 = (SSmlLineInfo *)a; SSmlLineInfo *t1 = (SSmlLineInfo *)a;
SSmlLineInfo *t2 = (SSmlLineInfo *)b; SSmlLineInfo *t2 = (SSmlLineInfo *)b;
// uError("is_same_child_table_telnet len:%d,%d %s,%s @@@ len:%d,%d %s,%s", t1->measureLen, t2->measureLen,
// t1->measure, t2->measure, t1->tagsLen, t2->tagsLen, t1->tags, t2->tags);
if (t1 == NULL || t2 == NULL || t1->measure == NULL || t2->measure == NULL || t1->tags == NULL || t2->tags == NULL) if (t1 == NULL || t2 == NULL || t1->measure == NULL || t2->measure == NULL || t1->tags == NULL || t2->tags == NULL)
return 1; return 1;
return (((t1->measureLen == t2->measureLen) && memcmp(t1->measure, t2->measure, t1->measureLen) == 0) && return (((t1->measureLen == t2->measureLen) && memcmp(t1->measure, t2->measure, t1->measureLen) == 0) &&
@ -69,47 +67,11 @@ static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t
} }
} }
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) { static int32_t smlProcessTagTelnet(SSmlHandle *info, char *data, char *sqlEnd){
if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
elements->measureTag = info->preLine.measureTag;
return TSDB_CODE_SUCCESS;
}
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV; SArray *preLineKV = info->preLineTagKV;
if (info->dataFormat) { taosArrayClearEx(preLineKV, freeSSmlKv);
if (!isSameMeasure) { int cnt = 0;
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if (pTableMeta == NULL) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
}
tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
info->maxTagKVs = (*tmp)->tags;
}
}
taosArrayClear(preLineKV);
const char *sql = data; const char *sql = data;
while (sql < sqlEnd) { while (sql < sqlEnd) {
JUMP_SPACE(sql, sqlEnd) JUMP_SPACE(sql, sqlEnd)
@ -121,8 +83,9 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
// parse key // parse key
while (sql < sqlEnd) { while (sql < sqlEnd) {
if (unlikely(*sql == SPACE)) { if (unlikely(*sql == SPACE)) {
smlBuildInvalidDataMsg(msg, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
} }
if (unlikely(*sql == EQUAL)) { if (unlikely(*sql == EQUAL)) {
keyLen = sql - key; keyLen = sql - key;
@ -133,13 +96,10 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
} }
if (unlikely(IS_INVALID_COL_LEN(keyLen))) { if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; terrno = TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
return -1;
} }
// if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
// smlBuildInvalidDataMsg(msg, "dumplicate key", key);
// return TSDB_CODE_TSC_DUP_NAMES;
// }
// parse value // parse value
const char *value = sql; const char *value = sql;
@ -150,87 +110,67 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
break; break;
} }
if (unlikely(*sql == EQUAL)) { if (unlikely(*sql == EQUAL)) {
smlBuildInvalidDataMsg(msg, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
} }
sql++; sql++;
} }
valueLen = sql - value; valueLen = sql - value;
if (unlikely(valueLen == 0)) { if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(msg, "invalid value", value); smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
return TSDB_CODE_TSC_INVALID_VALUE; terrno = TSDB_CODE_TSC_INVALID_VALUE;
return -1;
} }
if (unlikely(valueLen > (TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) { if (unlikely(valueLen > (TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; terrno = TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
return -1;
} }
SSmlKv kv = {.key = key, .keyLen = keyLen, .type = TSDB_DATA_TYPE_NCHAR, .value = value, .length = valueLen}; SSmlKv kv = {.key = key,
.keyLen = keyLen,
if (info->dataFormat) { .type = TSDB_DATA_TYPE_NCHAR,
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) { .value = value,
info->dataFormat = false; .length = valueLen,
info->reRun = true; .keyEscaped = false,
return TSDB_CODE_SUCCESS; .valueEscaped = false};
}
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
}
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
if (info->dataFormat && !isSmlTagAligned(info, cnt, &kv)) {
terrno = TSDB_CODE_SUCCESS;
return -1;
}
cnt++; cnt++;
} }
return 0;
}
elements->measureTag = (char *)taosMemoryMalloc(elements->measureLen + elements->tagsLen); static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements) {
memcpy(elements->measureTag, elements->measure, elements->measureLen); if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
memcpy(elements->measureTag + elements->measureLen, elements->tags, elements->tagsLen); elements->measureTag = info->preLine.measureTag;
elements->measureTagsLen = elements->measureLen + elements->tagsLen; return TSDB_CODE_SUCCESS;
SSmlTableInfo **tmp =
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(tmp == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tinfo->tags = taosArrayDup(preLineKV, NULL);
smlSetCTableName(tinfo);
getTableUid(info, elements, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA;
}
}
// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
// *key = *elements;
// tinfo->key = key;
taosHashPut(info->childTables, elements->measureTag, elements->measureLen + elements->tagsLen, &tinfo,
POINTER_BYTES);
tmp = &tinfo;
} }
if (info->dataFormat) info->currTableDataCtx = (*tmp)->tableDataCtx;
return TSDB_CODE_SUCCESS; int32_t ret = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
return terrno;
}
}
ret = smlProcessTagTelnet(info, data, sqlEnd);
if(ret != 0){
return terrno;
}
ret = smlJoinMeasureTag(elements);
if(ret != 0){
return ret;
}
return smlProcessChildTable(info, elements);
} }
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>] // format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
@ -251,7 +191,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
bool needConverTime = false; // get TS before parse tag(get meta), so need conver time bool needConverTime = false; // get TS before parse tag(get meta), so need convert time
if (info->dataFormat && info->currSTableMeta == NULL) { if (info->dataFormat && info->currSTableMeta == NULL) {
needConverTime = true; needConverTime = true;
} }
@ -260,11 +200,6 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_INVALID_TIMESTAMP; return TSDB_CODE_INVALID_TIMESTAMP;
} }
SSmlKv kvTs = {.key = tsSmlTsDefaultName,
.keyLen = strlen(tsSmlTsDefaultName),
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
// parse value // parse value
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen); smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
@ -287,7 +222,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
int ret = smlParseTelnetTags(info, sql, sqlEnd, elements, &info->msgBuf); int ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret; return ret;
} }
@ -296,30 +231,11 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (info->dataFormat && info->currSTableMeta != NULL) { SSmlKv kvTs = {0};
if (needConverTime) { smlBuildTsKv(&kvTs, ts);
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision); if (needConverTime) {
} kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if (ret == TSDB_CODE_SUCCESS) {
ret = smlBuildRow(info->currTableDataCtx);
}
clearColValArraySml(info->currTableDataCtx->pValues);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
} else {
if (elements->colArray == NULL) {
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
} }
info->preLine = *elements;
return TSDB_CODE_SUCCESS; return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
} }