[TD-6442]<feature>: Support OpenTSDB telnet style data import format

This commit is contained in:
Ganlin Zhao 2021-08-30 16:08:08 +08:00
parent 9c1813dca7
commit bc95808b96
1 changed files with 28 additions and 29 deletions

View File

@ -72,7 +72,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
char *value = NULL; char *value = NULL;
start = cur = *index; start = cur = *index;
*pTS = calloc(1, sizeof(TAOS_SML_KV)); *pTS = tcalloc(1, sizeof(TAOS_SML_KV));
while(*cur != '\0') { while(*cur != '\0') {
if (*cur == ' ') { if (*cur == ' ') {
@ -83,22 +83,22 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
} }
if (len > 0) { if (len > 0) {
value = calloc(len + 1, 1); value = tcalloc(len + 1, 1);
memcpy(value, start, len); memcpy(value, start, len);
} else { } else {
free(*pTS); tfree(*pTS);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} }
ret = convertSmlTimeStamp(*pTS, value, len, info); ret = convertSmlTimeStamp(*pTS, value, len, info);
if (ret) { if (ret) {
free(value); tfree(value);
free(*pTS); tfree(*pTS);
return ret; return ret;
} }
free(value); tfree(value);
(*pTS)->key = calloc(sizeof(key), 1); (*pTS)->key = tcalloc(sizeof(key), 1);
memcpy((*pTS)->key, key, sizeof(key)); memcpy((*pTS)->key, key, sizeof(key));
*num_kvs += 1; *num_kvs += 1;
@ -117,7 +117,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
char *value = NULL; char *value = NULL;
start = cur = *index; start = cur = *index;
pVal = calloc(1, sizeof(TAOS_SML_KV)); pVal = tcalloc(1, sizeof(TAOS_SML_KV));
while(*cur != '\0') { while(*cur != '\0') {
if (*cur == ' ') { if (*cur == ' ') {
@ -128,23 +128,23 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
} }
if (len > 0) { if (len > 0) {
value = calloc(len + 1, 1); value = tcalloc(len + 1, 1);
memcpy(value, start, len); memcpy(value, start, len);
} else { } else {
free(pVal); tfree(pVal);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} }
if (!convertSmlValueType(pVal, value, len, info)) { if (!convertSmlValueType(pVal, value, len, info)) {
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type", tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
info->id, value); info->id, value);
free(value); tfree(value);
free(pVal); tfree(pVal);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} }
free(value); tfree(value);
pVal->key = calloc(sizeof(key), 1); pVal->key = tcalloc(sizeof(key), 1);
memcpy(pVal->key, key, sizeof(key)); memcpy(pVal->key, key, sizeof(key));
*num_kvs += 1; *num_kvs += 1;
@ -181,7 +181,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} }
pKV->key = calloc(len + 1, 1); pKV->key = tcalloc(len + 1, 1);
memcpy(pKV->key, key, len + 1); memcpy(pKV->key, key, len + 1);
//tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len); //tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
*index = cur + 1; *index = cur + 1;
@ -207,19 +207,18 @@ static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
len++; len++;
} }
value = calloc(len + 1, 1); value = tcalloc(len + 1, 1);
memcpy(value, start, len); memcpy(value, start, len);
value[len] = '\0'; value[len] = '\0';
if (!convertSmlValueType(pKV, value, len, info)) { if (!convertSmlValueType(pKV, value, len, info)) {
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type", tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
info->id, value); info->id, value);
//free previous alocated key field //free previous alocated key field
free(pKV->key); tfree(pKV->key);
pKV->key = NULL; tfree(value);
free(value);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} }
free(value); tfree(value);
*index = (*cur == '\0') ? cur : cur + 1; *index = (*cur == '\0') ? cur : cur + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -234,7 +233,7 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
bool is_last_kv = false; bool is_last_kv = false;
int32_t capacity = 4; int32_t capacity = 4;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); *pKVs = tcalloc(capacity, sizeof(TAOS_SML_KV));
pkv = *pKVs; pkv = *pKVs;
while (*cur != '\0') { while (*cur != '\0') {
@ -256,8 +255,8 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
childTableName = malloc(pkv->length + 1); childTableName = malloc(pkv->length + 1);
memcpy(childTableName, pkv->value, pkv->length); memcpy(childTableName, pkv->value, pkv->length);
childTableName[pkv->length] = '\0'; childTableName[pkv->length] = '\0';
free(pkv->key); tfree(pkv->key);
free(pkv->value); tfree(pkv->value);
} else { } else {
*num_kvs += 1; *num_kvs += 1;
} }
@ -347,7 +346,7 @@ int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray*
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) { int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
int32_t code = 0; int32_t code = 0;
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
info->id = genUID(); info->id = genUID();
if (numLines <= 0 || numLines > 65536) { if (numLines <= 0 || numLines > 65536) {
@ -359,7 +358,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
for (int i = 0; i < numLines; ++i) { for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) { if (lines[i] == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
free(info); tfree(info);
code = TSDB_CODE_TSC_APP_ERROR; code = TSDB_CODE_TSC_APP_ERROR;
return code; return code;
} }
@ -368,7 +367,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
if (lpPoints == NULL) { if (lpPoints == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
free(info); tfree(info);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
@ -396,14 +395,14 @@ cleanup:
taosArrayDestroy(lpPoints); taosArrayDestroy(lpPoints);
free(info); tfree(info);
return code; return code;
} }
int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
info->id = genUID(); info->id = genUID();
int code = tscSmlInsert(taos, points, numPoint, info); int code = tscSmlInsert(taos, points, numPoint, info);
free(info); tfree(info);
return code; return code;
} }