[TD-6442]<feature>: Support OpenTSDB telnet style data import format
This commit is contained in:
parent
368d50f29e
commit
58ff59641a
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2021 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_TSCPARSELINE_H
|
||||||
|
#define TDENGINE_TSCPARSELINE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char* key;
|
||||||
|
uint8_t type;
|
||||||
|
int16_t length;
|
||||||
|
char* value;
|
||||||
|
} TAOS_SML_KV;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char* stableName;
|
||||||
|
|
||||||
|
char* childTableName;
|
||||||
|
TAOS_SML_KV* tags;
|
||||||
|
int32_t tagNum;
|
||||||
|
|
||||||
|
// first kv must be timestamp
|
||||||
|
TAOS_SML_KV* fields;
|
||||||
|
int32_t fieldNum;
|
||||||
|
} TAOS_SML_DATA_POINT;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
SML_TIME_STAMP_NOW,
|
||||||
|
SML_TIME_STAMP_SECONDS,
|
||||||
|
SML_TIME_STAMP_MILLI_SECONDS,
|
||||||
|
SML_TIME_STAMP_MICRO_SECONDS,
|
||||||
|
SML_TIME_STAMP_NANO_SECONDS
|
||||||
|
} SMLTimeStampType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t id;
|
||||||
|
SHashObj* smlDataToSchema;
|
||||||
|
} SSmlLinesInfo;
|
||||||
|
|
||||||
|
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint);
|
||||||
|
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
|
||||||
|
int32_t isValidChildTableName(const char *pTbName, int16_t len);
|
||||||
|
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
|
||||||
|
uint16_t len, SSmlLinesInfo* info);
|
||||||
|
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
|
||||||
|
uint16_t len, SSmlLinesInfo* info);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_TSCPARSELINE_H
|
|
@ -17,6 +17,7 @@
|
||||||
#include "tscLog.h"
|
#include "tscLog.h"
|
||||||
|
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
#include "tscParseLine.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||||
|
@ -27,38 +28,6 @@ typedef struct {
|
||||||
uint8_t precision;
|
uint8_t precision;
|
||||||
} SSmlSTableSchema;
|
} SSmlSTableSchema;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char* key;
|
|
||||||
uint8_t type;
|
|
||||||
int16_t length;
|
|
||||||
char* value;
|
|
||||||
} TAOS_SML_KV;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char* stableName;
|
|
||||||
|
|
||||||
char* childTableName;
|
|
||||||
TAOS_SML_KV* tags;
|
|
||||||
int32_t tagNum;
|
|
||||||
|
|
||||||
// first kv must be timestamp
|
|
||||||
TAOS_SML_KV* fields;
|
|
||||||
int32_t fieldNum;
|
|
||||||
} TAOS_SML_DATA_POINT;
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
SML_TIME_STAMP_NOW,
|
|
||||||
SML_TIME_STAMP_SECONDS,
|
|
||||||
SML_TIME_STAMP_MILLI_SECONDS,
|
|
||||||
SML_TIME_STAMP_MICRO_SECONDS,
|
|
||||||
SML_TIME_STAMP_NANO_SECONDS
|
|
||||||
} SMLTimeStampType;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint64_t id;
|
|
||||||
SHashObj* smlDataToSchema;
|
|
||||||
} SSmlLinesInfo;
|
|
||||||
|
|
||||||
//=================================================================================================
|
//=================================================================================================
|
||||||
|
|
||||||
static uint64_t linesSmlHandleId = 0;
|
static uint64_t linesSmlHandleId = 0;
|
||||||
|
@ -1565,8 +1534,8 @@ static bool convertStrToNumber(TAOS_SML_KV *pVal, char*str, SSmlLinesInfo* info)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
//len does not include '\0' from value.
|
//len does not include '\0' from value.
|
||||||
static bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
|
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
|
||||||
uint16_t len, SSmlLinesInfo* info) {
|
uint16_t len, SSmlLinesInfo* info) {
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1749,8 +1718,8 @@ static int32_t getTimeStampValue(char *value, uint16_t len,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
|
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
|
||||||
uint16_t len, SSmlLinesInfo* info) {
|
uint16_t len, SSmlLinesInfo* info) {
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
SMLTimeStampType type;
|
SMLTimeStampType type;
|
||||||
int64_t tsVal;
|
int64_t tsVal;
|
||||||
|
@ -1805,7 +1774,7 @@ static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLine
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
|
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
|
||||||
char *val = NULL;
|
char *val = NULL;
|
||||||
char *cur = key;
|
char *cur = key;
|
||||||
char keyLower[TSDB_COL_NAME_LEN];
|
char keyLower[TSDB_COL_NAME_LEN];
|
||||||
|
@ -1958,7 +1927,7 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index
|
||||||
}
|
}
|
||||||
|
|
||||||
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
|
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
|
||||||
static int32_t isValidChildTableName(const char *pTbName, int16_t len) {
|
int32_t isValidChildTableName(const char *pTbName, int16_t len) {
|
||||||
const char *cur = pTbName;
|
const char *cur = pTbName;
|
||||||
for (int i = 0; i < len; ++i) {
|
for (int i = 0; i < len; ++i) {
|
||||||
if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
|
if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
|
||||||
|
@ -2212,308 +2181,3 @@ cleanup:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
//=========================================================================
|
|
||||||
// telnet style API parser
|
|
||||||
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) {
|
|
||||||
const char *cur = *index;
|
|
||||||
uint16_t len = 0;
|
|
||||||
|
|
||||||
pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN + 1, 1); // +1 to avoid 1772 line over write
|
|
||||||
if (pSml->stableName == NULL){
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
if (isdigit(*cur)) {
|
|
||||||
tscError("SML:0x%"PRIx64" Metric cannnot start with digit", info->id);
|
|
||||||
tfree(pSml->stableName);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (*cur != '\0') {
|
|
||||||
if (len > TSDB_TABLE_NAME_LEN) {
|
|
||||||
tscError("SML:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id);
|
|
||||||
tfree(pSml->stableName);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*cur == ' ') {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSml->stableName[len] = *cur;
|
|
||||||
cur++;
|
|
||||||
len++;
|
|
||||||
}
|
|
||||||
pSml->stableName[len] = '\0';
|
|
||||||
*index = cur + 1;
|
|
||||||
tscDebug("SML:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **index, SSmlLinesInfo* info) {
|
|
||||||
//Timestamp must be the first KV to parse
|
|
||||||
assert(*num_kvs == 0);
|
|
||||||
|
|
||||||
const char *start, *cur;
|
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
|
||||||
int len = 0;
|
|
||||||
char key[] = "_ts";
|
|
||||||
char *value = NULL;
|
|
||||||
|
|
||||||
start = cur = *index;
|
|
||||||
*pTS = calloc(1, sizeof(TAOS_SML_KV));
|
|
||||||
|
|
||||||
while(*cur != '\0') {
|
|
||||||
if (*cur == ' ') {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
cur++;
|
|
||||||
len++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (len > 0) {
|
|
||||||
value = calloc(len + 1, 1);
|
|
||||||
memcpy(value, start, len);
|
|
||||||
} else {
|
|
||||||
free(*pTS);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = convertSmlTimeStamp(*pTS, value, len, info);
|
|
||||||
if (ret) {
|
|
||||||
free(value);
|
|
||||||
free(*pTS);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
free(value);
|
|
||||||
|
|
||||||
(*pTS)->key = calloc(sizeof(key), 1);
|
|
||||||
memcpy((*pTS)->key, key, sizeof(key));
|
|
||||||
|
|
||||||
*num_kvs += 1;
|
|
||||||
*index = cur + 1;
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **index, SSmlLinesInfo* info) {
|
|
||||||
//SKip timestamp
|
|
||||||
TAOS_SML_KV *pVal = *pKVs + sizeof(TAOS_SML_KV);
|
|
||||||
const char *start, *cur;
|
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
|
||||||
int len = 0;
|
|
||||||
char key[] = "value";
|
|
||||||
char *value = NULL;
|
|
||||||
|
|
||||||
start = cur = *index;
|
|
||||||
pVal = calloc(1, sizeof(TAOS_SML_KV));
|
|
||||||
|
|
||||||
while(*cur != '\0') {
|
|
||||||
if (*cur == ' ') {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
cur++;
|
|
||||||
len++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (len > 0) {
|
|
||||||
value = calloc(len + 1, 1);
|
|
||||||
memcpy(value, start, len);
|
|
||||||
} else {
|
|
||||||
free(pVal);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!convertSmlValueType(pVal, value, len, info)) {
|
|
||||||
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
|
|
||||||
info->id, value);
|
|
||||||
free(value);
|
|
||||||
free(pVal);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
free(value);
|
|
||||||
|
|
||||||
pVal->key = calloc(sizeof(key), 1);
|
|
||||||
memcpy(pVal->key, key, sizeof(key));
|
|
||||||
*num_kvs += 1;
|
|
||||||
|
|
||||||
*index = cur + 1;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
|
|
||||||
const char *cur = *index;
|
|
||||||
char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write
|
|
||||||
uint16_t len = 0;
|
|
||||||
|
|
||||||
//key field cannot start with digit
|
|
||||||
if (isdigit(*cur)) {
|
|
||||||
tscError("SML:0x%"PRIx64" Tag key cannnot start with digit", info->id);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
while (*cur != '\0') {
|
|
||||||
if (len > TSDB_COL_NAME_LEN) {
|
|
||||||
tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
if (*cur == '=') {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
key[len] = *cur;
|
|
||||||
cur++;
|
|
||||||
len++;
|
|
||||||
}
|
|
||||||
key[len] = '\0';
|
|
||||||
|
|
||||||
if (checkDuplicateKey(key, pHash, info)) {
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
pKV->key = calloc(len + 1, 1);
|
|
||||||
memcpy(pKV->key, key, len + 1);
|
|
||||||
//tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
|
|
||||||
*index = cur + 1;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
|
|
||||||
bool *is_last_kv, SSmlLinesInfo* info) {
|
|
||||||
const char *start, *cur;
|
|
||||||
char *value = NULL;
|
|
||||||
uint16_t len = 0;
|
|
||||||
start = cur = *index;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
// ',' or '\0' identifies a value
|
|
||||||
if (*cur == ',' || *cur == '\0') {
|
|
||||||
// '\0' indicates end of value
|
|
||||||
*is_last_kv = (*cur == '\0') ? true : false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
cur++;
|
|
||||||
len++;
|
|
||||||
}
|
|
||||||
|
|
||||||
value = calloc(len + 1, 1);
|
|
||||||
memcpy(value, start, len);
|
|
||||||
value[len] = '\0';
|
|
||||||
if (!convertSmlValueType(pKV, value, len, info)) {
|
|
||||||
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
|
|
||||||
info->id, value);
|
|
||||||
//free previous alocated key field
|
|
||||||
free(pKV->key);
|
|
||||||
pKV->key = NULL;
|
|
||||||
free(value);
|
|
||||||
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
|
||||||
}
|
|
||||||
free(value);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
|
|
||||||
const char **index, char* childTableName,
|
|
||||||
SHashObj *pHash, SSmlLinesInfo* info) {
|
|
||||||
const char *cur = *index;
|
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
|
||||||
TAOS_SML_KV *pkv;
|
|
||||||
bool is_last_kv = false;
|
|
||||||
|
|
||||||
int32_t capacity = 4;
|
|
||||||
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
|
|
||||||
pkv = *pKVs;
|
|
||||||
|
|
||||||
while (*cur != '\0') {
|
|
||||||
ret = parseTelnetTagKey(pkv, &cur, pHash, info);
|
|
||||||
if (ret) {
|
|
||||||
tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info);
|
|
||||||
if (ret) {
|
|
||||||
tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
|
|
||||||
ret = isValidChildTableName(pkv->value, pkv->length);
|
|
||||||
if (ret) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
childTableName = malloc(pkv->length + 1);
|
|
||||||
memcpy(childTableName, pkv->value, pkv->length);
|
|
||||||
childTableName[pkv->length] = '\0';
|
|
||||||
free(pkv->key);
|
|
||||||
free(pkv->value);
|
|
||||||
} else {
|
|
||||||
*num_kvs += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (is_last_kv) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
//reallocate addtional memory for more kvs
|
|
||||||
TAOS_SML_KV *more_kvs = NULL;
|
|
||||||
|
|
||||||
if ((*num_kvs + 1) > capacity) {
|
|
||||||
capacity *= 3; capacity /= 2;
|
|
||||||
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
|
|
||||||
} else {
|
|
||||||
more_kvs = *pKVs;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!more_kvs) {
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
*pKVs = more_kvs;
|
|
||||||
//move pKV points to next TAOS_SML_KV block
|
|
||||||
pkv = *pKVs + *num_kvs;
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tscParseTelnetAPI(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
|
|
||||||
const char* index = sql;
|
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
//Parse metric
|
|
||||||
ret = parseTelnetMetric(smlData, &index, info);
|
|
||||||
if (ret) {
|
|
||||||
tscError("SML:0x%"PRIx64" Unable to parse metric", info->id);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
tscDebug("SML:0x%"PRIx64" Parse metric finished", info->id);
|
|
||||||
|
|
||||||
//Parse timestamp
|
|
||||||
ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info);
|
|
||||||
if (ret) {
|
|
||||||
tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);
|
|
||||||
|
|
||||||
//Parse value
|
|
||||||
ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info);
|
|
||||||
if (ret) {
|
|
||||||
tscError("SML:0x%"PRIx64" Unable to parse metric value", info->id);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
tscDebug("SML:0x%"PRIx64" Parse value finished", info->id);
|
|
||||||
|
|
||||||
//Parse tagKVs
|
|
||||||
SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
|
||||||
ret = parseTelnetTagKvs(&smlData->fields, &smlData->fieldNum, &index, smlData->childTableName, keyHashTable, info);
|
|
||||||
if (ret) {
|
|
||||||
tscError("SML:0x%"PRIx64" Unable to parse tags", info->id);
|
|
||||||
taosHashCleanup(keyHashTable);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
tscDebug("SML:0x%"PRIx64" Parse tags finished", info->id);
|
|
||||||
taosHashCleanup(keyHashTable);
|
|
||||||
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,319 @@
|
||||||
|
#include <ctype.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
#include "hash.h"
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
|
#include "tscUtil.h"
|
||||||
|
#include "tsclient.h"
|
||||||
|
#include "tscLog.h"
|
||||||
|
|
||||||
|
#include "tscParseLine.h"
|
||||||
|
//=========================================================================
|
||||||
|
// telnet style API parser
|
||||||
|
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) {
|
||||||
|
const char *cur = *index;
|
||||||
|
uint16_t len = 0;
|
||||||
|
|
||||||
|
pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN + 1, 1); // +1 to avoid 1772 line over write
|
||||||
|
if (pSml->stableName == NULL){
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
if (isdigit(*cur)) {
|
||||||
|
tscError("SML:0x%"PRIx64" Metric cannnot start with digit", info->id);
|
||||||
|
tfree(pSml->stableName);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (*cur != '\0') {
|
||||||
|
if (len > TSDB_TABLE_NAME_LEN) {
|
||||||
|
tscError("SML:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id);
|
||||||
|
tfree(pSml->stableName);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*cur == ' ') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSml->stableName[len] = *cur;
|
||||||
|
cur++;
|
||||||
|
len++;
|
||||||
|
}
|
||||||
|
pSml->stableName[len] = '\0';
|
||||||
|
*index = cur + 1;
|
||||||
|
tscDebug("SML:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **index, SSmlLinesInfo* info) {
|
||||||
|
//Timestamp must be the first KV to parse
|
||||||
|
assert(*num_kvs == 0);
|
||||||
|
|
||||||
|
const char *start, *cur;
|
||||||
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
int len = 0;
|
||||||
|
char key[] = "_ts";
|
||||||
|
char *value = NULL;
|
||||||
|
|
||||||
|
start = cur = *index;
|
||||||
|
*pTS = calloc(1, sizeof(TAOS_SML_KV));
|
||||||
|
|
||||||
|
while(*cur != '\0') {
|
||||||
|
if (*cur == ' ') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cur++;
|
||||||
|
len++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len > 0) {
|
||||||
|
value = calloc(len + 1, 1);
|
||||||
|
memcpy(value, start, len);
|
||||||
|
} else {
|
||||||
|
free(*pTS);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = convertSmlTimeStamp(*pTS, value, len, info);
|
||||||
|
if (ret) {
|
||||||
|
free(value);
|
||||||
|
free(*pTS);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
free(value);
|
||||||
|
|
||||||
|
(*pTS)->key = calloc(sizeof(key), 1);
|
||||||
|
memcpy((*pTS)->key, key, sizeof(key));
|
||||||
|
|
||||||
|
*num_kvs += 1;
|
||||||
|
*index = cur + 1;
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **index, SSmlLinesInfo* info) {
|
||||||
|
//SKip timestamp
|
||||||
|
TAOS_SML_KV *pVal = *pKVs + sizeof(TAOS_SML_KV);
|
||||||
|
const char *start, *cur;
|
||||||
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
int len = 0;
|
||||||
|
char key[] = "value";
|
||||||
|
char *value = NULL;
|
||||||
|
|
||||||
|
start = cur = *index;
|
||||||
|
pVal = calloc(1, sizeof(TAOS_SML_KV));
|
||||||
|
|
||||||
|
while(*cur != '\0') {
|
||||||
|
if (*cur == ' ') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cur++;
|
||||||
|
len++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len > 0) {
|
||||||
|
value = calloc(len + 1, 1);
|
||||||
|
memcpy(value, start, len);
|
||||||
|
} else {
|
||||||
|
free(pVal);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!convertSmlValueType(pVal, value, len, info)) {
|
||||||
|
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
|
||||||
|
info->id, value);
|
||||||
|
free(value);
|
||||||
|
free(pVal);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
free(value);
|
||||||
|
|
||||||
|
pVal->key = calloc(sizeof(key), 1);
|
||||||
|
memcpy(pVal->key, key, sizeof(key));
|
||||||
|
*num_kvs += 1;
|
||||||
|
|
||||||
|
*index = cur + 1;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
|
||||||
|
const char *cur = *index;
|
||||||
|
char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write
|
||||||
|
uint16_t len = 0;
|
||||||
|
|
||||||
|
//key field cannot start with digit
|
||||||
|
if (isdigit(*cur)) {
|
||||||
|
tscError("SML:0x%"PRIx64" Tag key cannnot start with digit", info->id);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
while (*cur != '\0') {
|
||||||
|
if (len > TSDB_COL_NAME_LEN) {
|
||||||
|
tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
if (*cur == '=') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
key[len] = *cur;
|
||||||
|
cur++;
|
||||||
|
len++;
|
||||||
|
}
|
||||||
|
key[len] = '\0';
|
||||||
|
|
||||||
|
if (checkDuplicateKey(key, pHash, info)) {
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pKV->key = calloc(len + 1, 1);
|
||||||
|
memcpy(pKV->key, key, len + 1);
|
||||||
|
//tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
|
||||||
|
*index = cur + 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
|
||||||
|
bool *is_last_kv, SSmlLinesInfo* info) {
|
||||||
|
const char *start, *cur;
|
||||||
|
char *value = NULL;
|
||||||
|
uint16_t len = 0;
|
||||||
|
start = cur = *index;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
// ',' or '\0' identifies a value
|
||||||
|
if (*cur == ',' || *cur == '\0') {
|
||||||
|
// '\0' indicates end of value
|
||||||
|
*is_last_kv = (*cur == '\0') ? true : false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cur++;
|
||||||
|
len++;
|
||||||
|
}
|
||||||
|
|
||||||
|
value = calloc(len + 1, 1);
|
||||||
|
memcpy(value, start, len);
|
||||||
|
value[len] = '\0';
|
||||||
|
if (!convertSmlValueType(pKV, value, len, info)) {
|
||||||
|
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
|
||||||
|
info->id, value);
|
||||||
|
//free previous alocated key field
|
||||||
|
free(pKV->key);
|
||||||
|
pKV->key = NULL;
|
||||||
|
free(value);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
}
|
||||||
|
free(value);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
|
||||||
|
const char **index, char* childTableName,
|
||||||
|
SHashObj *pHash, SSmlLinesInfo* info) {
|
||||||
|
const char *cur = *index;
|
||||||
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
TAOS_SML_KV *pkv;
|
||||||
|
bool is_last_kv = false;
|
||||||
|
|
||||||
|
int32_t capacity = 4;
|
||||||
|
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
|
||||||
|
pkv = *pKVs;
|
||||||
|
|
||||||
|
while (*cur != '\0') {
|
||||||
|
ret = parseTelnetTagKey(pkv, &cur, pHash, info);
|
||||||
|
if (ret) {
|
||||||
|
tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info);
|
||||||
|
if (ret) {
|
||||||
|
tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
ret = isValidChildTableName(pkv->value, pkv->length);
|
||||||
|
if (ret) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
childTableName = malloc(pkv->length + 1);
|
||||||
|
memcpy(childTableName, pkv->value, pkv->length);
|
||||||
|
childTableName[pkv->length] = '\0';
|
||||||
|
free(pkv->key);
|
||||||
|
free(pkv->value);
|
||||||
|
} else {
|
||||||
|
*num_kvs += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_last_kv) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
//reallocate addtional memory for more kvs
|
||||||
|
TAOS_SML_KV *more_kvs = NULL;
|
||||||
|
|
||||||
|
if ((*num_kvs + 1) > capacity) {
|
||||||
|
capacity *= 3; capacity /= 2;
|
||||||
|
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
|
||||||
|
} else {
|
||||||
|
more_kvs = *pKVs;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!more_kvs) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
*pKVs = more_kvs;
|
||||||
|
//move pKV points to next TAOS_SML_KV block
|
||||||
|
pkv = *pKVs + *num_kvs;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
|
||||||
|
const char* index = line;
|
||||||
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
//Parse metric
|
||||||
|
ret = parseTelnetMetric(smlData, &index, info);
|
||||||
|
if (ret) {
|
||||||
|
tscError("SML:0x%"PRIx64" Unable to parse metric", info->id);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
tscDebug("SML:0x%"PRIx64" Parse metric finished", info->id);
|
||||||
|
|
||||||
|
//Parse timestamp
|
||||||
|
ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info);
|
||||||
|
if (ret) {
|
||||||
|
tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);
|
||||||
|
|
||||||
|
//Parse value
|
||||||
|
ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info);
|
||||||
|
if (ret) {
|
||||||
|
tscError("SML:0x%"PRIx64" Unable to parse metric value", info->id);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
tscDebug("SML:0x%"PRIx64" Parse value finished", info->id);
|
||||||
|
|
||||||
|
//Parse tagKVs
|
||||||
|
SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||||
|
ret = parseTelnetTagKvs(&smlData->fields, &smlData->fieldNum, &index, smlData->childTableName, keyHashTable, info);
|
||||||
|
if (ret) {
|
||||||
|
tscError("SML:0x%"PRIx64" Unable to parse tags", info->id);
|
||||||
|
taosHashCleanup(keyHashTable);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
tscDebug("SML:0x%"PRIx64" Parse tags finished", info->id);
|
||||||
|
taosHashCleanup(keyHashTable);
|
||||||
|
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue