feat:[TD-30975]process return value in schemaless

This commit is contained in:
wangmm0220 2024-07-16 11:50:40 +08:00
parent 101d1936e9
commit e76e06cf2e
4 changed files with 31 additions and 18 deletions

View File

@ -246,9 +246,9 @@ void smlBuildTsKv(SSmlKv *kv, int64_t ts);
int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv); int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs); int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs);
static inline int32_t smlDoubleToInt64OverFlow(double num) { static inline bool smlDoubleToInt64OverFlow(double num) {
if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return 0; if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
return -1; return false;
} }
static inline void smlStrReplace(char* src, int32_t len){ static inline void smlStrReplace(char* src, int32_t len){

View File

@ -728,7 +728,7 @@ bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
kvVal->type = TSDB_DATA_TYPE_FLOAT; kvVal->type = TSDB_DATA_TYPE_FLOAT;
kvVal->f = (float)result; kvVal->f = (float)result;
} else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) { } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
if (smlDoubleToInt64OverFlow(result) == 0) { if (smlDoubleToInt64OverFlow(result)) {
errno = 0; errno = 0;
int64_t tmp = taosStr2Int64(pVal, &endptr, 10); int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
if (errno == ERANGE) { if (errno == ERANGE) {
@ -1467,7 +1467,7 @@ end:
taosHashCancelIterate(info->superTables, tmp); taosHashCancelIterate(info->superTables, tmp);
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); (void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code,
tstrerror(code), info->dataFormat, info->needModifySchema); tstrerror(code), info->dataFormat, info->needModifySchema);

View File

@ -739,7 +739,8 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
// notice!!! put ts back to tag to ensure get meta->precision // notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseTSFromJSON(info, tsJson); int64_t ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) { if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id); char* tmp = cJSON_PrintUnformatted(tsJson);
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
return TSDB_CODE_INVALID_TIMESTAMP; return TSDB_CODE_INVALID_TIMESTAMP;
} }
SSmlKv kvTs = {0}; SSmlKv kvTs = {0};

View File

@ -121,7 +121,8 @@ int smlProcess_json1_Test() {
char *sql1[1] = {0}; char *sql1[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024); sql1[i] = taosMemoryCalloc(1, 1024);
strncpy(sql1[i], sql[i], 1023); ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
@ -149,7 +150,8 @@ int smlProcess_json1_Test() {
char *sql3[1] = {0}; char *sql3[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024); sql3[i] = taosMemoryCalloc(1, 1024);
strncpy(sql3[i], sql2[i], 1023); ASSERT(sql3[i] != NULL);
(void)strncpy(sql3[i], sql2[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,
@ -176,7 +178,8 @@ int smlProcess_json1_Test() {
char *sql5[1] = {0}; char *sql5[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql5[i] = taosMemoryCalloc(1, 1024); sql5[i] = taosMemoryCalloc(1, 1024);
strncpy(sql5[i], sql4[i], 1023); ASSERT(sql5[i] != NULL);
(void)strncpy(sql5[i], sql4[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL,
@ -215,7 +218,8 @@ int smlProcess_json2_Test() {
char *sql1[1] = {0}; char *sql1[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024); sql1[i] = taosMemoryCalloc(1, 1024);
strncpy(sql1[i], sql[i], 1023); ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
@ -249,7 +253,8 @@ int smlProcess_json3_Test() {
char *sql1[1] = {0}; char *sql1[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024); sql1[i] = taosMemoryCalloc(1, 1024);
strncpy(sql1[i], sql[i], 1023); ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
@ -286,7 +291,8 @@ int smlProcess_json_tag_not_same_Test() {
char *sql1[1] = {0}; char *sql1[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024); sql1[i] = taosMemoryCalloc(1, 1024);
strncpy(sql1[i], sql[i], 1023); ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
@ -312,7 +318,8 @@ int smlProcess_json_tag_not_same_Test() {
char *sql3[1] = {0}; char *sql3[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024); sql3[i] = taosMemoryCalloc(1, 1024);
strncpy(sql3[i], sql2[i], 1023); ASSERT(sql3[i] != NULL);
(void)strncpy(sql3[i], sql2[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,
@ -338,7 +345,8 @@ int smlProcess_json_tag_not_same_Test() {
char *sql5[1] = {0}; char *sql5[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql5[i] = taosMemoryCalloc(1, 1024); sql5[i] = taosMemoryCalloc(1, 1024);
strncpy(sql5[i], sql4[i], 1023); ASSERT(sql5[i] != NULL);
(void)strncpy(sql5[i], sql4[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL,
@ -1147,7 +1155,8 @@ int sml_19221_Test() {
taos_free_result(pRes); taos_free_result(pRes);
char *tmp = (char *)taosMemoryCalloc(1024, 1); char *tmp = (char *)taosMemoryCalloc(1024, 1);
memcpy(tmp, sql[0], strlen(sql[0])); ASSERT(tmp != NULL);
(void)memcpy(tmp, sql[0], strlen(sql[0]));
*(char *)(tmp + 44) = 0; *(char *)(tmp + 44) = 0;
int32_t totalRows = 0; int32_t totalRows = 0;
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL,
@ -1218,7 +1227,8 @@ int sml_ts3116_Test() {
taos_free_result(pRes); taos_free_result(pRes);
int32_t totalRows = 0; int32_t totalRows = 0;
char *tmp = (char *)taosMemoryCalloc(1024, 1); char *tmp = (char *)taosMemoryCalloc(1024, 1);
memcpy(tmp, sql, strlen(sql)); ASSERT(tmp != NULL);
(void)memcpy(tmp, sql, strlen(sql));
totalRows = 0; totalRows = 0;
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(tmp), &totalRows, TSDB_SML_LINE_PROTOCOL, pRes = taos_schemaless_insert_raw(taos, tmp, strlen(tmp), &totalRows, TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS); TSDB_SML_TIMESTAMP_MILLI_SECONDS);
@ -1235,7 +1245,8 @@ int sml_ts3116_Test() {
taos_free_result(pRes); taos_free_result(pRes);
tmp = (char *)taosMemoryCalloc(1024, 1); tmp = (char *)taosMemoryCalloc(1024, 1);
memcpy(tmp, sql1, strlen(sql1)); ASSERT(tmp != NULL);
(void)memcpy(tmp, sql1, strlen(sql1));
totalRows = 0; totalRows = 0;
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(tmp), &totalRows, TSDB_SML_LINE_PROTOCOL, pRes = taos_schemaless_insert_raw(taos, tmp, strlen(tmp), &totalRows, TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS); TSDB_SML_TIMESTAMP_MILLI_SECONDS);
@ -2049,7 +2060,8 @@ int sml_td29373_Test() {
char *sql3[1] = {0}; char *sql3[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024); sql3[i] = taosMemoryCalloc(1, 1024);
strncpy(sql3[i], sql2[i], 1023); ASSERT(sql3[i] != NULL);
(void)strncpy(sql3[i], sql2[i], 1023);
} }
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL, pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,