From 07123ac8d07faf3f83a641d9248f4f0c845e94e0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 30 Aug 2021 16:08:08 +0800 Subject: [PATCH] [TD-6442]: Support OpenTSDB telnet style data import format --- src/client/src/tscParseLineProtocol.c | 177 ++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 778c0cfb47..d2b0bb847c 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -2212,3 +2212,180 @@ cleanup: return code; } +//========================================================================= +// telnet style API parser +static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) { + const char *cur = *index; + uint16_t len = 0; + + pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN + 1, 1); // +1 to avoid 1772 line over write + if (pSml->stableName == NULL){ + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + if (isdigit(*cur)) { + tscError("SML:0x%"PRIx64" Metric cannnot start with digit", info->id); + tfree(pSml->stableName); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + + while (*cur != '\0') { + if (len > TSDB_TABLE_NAME_LEN) { + tscError("SML:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id); + tfree(pSml->stableName); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + + if (*cur == ' ') { + break; + } + + pSml->stableName[len] = *cur; + cur++; + len++; + } + pSml->stableName[len] = '\0'; + *index = cur + 1; + tscDebug("SML:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len); + + return TSDB_CODE_SUCCESS; +} + +static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **index, SSmlLinesInfo* info) { + //Timestamp must be the first KV to parse + assert(*num_kvs == 0); + + const char *start, *cur; + int32_t ret = TSDB_CODE_SUCCESS; + int len = 0; + char key[] = "_ts"; + char *value = NULL; + + start = cur = *index; + *pTS = calloc(1, sizeof(TAOS_SML_KV)); + + while(*cur != '\0') { + if (*cur == ' ') { + break; + } + cur++; + len++; + } + + if (len > 0) { + value = calloc(len + 1, 1); + memcpy(value, start, len); + } else { + free(*pTS); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + + ret = convertSmlTimeStamp(*pTS, value, len, info); + if (ret) { + free(value); + free(*pTS); + return ret; + } + free(value); + + (*pTS)->key = calloc(sizeof(key), 1); + memcpy((*pTS)->key, key, sizeof(key)); + + *num_kvs += 1; + *index = cur + 1; + + return ret; +} + +static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **index, SSmlLinesInfo* info) { + //SKip timestamp + TAOS_SML_KV *pVal = *pKVs + sizeof(TAOS_SML_KV); + const char *start, *cur; + int32_t ret = TSDB_CODE_SUCCESS; + int len = 0; + char key[] = "value"; + char *value = NULL; + + start = cur = *index; + pVal = calloc(1, sizeof(TAOS_SML_KV)); + + while(*cur != '\0') { + if (*cur == ' ') { + break; + } + cur++; + len++; + } + + if (len > 0) { + value = calloc(len + 1, 1); + memcpy(value, start, len); + } else { + free(pVal); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + + if (!convertSmlValueType(pVal, value, len, info)) { + tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type", + info->id, value); + free(value); + free(pVal); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + free(value); + + pVal->key = calloc(sizeof(key), 1); + memcpy(pVal->key, key, sizeof(key)); + *num_kvs += 1; + + *index = cur + 1; + return ret; +} + +static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs, + const char **index, TAOS_SML_DATA_POINT* smlData, + SHashObj *pHash, SSmlLinesInfo* info) { + return 0; +} + +int32_t tscParseTelnetAPI(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) { + const char* index = sql; + int32_t ret = TSDB_CODE_SUCCESS; + + //Parse metric + ret = parseTelnetMetric(smlData, &index, info); + if (ret) { + tscError("SML:0x%"PRIx64" Unable to parse metric", info->id); + return ret; + } + tscDebug("SML:0x%"PRIx64" Parse metric finished", info->id); + + //Parse timestamp + ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info); + if (ret) { + tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id); + return ret; + } + tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id); + + //Parse value + ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info); + if (ret) { + tscError("SML:0x%"PRIx64" Unable to parse metric value", info->id); + return ret; + } + tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum); + + //Parse tagKVs + SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + ret = parseTelnetTagKvs(&smlData->fields, &smlData->fieldNum, &index, smlData, keyHashTable, info); + if (ret) { + tscError("SML:0x%"PRIx64" Unable to parse field", info->id); + taosHashCleanup(keyHashTable); + return ret; + } + tscDebug("SML:0x%"PRIx64" Parse fields finished, num of fields:%d", info->id, smlData->fieldNum); + taosHashCleanup(keyHashTable); + + + return TSDB_CODE_SUCCESS; +}