feat:add new interface for schemaless
This commit is contained in:
parent
08397dc698
commit
0029ddd232
|
@ -198,6 +198,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
|||
|
||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision);
|
||||
|
||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||
|
||||
|
|
|
@ -28,8 +28,8 @@
|
|||
#define QUOTE '"'
|
||||
#define SLASH '\\'
|
||||
|
||||
#define JUMP_SPACE(sql) \
|
||||
while (*sql != '\0') { \
|
||||
#define JUMP_SPACE(sql, sqlEnd) \
|
||||
while (sql < sqlEnd) { \
|
||||
if (*sql == SPACE) \
|
||||
sql++; \
|
||||
else \
|
||||
|
@ -915,16 +915,17 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
|||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
|
||||
static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
|
||||
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
||||
JUMP_SPACE(sql)
|
||||
JUMP_SPACE(sql, sqlEnd)
|
||||
if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
|
||||
elements->measure = sql;
|
||||
|
||||
// parse measure
|
||||
while (*sql != '\0') {
|
||||
while (sql < sqlEnd) {
|
||||
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
|
||||
MOVE_FORWARD_ONE(sql, strlen(sql) + 1);
|
||||
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
|
||||
sqlEnd--;
|
||||
continue;
|
||||
}
|
||||
if (IS_COMMA(sql)) {
|
||||
|
@ -948,7 +949,7 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
|||
} else {
|
||||
if (*sql == COMMA) sql++;
|
||||
elements->tags = sql;
|
||||
while (*sql != '\0') {
|
||||
while (sql < sqlEnd) {
|
||||
if (IS_SPACE(sql)) {
|
||||
break;
|
||||
}
|
||||
|
@ -959,10 +960,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
|||
elements->measureTagsLen = sql - elements->measure;
|
||||
|
||||
// parse cols
|
||||
JUMP_SPACE(sql)
|
||||
JUMP_SPACE(sql, sqlEnd)
|
||||
elements->cols = sql;
|
||||
bool isInQuote = false;
|
||||
while (*sql != '\0') {
|
||||
while (sql < sqlEnd) {
|
||||
if (IS_QUOTE(sql)) {
|
||||
isInQuote = !isInQuote;
|
||||
}
|
||||
|
@ -982,10 +983,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
|||
}
|
||||
|
||||
// parse timestamp
|
||||
JUMP_SPACE(sql)
|
||||
JUMP_SPACE(sql, sqlEnd)
|
||||
elements->timestamp = sql;
|
||||
while (*sql != '\0') {
|
||||
if (*sql == SPACE) {
|
||||
while (sql < sqlEnd) {
|
||||
if (isspace(*sql)) {
|
||||
break;
|
||||
}
|
||||
sql++;
|
||||
|
@ -995,8 +996,8 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) {
|
||||
while (**sql != '\0') {
|
||||
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
|
||||
while (*sql < sqlEnd) {
|
||||
if (**sql != SPACE && !(*data)) {
|
||||
*data = *sql;
|
||||
} else if (**sql == SPACE && *data) {
|
||||
|
@ -1007,19 +1008,19 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
|
||||
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
|
||||
SSmlMsgBuf *msg) {
|
||||
const char *sql = data;
|
||||
size_t childTableNameLen = strlen(tsSmlChildTableName);
|
||||
while (*sql != '\0') {
|
||||
JUMP_SPACE(sql)
|
||||
while (sql < sqlEnd) {
|
||||
JUMP_SPACE(sql, sqlEnd)
|
||||
if (*sql == '\0') break;
|
||||
|
||||
const char *key = sql;
|
||||
int32_t keyLen = 0;
|
||||
|
||||
// parse key
|
||||
while (*sql != '\0') {
|
||||
while (sql < sqlEnd) {
|
||||
if (*sql == SPACE) {
|
||||
smlBuildInvalidDataMsg(msg, "invalid data", sql);
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
|
@ -1044,7 +1045,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
|
|||
// parse value
|
||||
const char *value = sql;
|
||||
int32_t valueLen = 0;
|
||||
while (*sql != '\0') {
|
||||
while (sql < sqlEnd) {
|
||||
// parse value
|
||||
if (*sql == SPACE) {
|
||||
break;
|
||||
|
@ -1089,11 +1090,11 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
|
|||
}
|
||||
|
||||
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
|
||||
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) {
|
||||
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) {
|
||||
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
||||
|
||||
// parse metric
|
||||
smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
|
||||
smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
|
||||
if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
|
||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
||||
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
|
@ -1102,7 +1103,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
|
|||
// parse timestamp
|
||||
const char *timestamp = NULL;
|
||||
int32_t tLen = 0;
|
||||
smlParseTelnetElement(&sql, ×tamp, &tLen);
|
||||
smlParseTelnetElement(&sql, sqlEnd, ×tamp, &tLen);
|
||||
if (!timestamp || tLen == 0) {
|
||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
|
@ -1117,7 +1118,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
|
|||
// parse value
|
||||
const char *value = NULL;
|
||||
int32_t valueLen = 0;
|
||||
smlParseTelnetElement(&sql, &value, &valueLen);
|
||||
smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
|
||||
if (!value || valueLen == 0) {
|
||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
|
@ -1135,7 +1136,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
|
|||
}
|
||||
|
||||
// parse tags
|
||||
ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
|
||||
ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
||||
return ret;
|
||||
|
@ -2061,11 +2062,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
|
|||
}
|
||||
/************* TSDB_SML_JSON_PROTOCOL function end **************/
|
||||
|
||||
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
|
||||
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
|
||||
SSmlLineInfo elements = {0};
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
|
||||
|
||||
int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
|
||||
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
|
||||
return ret;
|
||||
|
@ -2170,7 +2171,7 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
|
||||
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
|
||||
int ret = TSDB_CODE_SUCCESS;
|
||||
SSmlTableInfo *tinfo = smlBuildTableInfo();
|
||||
if (!tinfo) {
|
||||
|
@ -2184,7 +2185,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
|
|||
}
|
||||
|
||||
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||
ret = smlParseTelnetString(info, (const char *)data, tinfo, cols);
|
||||
ret = smlParseTelnetString(info, (const char *)data, data + len, tinfo, cols);
|
||||
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
|
||||
} else {
|
||||
|
@ -2275,7 +2276,7 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
|
|||
|
||||
for (int32_t i = 0; i < payloadNum; ++i) {
|
||||
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
|
||||
ret = smlParseTelnetLine(info, dataPoint);
|
||||
ret = smlParseTelnetLine(info, dataPoint, -1);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
|
||||
goto end;
|
||||
|
@ -2364,10 +2365,14 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
|
|||
info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
|
||||
}
|
||||
|
||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
|
||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||
if(lines){
|
||||
code = smlParseJSON(info, *lines);
|
||||
}else if(rawLine){
|
||||
code = smlParseJSON(info, rawLine);
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
|
||||
return code;
|
||||
|
@ -2376,10 +2381,25 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < numLines; ++i) {
|
||||
char *tmp = NULL;
|
||||
int len = 0;
|
||||
if(lines){
|
||||
tmp = lines[i];
|
||||
len = strlen(tmp);
|
||||
}else if(rawLine){
|
||||
tmp = rawLine;
|
||||
while(rawLine < rawLineEnd){
|
||||
if(*(rawLine++) == '\n'){
|
||||
break;
|
||||
}
|
||||
len++;
|
||||
}
|
||||
}
|
||||
|
||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||
code = smlParseInfluxLine(info, lines[i]);
|
||||
code = smlParseInfluxLine(info, tmp, len);
|
||||
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||
code = smlParseTelnetLine(info, lines[i]);
|
||||
code = smlParseTelnetLine(info, tmp, len);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -2391,13 +2411,13 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int smlProcess(SSmlHandle *info, char *lines[], int numLines) {
|
||||
static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t retryNum = 0;
|
||||
|
||||
info->cost.parseTime = taosGetTimestampUs();
|
||||
|
||||
code = smlParseLine(info, lines, numLines);
|
||||
code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
|
||||
if (code != 0) {
|
||||
uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
|
||||
return code;
|
||||
|
@ -2490,39 +2510,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
|||
smlDestroyInfo(info);
|
||||
}
|
||||
|
||||
/**
|
||||
* taos_schemaless_insert() parse and insert data points into database according to
|
||||
* different protocol.
|
||||
*
|
||||
* @param $lines input array may contain multiple lines, each line indicates a data point.
|
||||
* If protocol=2 is used input array should contain single JSON
|
||||
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
|
||||
* multiple data points in JSON format, should include them in $JSON_string
|
||||
* as a JSON array.
|
||||
* @param $numLines indicates how many data points in $lines.
|
||||
* If protocol = 2 is used this param will be ignored as $lines should
|
||||
* contain single JSON string.
|
||||
* @param $protocol indicates which protocol to use for parsing:
|
||||
* 0 - influxDB line protocol
|
||||
* 1 - OpenTSDB telnet line protocol
|
||||
* 2 - OpenTSDB JSON format protocol
|
||||
* @return return zero for successful insertion. Otherwise return none-zero error code of
|
||||
* failure reason.
|
||||
*
|
||||
*/
|
||||
|
||||
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
||||
if (!request) {
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) {
|
||||
int batchs = 0;
|
||||
STscObj *pTscObj = request->pTscObj;
|
||||
|
||||
|
@ -2546,12 +2535,6 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (!lines) {
|
||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
|
||||
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
||||
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
|
||||
|
@ -2602,15 +2585,28 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
|||
info->affectedRows = perBatch;
|
||||
info->pRequest->body.queryFp = smlInsertCallback;
|
||||
info->pRequest->body.param = info;
|
||||
int32_t code = smlProcess(info, lines, perBatch);
|
||||
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
|
||||
if(lines){
|
||||
lines += perBatch;
|
||||
}
|
||||
if(rawLine){
|
||||
int num = 0;
|
||||
while(rawLine < rawLineEnd){
|
||||
if(*(rawLine++) == '\n'){
|
||||
num++;
|
||||
}
|
||||
if(num == perBatch){
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
info->pRequest->body.queryFp(info, req, code);
|
||||
}
|
||||
}
|
||||
tsem_wait(¶ms.sem);
|
||||
|
||||
end:
|
||||
end:
|
||||
taosThreadSpinDestroy(¶ms.lock);
|
||||
tsem_destroy(¶ms.sem);
|
||||
// ((STscObj *)taos)->schemalessType = 0;
|
||||
|
@ -2618,3 +2614,75 @@ end:
|
|||
uDebug("resultend:%s", request->msgBuf);
|
||||
return (TAOS_RES *)request;
|
||||
}
|
||||
|
||||
/**
|
||||
* taos_schemaless_insert() parse and insert data points into database according to
|
||||
* different protocol.
|
||||
*
|
||||
* @param $lines input array may contain multiple lines, each line indicates a data point.
|
||||
* If protocol=2 is used input array should contain single JSON
|
||||
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
|
||||
* multiple data points in JSON format, should include them in $JSON_string
|
||||
* as a JSON array.
|
||||
* @param $numLines indicates how many data points in $lines.
|
||||
* If protocol = 2 is used this param will be ignored as $lines should
|
||||
* contain single JSON string.
|
||||
* @param $protocol indicates which protocol to use for parsing:
|
||||
* 0 - influxDB line protocol
|
||||
* 1 - OpenTSDB telnet line protocol
|
||||
* 2 - OpenTSDB JSON format protocol
|
||||
* @return return zero for successful insertion. Otherwise return none-zero error code of
|
||||
* failure reason.
|
||||
*
|
||||
*/
|
||||
|
||||
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
||||
if (!request) {
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!lines || numLines <= 0) {
|
||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||
return (TAOS_RES *)request;
|
||||
}
|
||||
|
||||
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
|
||||
}
|
||||
|
||||
TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
||||
if (!request) {
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!lines || len <= 0) {
|
||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||
return (TAOS_RES *)request;
|
||||
}
|
||||
|
||||
int numLines = 0;
|
||||
for(int i = 0; i < len; i++){
|
||||
if(lines[i] == '\n' || i == len - 1){
|
||||
numLines++;
|
||||
}
|
||||
}
|
||||
*totalRows = numLines;
|
||||
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue