From 0029ddd232c2ebb338996b10538c3ecdb7a0f9be Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 17 Oct 2022 17:48:30 +0800 Subject: [PATCH] feat:add new interface for schemaless --- include/client/taos.h | 1 + source/client/src/clientSml.c | 220 ++++++++++++++++++++++------------ 2 files changed, 145 insertions(+), 76 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 270b647a77..1182a9c2f7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -198,6 +198,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision); /* --------------------------TMQ INTERFACE------------------------------- */ diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 24da5f7b70..a7b0aff42d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -28,8 +28,8 @@ #define QUOTE '"' #define SLASH '\\' -#define JUMP_SPACE(sql) \ - while (*sql != '\0') { \ +#define JUMP_SPACE(sql, sqlEnd) \ + while (sql < sqlEnd) { \ if (*sql == SPACE) \ sql++; \ else \ @@ -915,16 +915,17 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { return TSDB_CODE_TSC_INVALID_VALUE; } -static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) { +static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) { if (!sql) return TSDB_CODE_SML_INVALID_DATA; - JUMP_SPACE(sql) + JUMP_SPACE(sql, sqlEnd) if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA; elements->measure = sql; // parse measure - while (*sql != '\0') { + while (sql < sqlEnd) { if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) { - MOVE_FORWARD_ONE(sql, strlen(sql) + 1); + MOVE_FORWARD_ONE(sql, sqlEnd - sql); + sqlEnd--; continue; } if (IS_COMMA(sql)) { @@ -948,7 +949,7 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm } else { if (*sql == COMMA) sql++; elements->tags = sql; - while (*sql != '\0') { + while (sql < sqlEnd) { if (IS_SPACE(sql)) { break; } @@ -959,10 +960,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm elements->measureTagsLen = sql - elements->measure; // parse cols - JUMP_SPACE(sql) + JUMP_SPACE(sql, sqlEnd) elements->cols = sql; bool isInQuote = false; - while (*sql != '\0') { + while (sql < sqlEnd) { if (IS_QUOTE(sql)) { isInQuote = !isInQuote; } @@ -982,10 +983,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm } // parse timestamp - JUMP_SPACE(sql) + JUMP_SPACE(sql, sqlEnd) elements->timestamp = sql; - while (*sql != '\0') { - if (*sql == SPACE) { + while (sql < sqlEnd) { + if (isspace(*sql)) { break; } sql++; @@ -995,8 +996,8 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm return TSDB_CODE_SUCCESS; } -static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) { - while (**sql != '\0') { +static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) { + while (*sql < sqlEnd) { if (**sql != SPACE && !(*data)) { *data = *sql; } else if (**sql == SPACE && *data) { @@ -1007,19 +1008,19 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t * } } -static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey, +static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg) { const char *sql = data; size_t childTableNameLen = strlen(tsSmlChildTableName); - while (*sql != '\0') { - JUMP_SPACE(sql) + while (sql < sqlEnd) { + JUMP_SPACE(sql, sqlEnd) if (*sql == '\0') break; const char *key = sql; int32_t keyLen = 0; // parse key - while (*sql != '\0') { + while (sql < sqlEnd) { if (*sql == SPACE) { smlBuildInvalidDataMsg(msg, "invalid data", sql); return TSDB_CODE_SML_INVALID_DATA; @@ -1044,7 +1045,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab // parse value const char *value = sql; int32_t valueLen = 0; - while (*sql != '\0') { + while (sql < sqlEnd) { // parse value if (*sql == SPACE) { break; @@ -1089,11 +1090,11 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab } // format: =[ =] -static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) { +static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) { if (!sql) return TSDB_CODE_SML_INVALID_DATA; // parse metric - smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen); + smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen); if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; @@ -1102,7 +1103,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable // parse timestamp const char *timestamp = NULL; int32_t tLen = 0; - smlParseTelnetElement(&sql, ×tamp, &tLen); + smlParseTelnetElement(&sql, sqlEnd, ×tamp, &tLen); if (!timestamp || tLen == 0) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql); return TSDB_CODE_SML_INVALID_DATA; @@ -1117,7 +1118,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable // parse value const char *value = NULL; int32_t valueLen = 0; - smlParseTelnetElement(&sql, &value, &valueLen); + smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen); if (!value || valueLen == 0) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql); return TSDB_CODE_TSC_INVALID_VALUE; @@ -1135,7 +1136,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable } // parse tags - ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); + ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return ret; @@ -2061,11 +2062,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * } /************* TSDB_SML_JSON_PROTOCOL function end **************/ -static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { +static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); - int ret = smlParseInfluxString(sql, &elements, &info->msgBuf); + int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id); return ret; @@ -2170,7 +2171,7 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { return TSDB_CODE_SUCCESS; } -static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) { +static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { int ret = TSDB_CODE_SUCCESS; SSmlTableInfo *tinfo = smlBuildTableInfo(); if (!tinfo) { @@ -2184,7 +2185,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) { } if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { - ret = smlParseTelnetString(info, (const char *)data, tinfo, cols); + ret = smlParseTelnetString(info, (const char *)data, data + len, tinfo, cols); } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) { ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols); } else { @@ -2275,7 +2276,7 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) { for (int32_t i = 0; i < payloadNum; ++i) { cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i); - ret = smlParseTelnetLine(info, dataPoint); + ret = smlParseTelnetLine(info, dataPoint, -1); if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id); goto end; @@ -2364,10 +2365,14 @@ static void smlPrintStatisticInfo(SSmlHandle *info) { info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime); } -static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) { +static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) { int32_t code = TSDB_CODE_SUCCESS; if (info->protocol == TSDB_SML_JSON_PROTOCOL) { - code = smlParseJSON(info, *lines); + if(lines){ + code = smlParseJSON(info, *lines); + }else if(rawLine){ + code = smlParseJSON(info, rawLine); + } if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines); return code; @@ -2376,10 +2381,25 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) { } for (int32_t i = 0; i < numLines; ++i) { + char *tmp = NULL; + int len = 0; + if(lines){ + tmp = lines[i]; + len = strlen(tmp); + }else if(rawLine){ + tmp = rawLine; + while(rawLine < rawLineEnd){ + if(*(rawLine++) == '\n'){ + break; + } + len++; + } + } + if (info->protocol == TSDB_SML_LINE_PROTOCOL) { - code = smlParseInfluxLine(info, lines[i]); + code = smlParseInfluxLine(info, tmp, len); } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { - code = smlParseTelnetLine(info, lines[i]); + code = smlParseTelnetLine(info, tmp, len); } else { ASSERT(0); } @@ -2391,13 +2411,13 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) { return code; } -static int smlProcess(SSmlHandle *info, char *lines[], int numLines) { +static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) { int32_t code = TSDB_CODE_SUCCESS; int32_t retryNum = 0; info->cost.parseTime = taosGetTimestampUs(); - code = smlParseLine(info, lines, numLines); + code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines); if (code != 0) { uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code)); return code; @@ -2490,39 +2510,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { smlDestroyInfo(info); } -/** - * taos_schemaless_insert() parse and insert data points into database according to - * different protocol. - * - * @param $lines input array may contain multiple lines, each line indicates a data point. - * If protocol=2 is used input array should contain single JSON - * string(e.g. char *lines[] = {"$JSON_string"}). If need to insert - * multiple data points in JSON format, should include them in $JSON_string - * as a JSON array. - * @param $numLines indicates how many data points in $lines. - * If protocol = 2 is used this param will be ignored as $lines should - * contain single JSON string. - * @param $protocol indicates which protocol to use for parsing: - * 0 - influxDB line protocol - * 1 - OpenTSDB telnet line protocol - * 2 - OpenTSDB JSON format protocol - * @return return zero for successful insertion. Otherwise return none-zero error code of - * failure reason. - * - */ - -TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { - if (NULL == taos) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); - if (!request) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; - } +TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) { int batchs = 0; STscObj *pTscObj = request->pTscObj; @@ -2546,12 +2535,6 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr goto end; } - if (!lines) { - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "lines is null", NULL); - goto end; - } - if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) { request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL); @@ -2602,15 +2585,28 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr info->affectedRows = perBatch; info->pRequest->body.queryFp = smlInsertCallback; info->pRequest->body.param = info; - int32_t code = smlProcess(info, lines, perBatch); - lines += perBatch; + int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch); + if(lines){ + lines += perBatch; + } + if(rawLine){ + int num = 0; + while(rawLine < rawLineEnd){ + if(*(rawLine++) == '\n'){ + num++; + } + if(num == perBatch){ + break; + } + } + } if (code != TSDB_CODE_SUCCESS) { info->pRequest->body.queryFp(info, req, code); } } tsem_wait(¶ms.sem); -end: + end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); // ((STscObj *)taos)->schemalessType = 0; @@ -2618,3 +2614,75 @@ end: uDebug("resultend:%s", request->msgBuf); return (TAOS_RES *)request; } + +/** + * taos_schemaless_insert() parse and insert data points into database according to + * different protocol. + * + * @param $lines input array may contain multiple lines, each line indicates a data point. + * If protocol=2 is used input array should contain single JSON + * string(e.g. char *lines[] = {"$JSON_string"}). If need to insert + * multiple data points in JSON format, should include them in $JSON_string + * as a JSON array. + * @param $numLines indicates how many data points in $lines. + * If protocol = 2 is used this param will be ignored as $lines should + * contain single JSON string. + * @param $protocol indicates which protocol to use for parsing: + * 0 - influxDB line protocol + * 1 - OpenTSDB telnet line protocol + * 2 - OpenTSDB JSON format protocol + * @return return zero for successful insertion. Otherwise return none-zero error code of + * failure reason. + * + */ + +TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); + if (!request) { + uError("SML:taos_schemaless_insert error request is null"); + return NULL; + } + + if (!lines || numLines <= 0) { + SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; + request->code = TSDB_CODE_SML_INVALID_DATA; + smlBuildInvalidDataMsg(&msg, "lines is null", NULL); + return (TAOS_RES *)request; + } + + return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision); +} + +TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){ + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); + if (!request) { + uError("SML:taos_schemaless_insert error request is null"); + return NULL; + } + + if (!lines || len <= 0) { + SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; + request->code = TSDB_CODE_SML_INVALID_DATA; + smlBuildInvalidDataMsg(&msg, "lines is null", NULL); + return (TAOS_RES *)request; + } + + int numLines = 0; + for(int i = 0; i < len; i++){ + if(lines[i] == '\n' || i == len - 1){ + numLines++; + } + } + *totalRows = numLines; + return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision); +}