[TD-6442]<feature>: Support OpenTSDB telnet style data import format
This commit is contained in:
parent
58ff59641a
commit
ea7ca9b241
|
@ -52,14 +52,19 @@ typedef struct {
|
||||||
SHashObj* smlDataToSchema;
|
SHashObj* smlDataToSchema;
|
||||||
} SSmlLinesInfo;
|
} SSmlLinesInfo;
|
||||||
|
|
||||||
|
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
|
||||||
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint);
|
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint);
|
||||||
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
|
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
|
||||||
int32_t isValidChildTableName(const char *pTbName, int16_t len);
|
int32_t isValidChildTableName(const char *pTbName, int16_t len);
|
||||||
|
|
||||||
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
|
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
|
||||||
uint16_t len, SSmlLinesInfo* info);
|
uint16_t len, SSmlLinesInfo* info);
|
||||||
|
|
||||||
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
|
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
|
||||||
uint16_t len, SSmlLinesInfo* info);
|
uint16_t len, SSmlLinesInfo* info);
|
||||||
|
|
||||||
|
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -13,6 +13,18 @@
|
||||||
#include "tscParseLine.h"
|
#include "tscParseLine.h"
|
||||||
//=========================================================================
|
//=========================================================================
|
||||||
// telnet style API parser
|
// telnet style API parser
|
||||||
|
static uint64_t HandleId = 0;
|
||||||
|
|
||||||
|
uint64_t genUID() {
|
||||||
|
uint64_t id;
|
||||||
|
|
||||||
|
do {
|
||||||
|
id = atomic_add_fetch_64(&HandleId, 1);
|
||||||
|
} while (id == 0);
|
||||||
|
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) {
|
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) {
|
||||||
const char *cur = *index;
|
const char *cur = *index;
|
||||||
uint16_t len = 0;
|
uint16_t len = 0;
|
||||||
|
@ -317,3 +329,75 @@ int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlL
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
|
||||||
|
for (int32_t i = 0; i < numLines; ++i) {
|
||||||
|
TAOS_SML_DATA_POINT point = {0};
|
||||||
|
int32_t code = tscParseTelnetLine(lines[i], &point, info);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
|
||||||
|
destroySmlDataPoint(&point);
|
||||||
|
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
|
||||||
|
} else {
|
||||||
|
tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(points, &point);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
|
||||||
|
info->id = genUID();
|
||||||
|
|
||||||
|
if (numLines <= 0 || numLines > 65536) {
|
||||||
|
tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
|
||||||
|
code = TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numLines; ++i) {
|
||||||
|
if (lines[i] == NULL) {
|
||||||
|
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
|
||||||
|
free(info);
|
||||||
|
code = TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
|
||||||
|
if (lpPoints == NULL) {
|
||||||
|
tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
|
||||||
|
free(info);
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
|
||||||
|
code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info);
|
||||||
|
size_t numPoints = taosArrayGetSize(lpPoints);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
|
||||||
|
code = tscSmlInsert(taos, points, (int)numPoints, info);
|
||||||
|
if (code != 0) {
|
||||||
|
tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
|
||||||
|
points = TARRAY_GET_START(lpPoints);
|
||||||
|
numPoints = taosArrayGetSize(lpPoints);
|
||||||
|
for (int i=0; i<numPoints; ++i) {
|
||||||
|
destroySmlDataPoint(points+i);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(lpPoints);
|
||||||
|
|
||||||
|
free(info);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue