Merge branch '3.0' into merge/mainto3.0

This commit is contained in:
Shengliang Guan 2024-10-31 17:38:59 +08:00
commit f13d262a5a
36 changed files with 1396 additions and 2131 deletions

View File

@ -291,3 +291,4 @@ RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
CREATE SNODE ON DNODE [id]
```
其中的 id 是集群中的 dnode 的序号。请注意选择的dnode流计算的中间状态将自动在其上进行备份。
从 3.3.4.0 版本开始,在多副本环境中创建流会进行 snode 的**存在性检查**,要求首先创建 snode。如果 snode 不存在,无法创建流。

View File

@ -81,13 +81,6 @@ typedef enum {
TSDB_SML_TIMESTAMP_NANO_SECONDS,
} TSDB_SML_TIMESTAMP_TYPE;
typedef enum TAOS_FIELD_T {
TAOS_FIELD_COL = 1,
TAOS_FIELD_TAG,
TAOS_FIELD_QUERY,
TAOS_FIELD_TBNAME,
} TAOS_FIELD_T;
typedef struct taosField {
char name[65];
int8_t type;
@ -102,15 +95,6 @@ typedef struct TAOS_FIELD_E {
int32_t bytes;
} TAOS_FIELD_E;
typedef struct TAOS_FIELD_ALL {
char name[65];
int8_t type;
uint8_t precision;
uint8_t scale;
int32_t bytes;
TAOS_FIELD_T field_type;
} TAOS_FIELD_ALL;
#ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport)
#else
@ -211,6 +195,13 @@ DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
typedef void TAOS_STMT2;
typedef enum TAOS_FIELD_T {
TAOS_FIELD_COL = 1,
TAOS_FIELD_TAG,
TAOS_FIELD_QUERY,
TAOS_FIELD_TBNAME,
} TAOS_FIELD_T;
typedef struct TAOS_STMT2_OPTION {
int64_t reqid;
bool singleStbInsert;
@ -241,9 +232,7 @@ DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, TAOS_FIELD_T field_type, int *count, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt2_get_all_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields);
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_E *fields);
DLL_EXPORT void taos_stmt2_free_all_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields);
DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt);
DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt);
@ -262,17 +251,17 @@ DLL_EXPORT int64_t taos_affected_rows64(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);

View File

@ -92,6 +92,26 @@ extern "C" {
} \
}
#define SML_CHECK_CODE(CMD) \
code = (CMD); \
if (TSDB_CODE_SUCCESS != code) { \
lino = __LINE__; \
goto END; \
}
#define SML_CHECK_NULL(CMD) \
if (NULL == (CMD)) { \
code = terrno; \
lino = __LINE__; \
goto END; \
}
#define RETURN \
if (code != 0){ \
uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino); \
} \
return code;
typedef enum {
SCHEMA_ACTION_NULL,
SCHEMA_ACTION_CREATE_STABLE,
@ -191,7 +211,6 @@ typedef struct {
cJSON *root; // for parse json
int8_t offset[OTD_JSON_FIELDS_NUM];
SSmlLineInfo *lines; // element is SSmlLineInfo
bool parseJsonByLib;
SArray *tagJsonArray;
SArray *valueJsonArray;
@ -211,13 +230,8 @@ typedef struct {
extern int64_t smlFactorNS[];
extern int64_t smlFactorS[];
typedef int32_t (*_equal_fn_sml)(const void *, const void *);
int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle);
void smlDestroyInfo(SSmlHandle *info);
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset);
int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset);
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg);
void smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2);
int32_t smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg);
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision);
@ -237,7 +251,7 @@ void smlDestroyTableInfo(void *para);
void freeSSmlKv(void* data);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseJSON(SSmlHandle *info, char *payload);
int32_t smlParseJSONExt(SSmlHandle *info, char *payload);
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta** sMeta);
bool isSmlTagAligned(SSmlHandle *info, int cnt, SSmlKv *kv);
@ -246,7 +260,8 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements);
int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements);
int32_t smlJoinMeasureTag(SSmlLineInfo *elements);
void smlBuildTsKv(SSmlKv *kv, int64_t ts);
int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndTelnetJsonFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndTelnetJsonUnFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs);
static inline bool smlDoubleToInt64OverFlow(double num) {

View File

@ -222,7 +222,6 @@ int stmtSetTbTags2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *tags);
int stmtBindBatch2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *bind, int32_t colIdx);
int stmtGetTagFields2(TAOS_STMT2 *stmt, int *nums, TAOS_FIELD_E **fields);
int stmtGetColFields2(TAOS_STMT2 *stmt, int *nums, TAOS_FIELD_E **fields);
int stmtGetColFieldsALL2(TAOS_STMT2 *stmt, int *nums, TAOS_FIELD_ALL **fields);
int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums);
int stmtGetParamTbName(TAOS_STMT2 *stmt, int *nums);
int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);

View File

@ -84,7 +84,7 @@ void taos_cleanup(void) {
taosCloseRef(id);
nodesDestroyAllocatorSet();
// cleanupAppInfo();
// cleanupAppInfo();
rpcCleanup();
tscDebug("rpc cleanup");
@ -388,6 +388,7 @@ void taos_free_result(TAOS_RES *res) {
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
}
taosMemoryFree(pRsp);
}
void taos_kill_query(TAOS *taos) {
@ -483,7 +484,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
return taos_print_row_with_size(str, INT32_MAX, row, fields, num_fields);
}
int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD *fields, int num_fields){
int32_t len = 0;
for (int i = 0; i < num_fields; ++i) {
if (i > 0 && len < size - 1) {
@ -588,7 +589,7 @@ int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD
break;
}
}
if (len < size) {
if (len < size){
str[len] = 0;
}
@ -2081,7 +2082,7 @@ int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert) {
}
int taos_stmt2_get_fields(TAOS_STMT2 *stmt, TAOS_FIELD_T field_type, int *count, TAOS_FIELD_E **fields) {
if (stmt == NULL || count == NULL) {
if (stmt == NULL || NULL == count) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
@ -2102,28 +2103,12 @@ int taos_stmt2_get_fields(TAOS_STMT2 *stmt, TAOS_FIELD_T field_type, int *count,
}
}
int taos_stmt2_get_all_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields) {
if (stmt == NULL || count == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetColFieldsALL2(stmt, count, fields);
}
void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_E *fields) {
(void)stmt;
if (!fields) return;
taosMemoryFree(fields);
}
DLL_EXPORT void taos_stmt2_free_all_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields) {
(void)stmt;
if (!fields) return;
taosMemoryFree(fields);
}
TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);

File diff suppressed because it is too large Load Diff

View File

@ -21,259 +21,10 @@
#define OTD_JSON_SUB_FIELDS_NUM 2
#define JUMP_JSON_SPACE(start) \
while (*(start)) { \
if (unlikely(*(start) > 32)) \
break; \
else \
(start)++; \
}
static int32_t smlJsonGetObj(char **payload) {
int leftBracketCnt = 0;
bool isInQuote = false;
while (**payload) {
if (**payload == '"' && *((*payload) - 1) != '\\') {
isInQuote = !isInQuote;
} else if (!isInQuote && unlikely(**payload == '{')) {
leftBracketCnt++;
(*payload)++;
continue;
} else if (!isInQuote && unlikely(**payload == '}')) {
leftBracketCnt--;
(*payload)++;
if (leftBracketCnt == 0) {
return 0;
} else if (leftBracketCnt < 0) {
return -1;
}
continue;
}
(*payload)++;
}
return -1;
}
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
int index = 0;
while (*(*start)) {
if ((*start)[0] != '"') {
(*start)++;
continue;
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
char *sTmp = *start;
if ((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't' && (*start)[4] == 'r' && (*start)[5] == 'i' &&
(*start)[6] == 'c' && (*start)[7] == '"') {
(*start) += 8;
bool isInQuote = false;
while (*(*start)) {
if (unlikely(!isInQuote && *(*start) == '"')) {
(*start)++;
offset[index++] = *start - sTmp;
element->measure = (*start);
isInQuote = true;
continue;
}
if (unlikely(isInQuote && *(*start) == '"')) {
element->measureLen = (*start) - element->measure;
(*start)++;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'i' && (*start)[3] == 'm' && (*start)[4] == 'e' &&
(*start)[5] == 's' && (*start)[6] == 't' && (*start)[7] == 'a' && (*start)[8] == 'm' &&
(*start)[9] == 'p' && (*start)[10] == '"') {
(*start) += 11;
bool hasColon = false;
while (*(*start)) {
if (unlikely(!hasColon && *(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->timestamp = (*start);
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->timestampLen = tmp - (*start);
*start = tmp;
}
break;
}
hasColon = true;
continue;
}
if (unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))) {
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
} else if ((*start)[1] == 'v' && (*start)[2] == 'a' && (*start)[3] == 'l' && (*start)[4] == 'u' &&
(*start)[5] == 'e' && (*start)[6] == '"') {
(*start) += 7;
bool hasColon = false;
while (*(*start)) {
if (unlikely(!hasColon && *(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->cols = (*start);
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->colsLen = tmp - (*start);
*start = tmp;
}
break;
}
hasColon = true;
continue;
}
if (unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))) {
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'a' && (*start)[3] == 'g' && (*start)[4] == 's' &&
(*start)[5] == '"') {
(*start) += 6;
while (*(*start)) {
if (unlikely(*(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->tags = (*start);
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->tagsLen = tmp - (*start);
*start = tmp;
}
break;
}
(*start)++;
}
}
if (*(*start) == '\0') {
break;
}
if (*(*start) == '}') {
(*start)++;
break;
}
(*start)++;
}
if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL ||
element->measure == NULL || element->timestamp == NULL) {
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) {
int index = 0;
while (*(*start)) {
if ((*start)[0] != '"') {
(*start)++;
continue;
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
if ((*start)[1] == 'm') {
(*start) += offset[index++];
element->measure = *start;
while (*(*start)) {
if (unlikely(*(*start) == '"')) {
element->measureLen = (*start) - element->measure;
(*start)++;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'i') {
(*start) += offset[index++];
element->timestamp = *start;
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->timestampLen = tmp - (*start);
*start = tmp;
}
} else {
while (*(*start)) {
if (unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)) {
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
}
} else if ((*start)[1] == 'v') {
(*start) += offset[index++];
element->cols = *start;
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->colsLen = tmp - (*start);
*start = tmp;
}
} else {
while (*(*start)) {
if (unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)) {
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
}
} else if ((*start)[1] == 't' && (*start)[2] == 'a') {
(*start) += offset[index++];
element->tags = (*start);
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->tagsLen = tmp - (*start);
*start = tmp;
}
}
if (*(*start) == '}') {
(*start)++;
break;
}
(*start)++;
}
if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) {
uError("elements != %d", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric length is 0 or large than 192", info->id);
uError("SML:0x%" PRIx64 " Metric length is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
@ -293,7 +44,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
child = child->next;
}
if (*marks[i] == NULL) {
uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]);
uError("SML %s error, not find mark:%d:%s", __FUNCTION__, i, jsonName[i]);
return TSDB_CODE_TSC_INVALID_JSON;
}
}
@ -302,7 +53,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (strcasecmp(typeStr, "bool") != 0) {
uError("OTD:invalid type(%s) for JSON Bool", typeStr);
uError("SML:invalid type(%s) for JSON Bool", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->type = TSDB_DATA_TYPE_BOOL;
@ -316,7 +67,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// tinyint
if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
if (!IS_VALID_TINYINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_TINYINT;
@ -327,7 +78,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// smallint
if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
if (!IS_VALID_SMALLINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_SMALLINT;
@ -338,7 +89,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// int
if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
if (!IS_VALID_INT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(int)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_INT;
@ -362,7 +113,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// float
if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
if (!IS_VALID_FLOAT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(float)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_FLOAT;
@ -379,7 +130,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
}
// if reach here means type is unsupported
uError("OTD:invalid type(%s) for JSON Number", typeStr);
uError("SML:invalid type(%s) for JSON Number", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
@ -391,7 +142,7 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
} else if (strcasecmp(typeStr, "nchar") == 0) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
} else {
uError("OTD:invalid type(%s) for JSON String", typeStr);
uError("SML:invalid type(%s) for JSON String", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->length = strlen(value->valuestring);
@ -474,7 +225,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
case cJSON_String: {
int32_t ret = smlConvertJSONString(kv, "binary", root);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse binary value from JSON Obj");
uError("SML:Failed to parse binary value from JSON Obj");
return ret;
}
break;
@ -482,7 +233,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
case cJSON_Object: {
int32_t ret = smlParseValueFromJSONObj(root, kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse value from JSON Obj");
uError("SML:Failed to parse value from JSON Obj");
return ret;
}
break;
@ -511,7 +262,7 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
}
size_t keyLen = strlen(tag->string);
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64");
uError("SML:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
@ -539,28 +290,24 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
int32_t ret = 0;
if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
elements->measureTag = info->preLine.measureTag;
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
int32_t lino = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
}
ret = smlProcessTagJson(info, tags);
if(ret != 0){
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
ret = smlJoinMeasureTag(elements);
if(ret != 0){
return ret;
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
SML_CHECK_CODE(smlProcessTagJson(info, tags));
SML_CHECK_CODE(smlJoinMeasureTag(elements));
return smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
RETURN
}
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
@ -678,7 +425,8 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
}
static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON *metricJson = NULL;
cJSON *tsJson = NULL;
@ -688,57 +436,27 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
if (size != OTD_JSON_FIELDS_NUM) {
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
uError("SML:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson};
ret = smlGetJsonElements(root, marks);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
SML_CHECK_CODE(smlGetJsonElements(root, marks));
// Parse metric
ret = smlParseMetricFromJSON(info, metricJson, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
}
SML_CHECK_CODE(smlParseMetricFromJSON(info, metricJson, elements));
// Parse metric value
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseValueFromJSON(valueJson, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
SML_CHECK_CODE(smlParseValueFromJSON(valueJson, &kv));
// Parse tags
bool needFree = info->dataFormat;
elements->tags = cJSON_PrintUnformatted(tagsJson);
if (elements->tags == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
elements->tagsLen = strlen(elements->tags);
if (is_same_child_table_telnet(elements, &info->preLine) != 0) {
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
taosMemoryFree(elements->tags);
elements->tags = NULL;
return ret;
}
} else {
elements->measureTag = info->preLine.measureTag;
}
SML_CHECK_NULL(elements->tags);
if (needFree) {
taosMemoryFree(elements->tags);
elements->tags = NULL;
}
elements->tagsLen = strlen(elements->tags);
SML_CHECK_CODE(smlParseTagsFromJSON(info, tagsJson, elements));
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
goto END;
}
// Parse timestamp
@ -747,29 +465,34 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
if (unlikely(ts < 0)) {
char* tmp = cJSON_PrintUnformatted(tsJson);
if (tmp == NULL) {
uError("cJSON_PrintUnformatted failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
} else {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
taosMemoryFree(tmp);
}
return TSDB_CODE_INVALID_TIMESTAMP;
SML_CHECK_CODE(TSDB_CODE_INVALID_TIMESTAMP);
}
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
if (info->dataFormat){
code = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
} else {
code = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
SML_CHECK_CODE(code);
taosMemoryFreeClear(info->preLine.tags);
info->preLine = *elements;
elements->tags = NULL;
return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
END:
taosMemoryFree(elements->tags);
RETURN
}
static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
@ -782,27 +505,11 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 3:%s", info->id, payload);
uError("SML:0x%" PRIx64 " Invalid JSON type:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
if (unlikely(info->lines != NULL)) {
for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
if (info->lines[i].measureTagsLen != 0) taosMemoryFree(info->lines[i].measureTag);
}
taosMemoryFree(info->lines);
info->lines = NULL;
}
info->lineNum = payloadNum;
info->dataFormat = true;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
info->parseJsonByLib = true;
cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child;
int cnt = 0;
@ -811,6 +518,7 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
if (info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONStringExt(info, dataPoint, &element);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt);
}
@ -836,164 +544,3 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (info->offset[0] == 0) {
ret = smlJsonParseObjFirst(start, elements, info->offset);
} else {
ret = smlJsonParseObj(start, elements, info->offset);
}
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (unlikely(elements->colsLen == 0)) {
uError("SML:colsLen == 0");
return TSDB_CODE_TSC_INVALID_VALUE;
} else if (unlikely(elements->cols[0] == '{')) {
char tmp = elements->cols[elements->colsLen];
elements->cols[elements->colsLen] = '\0';
cJSON *valueJson = cJSON_Parse(elements->cols);
if (unlikely(valueJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json cols failed:%s", info->id, elements->cols);
elements->cols[elements->colsLen] = tmp;
return TSDB_CODE_TSC_INVALID_JSON;
}
if (taosArrayPush(info->tagJsonArray, &valueJson) == NULL){
cJSON_Delete(valueJson);
elements->cols[elements->colsLen] = tmp;
return terrno;
}
ret = smlParseValueFromJSONObj(valueJson, &kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " Failed to parse value from JSON Obj:%s", info->id, elements->cols);
elements->cols[elements->colsLen] = tmp;
return TSDB_CODE_TSC_INVALID_VALUE;
}
elements->cols[elements->colsLen] = tmp;
} else if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " cols invalidate:%s", info->id, elements->cols);
return TSDB_CODE_TSC_INVALID_VALUE;
}
// Parse tags
if (is_same_child_table_telnet(elements, &info->preLine) != 0) {
char tmp = *(elements->tags + elements->tagsLen);
*(elements->tags + elements->tagsLen) = 0;
cJSON *tagsJson = cJSON_Parse(elements->tags);
*(elements->tags + elements->tagsLen) = tmp;
if (unlikely(tagsJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json tag failed:%s", info->id, elements->tags);
return TSDB_CODE_TSC_INVALID_JSON;
}
if (taosArrayPush(info->tagJsonArray, &tagsJson) == NULL){
cJSON_Delete(tagsJson);
uError("SML:0x%" PRIx64 " taosArrayPush failed", info->id);
return terrno;
}
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
} else {
elements->measureTag = info->preLine.measureTag;
}
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = 0;
if (unlikely(elements->timestampLen == 0)) {
uError("OTD:0x%" PRIx64 " elements->timestampLen == 0", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
} else if (elements->timestamp[0] == '{') {
char tmp = elements->timestamp[elements->timestampLen];
elements->timestamp[elements->timestampLen] = '\0';
cJSON *tsJson = cJSON_Parse(elements->timestamp);
ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) {
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload:%s", info->id, elements->timestamp);
elements->timestamp[elements->timestampLen] = tmp;
cJSON_Delete(tsJson);
return TSDB_CODE_INVALID_TIMESTAMP;
}
elements->timestamp[elements->timestampLen] = tmp;
cJSON_Delete(tsJson);
} else {
ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
}
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
}
int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 1 << 15;
int32_t ret = TSDB_CODE_SUCCESS;
uDebug("SML:0x%" PRIx64 "json:%s", info->id, payload);
int cnt = 0;
char *dataPointStart = payload;
while (1) {
if (info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, &dataPointStart, &element);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
if (cnt >= payloadNum) {
payloadNum = payloadNum << 1;
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if (tmp == NULL) {
ret = terrno;
return ret;
}
info->lines = (SSmlLineInfo *)tmp;
(void)memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
}
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
if ((info->lines + cnt)->measure == NULL) break;
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload);
return smlParseJSONExt(info, payload);
}
if (unlikely(info->reRun)) {
cnt = 0;
dataPointStart = payload;
info->lineNum = payloadNum;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
continue;
}
cnt++;
if (*dataPointStart == '\0') break;
}
info->lineNum = cnt;
return TSDB_CODE_SUCCESS;
}

View File

@ -63,7 +63,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid timestamp", data);
return TSDB_CODE_SML_INVALID_DATA;
}
return ts;
@ -84,7 +84,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
}
if (pVal->value[0] == 'l' || pVal->value[0] == 'L') { // nchar
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3) {
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length -= NCHAR_ADD_LEN;
if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
@ -97,7 +97,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
}
if (pVal->value[0] == 'g' || pVal->value[0] == 'G') { // geometry
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= sizeof("POINT")+3) {
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
int32_t code = initCtxGeomFromText();
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -124,7 +124,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
}
if (pVal->value[0] == 'b' || pVal->value[0] == 'B') { // varbinary
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3) {
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
pVal->type = TSDB_DATA_TYPE_VARBINARY;
if(isHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
if(!isValidateHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
@ -298,7 +298,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
}
if (info->dataFormat && !isSmlTagAligned(info, cnt, &kv)) {
return TSDB_CODE_TSC_INVALID_JSON;
return TSDB_CODE_SML_INVALID_DATA;
}
cnt++;
@ -311,31 +311,24 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
}
static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *elements) {
int32_t code = 0;
int32_t lino = 0;
bool isSameCTable = IS_SAME_CHILD_TABLE;
if(isSameCTable){
return TSDB_CODE_SUCCESS;
}
int32_t ret = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
ret = smlProcessTagLine(info, sql, sqlEnd);
if(ret != 0){
if (info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
SML_CHECK_CODE(smlProcessTagLine(info, sql, sqlEnd));
return smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
RETURN
}
static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) {
@ -353,7 +346,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
const char *escapeChar = NULL;
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(IS_EQUAL(*sql,escapeChar))) {
@ -370,7 +363,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
}
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
@ -404,18 +397,18 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
valueLen = *sql - value;
if (unlikely(quoteNum != 0 && quoteNum != 2)) {
smlBuildInvalidDataMsg(&info->msgBuf, "unbalanced quotes", value);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line unbalanced quotes", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
int32_t ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlParseValue error", value);
uError("SML:0x%" PRIx64 " %s parse value error:%d.", info->id, __FUNCTION__, ret);
return ret;
}
@ -437,11 +430,6 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
}
(void)memcpy(tmp, kv.value, kv.length);
PROCESS_SLASH_IN_FIELD_VALUE(tmp, kv.length);
if(kv.type == TSDB_DATA_TYPE_GEOMETRY) {
uError("SML:0x%" PRIx64 " smlParseColLine error, invalid GEOMETRY type.", info->id);
taosMemoryFree((void*)kv.value);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(kv.type == TSDB_DATA_TYPE_VARBINARY){
taosMemoryFree((void*)kv.value);
}
@ -510,7 +498,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
}
elements->measureLen = sql - elements->measure;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen - measureLenEscaped))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
@ -557,7 +545,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
elements->colsLen = sql - elements->cols;
if (unlikely(elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line cols is empty", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -574,7 +562,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
int64_t ts = smlParseInfluxTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts <= 0)) {
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
uError("SML:0x%" PRIx64 " %s error:%" PRId64, info->id, __FUNCTION__, ts);
return TSDB_CODE_INVALID_TIMESTAMP;
}

View File

@ -148,31 +148,21 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
return TSDB_CODE_SUCCESS;
}
int32_t ret = 0;
int32_t code = 0;
int32_t lino = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
SML_CHECK_CODE(smlProcessTagTelnet(info, data, sqlEnd));
SML_CHECK_CODE(smlJoinMeasureTag(elements));
ret = smlProcessTagTelnet(info, data, sqlEnd);
if(ret != 0){
if (info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
code = smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
ret = smlJoinMeasureTag(elements);
if(ret != 0){
return ret;
}
return smlProcessChildTable(info, elements);
RETURN
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
@ -182,14 +172,14 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
// parse metric
smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid measure", sql);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
// parse timestamp
smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -199,19 +189,21 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
}
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet parse timestamp failed", sql);
return TSDB_CODE_INVALID_TIMESTAMP;
}
// parse value
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
if (unlikely(!elements->cols || elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
int ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " %s parse value error:%d.", info->id, __FUNCTION__, ret);
return TSDB_CODE_TSC_INVALID_VALUE;
}
@ -220,11 +212,11 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
elements->tags = sql;
elements->tagsLen = sqlEnd - sql;
if (unlikely(!elements->tags || elements->tagsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid tag value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
}
int ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
@ -239,5 +231,12 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
if (info->dataFormat){
ret = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
} else {
ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
info->preLine = *elements;
return ret;
}

View File

@ -1068,48 +1068,6 @@ static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E
return TSDB_CODE_SUCCESS;
}
static int stmtFetchFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
SBoundColInfo* tags = (SBoundColInfo*)pStmt->bInfo.boundTags;
STableMeta* meta = ((SVnodeModifyOpStmt*)(pStmt->sql.pQuery->pRoot))->pTableMeta;
if (tags == NULL || meta == NULL || (meta->schema == NULL && tags->numOfBound != 0)) {
return TSDB_CODE_INVALID_PARA;
}
if (fields != NULL) {
*fields = taosMemoryCalloc(tags->numOfBound, sizeof(TAOS_FIELD_ALL));
if (NULL == *fields) {
return terrno;
}
SSchema* pSchema = meta->schema;
int32_t tbnameIdx = meta->tableInfo.numOfTags + meta->tableInfo.numOfColumns;
for (int32_t i = 0; i < tags->numOfBound; ++i) {
int16_t idx = tags->pColIndex[i];
if (idx == tbnameIdx) {
(*fields)[i].field_type = TAOS_FIELD_TBNAME;
tstrncpy((*fields)[i].name, "tbname", sizeof((*fields)[i].name));
continue;
} else if (idx < meta->tableInfo.numOfColumns) {
(*fields)[i].field_type = TAOS_FIELD_COL;
} else {
(*fields)[i].field_type = TAOS_FIELD_TAG;
}
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema[tags->pColIndex[i]].type) {
(*fields)[i].precision = meta->tableInfo.precision;
}
tstrncpy((*fields)[i].name, pSchema[tags->pColIndex[i]].name, sizeof((*fields)[i].name));
(*fields)[i].type = pSchema[tags->pColIndex[i]].type;
(*fields)[i].bytes = pSchema[tags->pColIndex[i]].bytes;
}
}
*fieldNum = tags->numOfBound;
return TSDB_CODE_SUCCESS;
}
/*
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
while (true) {
@ -1893,50 +1851,6 @@ _return:
return code;
}
int stmtGetColFieldsALL2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
int32_t code = 0;
STscStmt2* pStmt = (STscStmt2*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get col fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
STMT_ERR_RET(stmtCreateRequest(pStmt));
}
STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERRI_JRET(stmtParseSql(pStmt));
}
_return:
pStmt->errCode = preCode;
if (code == TSDB_CODE_TSC_INVALID_OPERATION) {
return stmtFetchFields2(stmt, nums, fields);
}
return code;
}
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
STscStmt2* pStmt = (STscStmt2*)stmt;

View File

@ -68,6 +68,15 @@ TEST(testCase, smlParseInfluxString_Test) {
taosArrayDestroy(elements.colArray);
elements.colArray = nullptr;
// case 0 false
tmp = "st,t1=3 c3=\"";
(void)memcpy(sql, tmp, strlen(tmp) + 1);
(void)memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(info, sql, sql + strlen(sql), &elements);
ASSERT_NE(ret, 0);
taosArrayDestroy(elements.colArray);
elements.colArray = nullptr;
// case 2 false
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
(void)memcpy(sql, tmp, strlen(tmp) + 1);
@ -591,6 +600,104 @@ TEST(testCase, smlParseTelnetLine_Test) {
// smlDestroyInfo(info);
//}
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
char *endptr = NULL;
double result = taosStr2Double(pVal, &endptr);
if (pVal == endptr) {
smlBuildInvalidDataMsg(msg, "invalid data", pVal);
return false;
}
int32_t left = len - (endptr - pVal);
if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
kvVal->type = TSDB_DATA_TYPE_DOUBLE;
kvVal->d = result;
} else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
if (!IS_VALID_FLOAT(result)) {
smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_FLOAT;
kvVal->f = (float)result;
} else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
if (smlDoubleToInt64OverFlow(result)) {
errno = 0;
int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
if (errno == ERANGE) {
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = tmp;
return true;
}
kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = (int64_t)result;
} else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
if (result >= (double)UINT64_MAX || result < 0) {
errno = 0;
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
if (errno == ERANGE || result < 0) {
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
kvVal->u = tmp;
return true;
}
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
kvVal->u = result;
} else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
if (!IS_VALID_INT(result)) {
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_INT;
kvVal->i = result;
} else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
if (!IS_VALID_UINT(result)) {
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_UINT;
kvVal->u = result;
} else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
if (!IS_VALID_SMALLINT(result)) {
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_SMALLINT;
kvVal->i = result;
} else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
if (!IS_VALID_USMALLINT(result)) {
smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_USMALLINT;
kvVal->u = result;
} else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
if (!IS_VALID_TINYINT(result)) {
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_TINYINT;
kvVal->i = result;
} else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
if (!IS_VALID_UTINYINT(result)) {
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_UTINYINT;
kvVal->u = result;
} else {
smlBuildInvalidDataMsg(msg, "invalid data", pVal);
return false;
}
return true;
}
TEST(testCase, smlParseNumber_performance_Test) {
char msg[256] = {0};
SSmlMsgBuf msgBuf;

View File

@ -133,6 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
bool mndStreamNodeIsUpdated(SMnode *pMnode);
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);

View File

@ -795,12 +795,22 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
if (createReq.sql != NULL) {
sqlLen = strlen(createReq.sql);
sql = taosMemoryMalloc(sqlLen + 1);
sql = taosStrdup(createReq.sql);
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
}
memset(sql, 0, sqlLen + 1);
memcpy(sql, createReq.sql, sqlLen);
SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
if (pSourceDb == NULL) {
code = terrno;
mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
tstrerror(code));
goto _OVER;
}
code = mndCheckForSnode(pMnode, pSourceDb);
mndReleaseDb(pMnode, pSourceDb);
if (code != 0) {
goto _OVER;
}
// build stream obj from request

View File

@ -1497,6 +1497,30 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) {
return updated;
}
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSnodeObj *pObj = NULL;
if (pSrcDb->cfg.replications == 1) {
return TSDB_CODE_SUCCESS;
} else {
while (1) {
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
sdbRelease(pSdb, pObj);
sdbCancelFetch(pSdb, pIter);
return TSDB_CODE_SUCCESS;
}
mError("snode not existed when trying to create stream in db with multiple replica");
return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
}
uint32_t seed = 0;
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};

View File

@ -342,7 +342,10 @@ typedef struct {
rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch;
TdThreadMutex writeBatchMutex;
TdThreadMutex writeBatchMutex;
int32_t sver;
tb_uid_t suid;
tb_uid_t uid;
STSchema *pTSchema;
} SRocksCache;

View File

@ -222,6 +222,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);

View File

@ -620,6 +620,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
}
}
if (uids) taosArrayDestroy(uids);
tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1, pReq->schemaRow.version);
}
metaWLock(pMeta);
@ -1945,6 +1947,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
break;
}
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version);
}
entry.version = version;
// do actual write

View File

@ -221,7 +221,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6) ;
TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.my_comparator = cmp;
@ -230,6 +230,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
pTsdb->rCache.readoptions = readoptions;
pTsdb->rCache.flushoptions = flushoptions;
pTsdb->rCache.db = db;
pTsdb->rCache.sver = -1;
pTsdb->rCache.suid = -1;
pTsdb->rCache.uid = -1;
pTsdb->rCache.pTSchema = NULL;
TAOS_RETURN(code);
@ -1132,19 +1135,17 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
(void)taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < num_keys; ++i) {
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
int8_t lflag = updCtx->lflag;
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
SColVal *pColVal = &updCtx->colVal;
SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
int8_t lflag = updCtx->lflag;
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
SColVal *pColVal = &updCtx->colVal;
if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
continue;
}
SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
@ -1299,53 +1300,94 @@ _exit:
TAOS_RETURN(code);
}
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
SRocksCache *pRCache = &pTsdb->rCache;
if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
if (suid > 0 && suid == pRCache->suid) {
pRCache->sver = -1;
pRCache->suid = -1;
}
if (suid == 0 && uid == pRCache->uid) {
pRCache->sver = -1;
pRCache->uid = -1;
}
}
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
SRocksCache *pRCache = &pTsdb->rCache;
if (pRCache->pTSchema && sver == pRCache->sver) {
if (suid > 0 && suid == pRCache->suid) {
return 0;
}
if (suid == 0 && uid == pRCache->uid) {
return 0;
}
}
pRCache->suid = suid;
pRCache->uid = uid;
pRCache->sver = sver;
tDestroyTSchema(pRCache->pTSchema);
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema);
}
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
SRow **aRow) {
int32_t code = 0, lino = 0;
// 1. prepare last
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
STSchema *pTSchema = NULL;
int32_t sver = TSDBROW_SVERSION(&lRow);
SArray *ctxArray = NULL;
SSHashObj *iColHash = NULL;
TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit);
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
pTSchema = pTsdb->rCache.pTSchema;
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
int32_t nCol = pTSchema->numOfCols;
ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx));
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx));
if (ctxArray == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
// 1. prepare by lrow
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&lRow, &tsdbRowKey);
STSDBRowIter iter = {0};
code = tsdbRowIterOpen(&iter, &lRow, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
TAOS_CHECK_GOTO(code, &lino, _exit);
}
TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
if (!taosArrayPush(ctxArray, &updateCtx)) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
if (!COL_VAL_IS_VALUE(pColVal)) {
if (COL_VAL_IS_VALUE(pColVal)) {
updateCtx.lflag = LFLAG_LAST;
if (!taosArrayPush(ctxArray, &updateCtx)) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
} else {
if (!iColHash) {
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (iColHash == NULL) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
}
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
continue;
}
updateCtx.lflag = LFLAG_LAST;
if (!taosArrayPush(ctxArray, &updateCtx)) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
}
tsdbRowClose(&iter);
@ -1390,7 +1432,10 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
}
_exit:
taosMemoryFreeClear(pTSchema);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
taosArrayDestroy(ctxArray);
tSimpleHashCleanup(iColHash);

View File

@ -2018,7 +2018,12 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
cleanupAfterGroupResultGen(pMiaInfo, pRes);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
break;
if (pRes->info.rows == 0) {
// After filtering for last group, the result is empty, so we need to continue to process next group
continue;
} else {
break;
}
} else {
// continue
pRes->info.id.groupId = pMiaInfo->groupId;

View File

@ -468,35 +468,38 @@ end:
int32_t smlInitHandle(SQuery** query) {
*query = NULL;
SQuery* pQuery = NULL;
SVnodeModifyOpStmt* stmt = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
if (NULL == pQuery) {
uError("create pQuery error");
return code;
if (code != 0) {
uError("SML create pQuery error");
goto END;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
SVnodeModifyOpStmt* stmt = NULL;
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
if (NULL == stmt) {
uError("create SVnodeModifyOpStmt error");
qDestroyQuery(pQuery);
return code;
if (code != 0) {
uError("SML create SVnodeModifyOpStmt error");
goto END;
}
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (stmt->pTableBlockHashObj == NULL){
uError("create pTableBlockHashObj error");
qDestroyQuery(pQuery);
nodesDestroyNode((SNode*)stmt);
return terrno;
uError("SML create pTableBlockHashObj error");
code = terrno;
goto END;
}
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
pQuery->pRoot = (SNode*)stmt;
*query = pQuery;
return code;
return TSDB_CODE_SUCCESS;
END:
nodesDestroyNode((SNode*)stmt);
qDestroyQuery(pQuery);
return code;
}
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {

View File

@ -30,7 +30,7 @@ typedef struct SInsertParseContext {
bool forceUpdate;
bool needTableTagVal;
bool needRequest; // whether or not request server
bool isStmtBind; // whether is stmt bind
bool isStmtBind; // whether is stmt bind
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@ -757,7 +757,7 @@ int32_t parseTagValue(SMsgBuf* pMsgBuf, const char** pSql, uint8_t precision, SS
STagVal val = {0};
int32_t code = parseTagToken(pSql, pToken, pTagSchema, precision, &val, pMsgBuf);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == taosArrayPush(pTagVals, &val)) {
if (NULL == taosArrayPush(pTagVals, &val)){
code = terrno;
}
}
@ -775,14 +775,11 @@ static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* p
return terrno;
}
return insBuildCreateTbReq(pStmt->pCreateTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
TSDB_DEFAULT_TABLE_TTL);
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
TSDB_DEFAULT_TABLE_TTL);
}
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
if (pToken->type == TK_NK_QUESTION) {
return buildInvalidOperationMsg(pMsgBuf, "insert into super table syntax is not supported for stmt");
}
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN &&
@ -813,7 +810,7 @@ typedef struct SRewriteTagCondCxt {
static int32_t rewriteTagCondColumnImpl(STagVal* pVal, SNode** pNode) {
SValueNode* pValue = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pValue);
int32_t code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pValue);
if (NULL == pValue) {
return code;
}
@ -1044,7 +1041,7 @@ static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt
return TSDB_CODE_OUT_OF_MEMORY;
}
char tbFName[TSDB_TABLE_FNAME_LEN];
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pBackup);
@ -1239,7 +1236,7 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi
}
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
char fullName[TSDB_TABLE_FNAME_LEN];
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(pName, fullName);
if (TSDB_CODE_SUCCESS != code) {
return code;
@ -1385,7 +1382,7 @@ static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false);
}
char tbFName[TSDB_TABLE_FNAME_LEN];
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
if (TSDB_CODE_SUCCESS != code) {
return code;
@ -1926,8 +1923,8 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
}
if (code == TSDB_CODE_SUCCESS) {
code = insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag,
pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
pStbRowsCxt->pTag = NULL;
}
@ -1936,9 +1933,9 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
SVgroupInfo vg;
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
.requestId = pCxt->pComCxt->requestId,
.requestObjRefId = pCxt->pComCxt->requestRid,
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
.requestId = pCxt->pComCxt->requestId,
.requestObjRefId = pCxt->pComCxt->requestRid,
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStbRowsCxt->ctbName, &vg);
}
@ -2179,8 +2176,8 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
if (code == TSDB_CODE_SUCCESS) {
SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
void* pData = pTableDataCxt;
code = taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
&pData, POINTER_BYTES);
code = taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
break;
}
@ -2252,7 +2249,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
if (!pStmt->stbSyntax && numOfRows > 0) {
void* pData = rowsDataCxt.pTableDataCxt;
code = taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
POINTER_BYTES);
POINTER_BYTES);
}
return code;
@ -2366,7 +2363,8 @@ static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDa
if (TSDB_CODE_SUCCESS == code) {
// col values and bound cols info of STableDataContext is not used
pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal));
if (!pStbRowsCxt->aColVals) code = terrno;
if (!pStbRowsCxt->aColVals)
code = terrno;
}
if (TSDB_CODE_SUCCESS == code) {
code = insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
@ -2424,6 +2422,9 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
// 1. [(tag1_name, ...)] ...
// 2. VALUES ... | FILE ...
static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
if (pStmt->stbSyntax && TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
return buildInvalidOperationMsg(&pCxt->msg, "insert into super table syntax is not supported for stmt");
}
if (!pStmt->stbSyntax) {
STableDataCxt* pTableCxt = NULL;
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
@ -2510,9 +2511,9 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
}
// db.? situationensure that the only thing following the '.' mark is '?'
char* tbNameAfterDbName = strnchr(pTbName->z, '.', pTbName->n, true);
char *tbNameAfterDbName = strnchr(pTbName->z, '.', pTbName->n, true);
if ((tbNameAfterDbName != NULL) && (*(tbNameAfterDbName + 1) == '?')) {
char* tbName = NULL;
char *tbName = NULL;
if (NULL == pCxt->pComCxt->pStmtCb) {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
}
@ -2527,8 +2528,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
if (pCxt->isStmtBind) {
if (TK_NK_ID == pTbName->type || (tbNameAfterDbName != NULL && *(tbNameAfterDbName + 1) != '?')) {
// In SQL statements, the table name has already been specified.
parserWarn("0x%" PRIx64 " table name is specified in sql, ignore the table name in bind param",
pCxt->pComCxt->requestId);
parserWarn("0x%" PRIx64 " table name is specified in sql, ignore the table name in bind param", pCxt->pComCxt->requestId);
}
}
@ -2614,7 +2614,7 @@ static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p);
static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) {
SVnodeModifyOpStmt* pStmt = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&pStmt);
int32_t code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&pStmt);
if (NULL == pStmt) {
return code;
}
@ -2729,7 +2729,7 @@ static int32_t buildTagNameFromMeta(STableMeta* pMeta, SArray** pTagName) {
return terrno;
}
SSchema* pSchema = getTableTagSchema(pMeta);
int32_t code = 0;
int32_t code = 0;
for (int32_t i = 0; i < pMeta->tableInfo.numOfTags; ++i) {
if (NULL == taosArrayPush(*pTagName, pSchema[i].name)) {
code = terrno;
@ -2834,7 +2834,7 @@ static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery)
}
if (NULL == pStmt->pTableBlockHashObj) {
pStmt->pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -2866,7 +2866,7 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR
static int32_t setRefreshMeta(SQuery* pQuery) {
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
int32_t code = 0;
int32_t code = 0;
if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
taosArrayDestroy(pQuery->pTableList);
@ -3065,10 +3065,9 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false),
.isStmtBind = pCxt->isStmtBind};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)((*pQuery)->pRoot);
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertSqlImpl(&context, pStmt);
code = parseInsertSqlImpl(&context, (SVnodeModifyOpStmt*)(*pQuery)->pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNextStageInfo(&context, *pQuery, pCatalogReq);
@ -3077,27 +3076,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) {
code = setRefreshMeta(*pQuery);
}
if (pStmt->stbSyntax && TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT) &&
code == TSDB_CODE_TSC_INVALID_OPERATION) {
context.tags.numOfBound = pStmt->pStbRowsCxt->boundColsInfo.numOfBound;
context.tags.numOfCols = pStmt->pStbRowsCxt->boundColsInfo.numOfCols;
context.tags.hasBoundCols = pStmt->pStbRowsCxt->boundColsInfo.hasBoundCols;
context.tags.pColIndex = taosMemoryMalloc(sizeof(int16_t) * context.tags.numOfBound);
if (NULL == context.tags.pColIndex) {
return terrno;
}
(void)memcpy(context.tags.pColIndex, pStmt->pStbRowsCxt->boundColsInfo.pColIndex,
sizeof(int16_t) * pStmt->pStbRowsCxt->boundColsInfo.numOfBound);
code = setStmtInfo(&context, pStmt);
if (TSDB_CODE_SUCCESS == code) {
insDestroyBoundColInfo(&context.tags);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
insDestroyBoundColInfo(&context.tags);
// if no data to insert, set emptyMode to avoid request server
if (!context.needRequest) {
(*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;

View File

@ -242,7 +242,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
}
code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
pTag = NULL;
end:
@ -594,7 +594,7 @@ int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const c
}
code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
pTag = NULL;
end:
@ -886,7 +886,7 @@ _return:
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum,
TAOS_FIELD_E** fields, uint8_t timePrec) {
if (fields != NULL) {
if (fields) {
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_E));
if (NULL == *fields) {
return terrno;
@ -939,7 +939,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
if (pDataBlock->boundColsInfo.numOfBound <= 0) {
*fieldNum = 0;
if (fields != NULL) {
if (fields) {
*fields = NULL;
}

View File

@ -147,7 +147,7 @@ int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchem
}
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
SArray* tagName, uint8_t tagNum, int32_t ttl) {
SArray* tagName, uint8_t tagNum, int32_t ttl) {
pTbReq->type = TD_CHILD_TABLE;
pTbReq->ctb.pTag = (uint8_t*)pTag;
pTbReq->name = taosStrdup(tname);
@ -174,7 +174,7 @@ static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
SSchema* pSchemas = getTableColumnSchema(pTableMeta);
int32_t code = 0;
int32_t code = 0;
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
if (NULL == taosArrayPush(pValues, &val)) {
@ -886,19 +886,77 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
return false;
}
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
if (*fields != pColSchema->type) {
if (errstr != NULL)
snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
return TSDB_CODE_INVALID_PARA;
}
if (IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) > pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
return TSDB_CODE_INVALID_PARA;
}
if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
return TSDB_CODE_INVALID_PARA;
}
return 0;
}
#define PRCESS_DATA(i, j) \
ret = checkSchema(pColSchema, fields, errstr, errstrLen); \
if (ret != 0) { \
goto end; \
} \
\
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { \
hasTs = true; \
} \
\
int8_t* offset = pStart; \
if (IS_VAR_DATA_TYPE(pColSchema->type)) { \
pStart += numOfRows * sizeof(int32_t); \
} else { \
pStart += BitmapLen(numOfRows); \
} \
char* pData = pStart; \
\
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j); \
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
if (ret != 0) { \
goto end; \
} \
fields += sizeof(int8_t) + sizeof(int32_t); \
if (needChangeLength && version == BLOCK_VERSION_1) { \
pStart += htonl(colLength[i]); \
} else { \
pStart += colLength[i]; \
} \
boundInfo->pColIndex[j] = -1;
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
int ret = 0;
if(data == NULL) {
if (data == NULL) {
uError("rawBlockBindData, data is NULL");
return TSDB_CODE_APP_ERROR;
}
void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
SVCreateTbReq *pCreateReqTmp = NULL;
if (tmp == NULL && pCreateTb != NULL){
SVCreateTbReq* pCreateReqTmp = NULL;
if (tmp == NULL && pCreateTb != NULL) {
ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
if (ret != TSDB_CODE_SUCCESS){
if (ret != TSDB_CODE_SUCCESS) {
uError("cloneSVreateTbReq error");
goto end;
}
@ -906,7 +964,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
STableDataCxt* pTableCxt = NULL;
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
if (pCreateReqTmp != NULL) {
tdDestroySVCreateTbReq(pCreateReqTmp);
taosMemoryFree(pCreateReqTmp);
@ -963,121 +1021,48 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
// if (tFields != NULL && numFields > boundInfo->numOfBound) {
// if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
// ret = TSDB_CODE_INVALID_PARA;
// goto end;
// }
if (tFields == NULL && numOfCols != boundInfo->numOfBound) {
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
bool hasTs = false;
if (tFields == NULL) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (ret != 0) {
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[j]);
} else {
pStart += colLength[j];
}
int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
for (int j = 0; j < len; j++) {
SSchema* pColSchema = &pSchema[j];
PRCESS_DATA(j, j)
}
} else {
bool hasTs = false;
for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
char* fieldName = NULL;
char* fieldName = NULL;
if (raw) {
fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
} else {
fieldName = ((TAOS_FIELD*)tFields)[i].name;
}
if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL)
snprintf(errstr, errstrLen,
"column type or bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
*(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
hasTs = true;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
// for(int k = 0; k < numOfRows; k++) {
// if(!colDataIsNull_f(offset, k) && pColSchema->type == TSDB_DATA_TYPE_INT){
// printf("colName:%s,val:%d", fieldName, *(int32_t*)(pStart + k * sizeof(int32_t)));
// }
// }
}
char* pData = pStart;
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (ret != 0) {
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[i]);
} else {
pStart += colLength[i];
}
boundInfo->pColIndex[j] = -1;
PRCESS_DATA(i, j)
break;
}
}
}
}
if (!hasTs) {
if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (!hasTs) {
if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
if (ret != 0) {
goto end;
}
} else {
boundInfo->pColIndex[c] = c; // restore for next block
// process NULL data
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
if (ret != 0) {
goto end;
}
} else {
boundInfo->pColIndex[c] = c; // restore for next block
}
}

View File

@ -3757,7 +3757,7 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList);
FOREACH(pPartKey, pSelect->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return pCxt->currClause == SQL_CLAUSE_HAVING ? DEAL_RES_IGNORE_CHILD : rewriteExprToGroupKeyFunc(pCxt, pNode);
return pSelect->hasAggFuncs ? rewriteExprToGroupKeyFunc(pCxt, pNode) : DEAL_RES_IGNORE_CHILD;
}
if ((partionByTbname) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {

View File

@ -887,7 +887,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
}
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
int32_t code;
int32_t code = 0;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;

View File

@ -664,7 +664,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
// set status
if (pWal->vers.firstVer == -1) {
pWal->vers.firstVer = 0;
pWal->vers.firstVer = index;
}
pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalCkHead) + cyptedBodyLen;

View File

@ -14,12 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "tlrucache.h"
#include "os.h"
#include "taoserror.h"
#include "tarray.h"
#include "tdef.h"
#include "tlog.h"
#include "tlrucache.h"
#include "tutil.h"
typedef struct SLRUEntry SLRUEntry;
@ -110,7 +110,7 @@ struct SLRUEntryTable {
};
static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
table->lengthBits = 4;
table->lengthBits = 16;
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
if (!table->list) {
TAOS_RETURN(terrno);
@ -371,24 +371,35 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle,
bool freeOnFail) {
LRUStatus status = TAOS_LRU_STATUS_OK;
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
if (!lastReferenceList) {
taosLRUEntryFree(e);
return TAOS_LRU_STATUS_FAIL;
LRUStatus status = TAOS_LRU_STATUS_OK;
SLRUEntry *toFree = NULL;
SArray *lastReferenceList = NULL;
if (shard->usage + e->totalCharge > shard->capacity) {
lastReferenceList = taosArrayInit(16, POINTER_BYTES);
if (!lastReferenceList) {
taosLRUEntryFree(e);
return TAOS_LRU_STATUS_FAIL;
}
}
(void)taosThreadMutexLock(&shard->mutex);
taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList);
if (shard->usage + e->totalCharge > shard->capacity && shard->lru.next != &shard->lru) {
if (!lastReferenceList) {
lastReferenceList = taosArrayInit(16, POINTER_BYTES);
if (!lastReferenceList) {
taosLRUEntryFree(e);
(void)taosThreadMutexUnlock(&shard->mutex);
return TAOS_LRU_STATUS_FAIL;
}
}
taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList);
}
if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) {
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
if (handle == NULL) {
if (!taosArrayPush(lastReferenceList, &e)) {
taosLRUEntryFree(e);
goto _exit;
}
toFree = e;
} else {
if (freeOnFail) {
taosLRUEntryFree(e);
@ -413,11 +424,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
taosLRUCacheShardLRURemove(shard, old);
shard->usage -= old->totalCharge;
if (!taosArrayPush(lastReferenceList, &old)) {
taosLRUEntryFree(e);
taosLRUEntryFree(old);
goto _exit;
}
toFree = old;
}
}
if (handle == NULL) {
@ -434,6 +441,10 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
_exit:
(void)taosThreadMutexUnlock(&shard->mutex);
if (toFree) {
taosLRUEntryFree(toFree);
}
for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);
@ -733,7 +744,8 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
}
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) {
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle,
LRUPriority priority, void *ud) {
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
uint32_t shardIndex = hash & cache->shardedCache.shardMask;

View File

@ -1317,6 +1317,7 @@
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/snodeCheck.sim
,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim

View File

@ -25,7 +25,6 @@ exe:
gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
gcc $(CFLAGS) ./stmt2.c -o $(ROOT)stmt2 $(LFLAGS)
gcc $(CFLAGS) ./stmt2-example.c -o $(ROOT)stmt2-example $(LFLAGS)
gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
@ -43,6 +42,5 @@ clean:
rm $(ROOT)stmt
rm $(ROOT)stmt2
rm $(ROOT)stmt2-example
rm $(ROOT)stmt2-get-fields
rm $(ROOT)stmt2-nohole
rm $(ROOT)stmt-crash

View File

@ -1,63 +0,0 @@
// TAOS standard API example. The same syntax as MySQL, but only a subet
// to compile: gcc -o stmt2-get-fields stmt2-get-fields.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
void do_query(TAOS *taos, const char *sql) {
TAOS_RES *result = taos_query(taos, sql);
int code = taos_errno(result);
if (code) {
printf("failed to query: %s, reason:%s\n", sql, taos_errstr(result));
taos_free_result(result);
return;
}
taos_free_result(result);
}
void do_stmt(TAOS *taos) {
do_query(taos, "drop database if exists db");
do_query(taos, "create database db");
do_query(taos,
"create table db.stb (ts timestamp, b binary(10)) tags(t1 "
"int, t2 binary(10))");
TAOS_STMT2_OPTION option = {0};
TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option);
const char *sql = "insert into db.stb(t1,t2,ts,b,tbname) values(?,?,?,?,?)";
int code = taos_stmt2_prepare(stmt, sql, 0);
if (code != 0) {
printf("failed to execute taos_stmt2_prepare. error:%s\n", taos_stmt2_error(stmt));
taos_stmt2_close(stmt);
return;
}
int fieldNum = 0;
TAOS_FIELD_ALL *pFields = NULL;
code = taos_stmt2_get_all_fields(stmt, &fieldNum, &pFields);
if (code != 0) {
printf("failed get col,ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_stmt2_error(stmt));
} else {
printf("col nums:%d\n", fieldNum);
for (int i = 0; i < fieldNum; i++) {
printf("field[%d]: %s,type:%d\n", i, pFields[i].name, pFields[i].field_type);
}
}
taos_stmt2_close(stmt);
}
int main() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", "", 0);
if (!taos) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1);
}
do_stmt(taos);
taos_close(taos);
taos_cleanup();
}

View File

@ -0,0 +1,64 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c supportVnodes -v 4
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
print ========== step2
sql create dnode $hostname port 7200
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname port 7300
system sh/exec.sh -n dnode3 -s start
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 3 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
print ========== step3
sql drop database if exists test;
sql create database if not exists test vgroups 4 replica 3 precision "ms" ;
sql use test;
sql create table test.test (ts timestamp, c1 int) tags (t1 int) ;
print create stream without snode existing
sql_error create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s);
print create snode
sql create snode on dnode 1;
sql create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s);
print drop snode and then create stream
sql drop snode on dnode 1;
sql_error create stream stream_t2 trigger at_once into str_dst as select count(*) from test interval(20s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT

View File

@ -102,6 +102,7 @@ run tsim/stream/triggerInterval0.sim
run tsim/stream/triggerSession0.sim
run tsim/stream/distributeIntervalRetrive0.sim
run tsim/stream/basic0.sim
run tsim/stream/snodeCheck.sim
run tsim/stream/session0.sim
run tsim/stream/schedSnode.sim
run tsim/stream/partitionby.sim

View File

@ -344,9 +344,72 @@ class TDTestCase:
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(55)
# TODO Fix Me!
sql = "explain SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like '%asd');"
tdSql.error(sql, -2147473664) # Error: Planner internal error
sql = "SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like '%asd');"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(60)
sql = "SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like 'asd%');"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(0)
sql = "SELECT c1 FROM meters PARTITION BY c1 HAVING c1 > 0 slimit 2 limit 10"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(20)
sql = "SELECT t1 FROM meters PARTITION BY t1 HAVING(t1 = 1)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(20000)
sql = "SELECT concat(t2, 'asd') FROM meters PARTITION BY t2 HAVING(t2 like '%5')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10000)
tdSql.checkData(0, 0, 'tb5asd')
sql = "SELECT concat(t2, 'asd') FROM meters PARTITION BY concat(t2, 'asd') HAVING(concat(t2, 'asd')like '%5%')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10000)
tdSql.checkData(0, 0, 'tb5asd')
sql = "SELECT avg(c1) FROM meters PARTITION BY tbname, t1 HAVING(t1 = 1)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(2)
sql = "SELECT count(*) FROM meters PARTITION BY concat(tbname, 'asd') HAVING(concat(tbname, 'asd') like '%asd')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10)
sql = "SELECT count(*), concat(tbname, 'asd') FROM meters PARTITION BY concat(tbname, 'asd') HAVING(concat(tbname, 'asd') like '%asd')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10)
sql = "SELECT count(*) FROM meters PARTITION BY t1 HAVING(t1 < 4) order by t1 +1"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(4)
sql = "SELECT count(*), t1 + 100 FROM meters PARTITION BY t1 HAVING(t1 < 4) order by t1 +1"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(4)
sql = "SELECT count(*), t1 + 100 FROM meters PARTITION BY t1 INTERVAL(1d) HAVING(t1 < 4) order by t1 +1 desc"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(280)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd' and count(*) = 118)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd' and count(*) != 118)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(69)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd') order by count(*) asc limit 10"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd' or concat(t3, 'asd') like '%3asd') order by count(*) asc limit 10000"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(140)
def run(self):
self.prepareTestEnv()

View File

@ -17,8 +17,6 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'sDebugFlag':143, 'wDebugFlag':143}
def __init__(self):
self.vgroups = 1
self.ctbNum = 10

View File

@ -105,6 +105,113 @@ int smlProcess_telnet_Test() {
return code;
}
int smlProcess_telnet0_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql1[] = {"sysif.bytes.out 1479496100 1.3E0 host=web01 interface=eth0"};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
const char *sql2[] = {"sysif.bytes.out 1479496700 1.6E0 host=web01 interface=eth0"};
pRes = taos_schemaless_insert(taos, (char **)sql2, sizeof(sql2) / sizeof(sql2[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
const char *sql3[] = {"sysif.bytes.out 1479496300 1.1E0 interface=eth0 host=web01"};
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int smlProcess_json0_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {
"[{\"metric\":\"syscpu.nice\",\"timestamp\":1662344045,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}}]"};
char *sql1[1] = {0};
for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
int code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql1[i]);
}
ASSERT(code == 0);
const char *sql2[] = {
"[{\"metric\":\"syscpu.nice\",\"timestamp\":1662344041,\"value\":13,\"tags\":{\"host\":\"web01\",\"dc\":1}"
"},{\"metric\":\"syscpu.nice\",\"timestamp\":1662344042,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}"
"}]",
};
char *sql3[1] = {0};
for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql3[i] != NULL);
(void)strncpy(sql3[i], sql2[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql3[i]);
}
ASSERT(code == 0);
taos_close(taos);
return code;
}
int smlProcess_json1_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -1775,6 +1882,21 @@ int sml_td24559_Test() {
pRes = taos_query(taos, "create database if not exists td24559");
taos_free_result(pRes);
const char *sql1[] = {
"sttb,t1=1 f1=283i32,f2=g\"\" 1632299372000",
"sttb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
};
pRes = taos_query(taos, "use td24559");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code);
taos_free_result(pRes);
const char *sql[] = {
"stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299372000",
"stb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
@ -1788,7 +1910,7 @@ int sml_td24559_Test() {
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
@ -2136,7 +2258,8 @@ int main(int argc, char *argv[]) {
taos_options(TSDB_OPTION_CONFIGDIR, argv[1]);
}
int ret = 0;
int ret = smlProcess_json0_Test();
ASSERT(!ret);
ret = sml_ts5528_test();
ASSERT(!ret);
ret = sml_td29691_Test();
@ -2173,6 +2296,8 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = smlProcess_telnet_Test();
ASSERT(!ret);
ret = smlProcess_telnet0_Test();
ASSERT(!ret);
ret = smlProcess_json1_Test();
ASSERT(!ret);
ret = smlProcess_json2_Test();

View File

@ -19,196 +19,77 @@
#include "taos.h"
#include "types.h"
int buildStable(TAOS* pConn) {
TAOS_RES* pRes = taos_query(pConn,
"CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS "
"(`groupid` INT, `location` VARCHAR(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
TAOS* pConn = NULL;
void action(char* sql) {
TAOS_RES* pRes = taos_query(pConn, sql);
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table ntba(ts timestamp, addr binary(32))");
if (taos_errno(pRes) != 0) {
printf("failed to create ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table ntbb(ts timestamp, addr binary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create ntbb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ntba values(now,'123456789abcdefg123456789')");
if (taos_errno(pRes) != 0) {
printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ntba values(now + 1s,'hello')");
if (taos_errno(pRes) != 0) {
printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
int32_t ret = -1;
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_raw");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_raw vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_raw");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
buildStable(pConn);
pRes = taos_query(pConn, "select * from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
void *data = NULL;
int32_t test_write_raw_block(char* query, char* dst) {
TAOS_RES* pRes = taos_query(pConn, query);
ASSERT(taos_errno(pRes) == 0);
void* data = NULL;
int32_t numOfRows = 0;
int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){
printf("error fetch raw block, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_write_raw_block(pConn, numOfRows, data, "d1");
ASSERT(error_code == 0);
error_code = taos_write_raw_block(pConn, numOfRows, data, dst);
taos_free_result(pRes);
return error_code;
}
pRes = taos_query(pConn, "select ts,phase from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
goto END;
}
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){
printf("error fetch raw block, reason:%s\n", taos_errstr(pRes));
goto END;
}
int32_t test_write_raw_block_with_fields(char* query, char* dst) {
TAOS_RES* pRes = taos_query(pConn, query);
ASSERT(taos_errno(pRes) == 0);
void* data = NULL;
int32_t numOfRows = 0;
int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
ASSERT(error_code == 0);
int numFields = taos_num_fields(pRes);
TAOS_FIELD *fields = taos_fetch_fields(pRes);
taos_write_raw_block_with_fields(pConn, numOfRows, data, "d2", fields, numFields);
TAOS_FIELD* fields = taos_fetch_fields(pRes);
error_code = taos_write_raw_block_with_fields(pConn, numOfRows, data, dst, fields, numFields);
taos_free_result(pRes);
return error_code;
}
// check error msg
pRes = taos_query(pConn, "select * from ntba");
if (taos_errno(pRes) != 0) {
printf("error select * from ntba, reason:%s\n", taos_errstr(pRes));
goto END;
}
void init_env() {
pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT(pConn);
data = NULL;
numOfRows = 0;
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
if(error_code !=0 ){
printf("error fetch select * from ntba, reason:%s\n", taos_errstr(pRes));
goto END;
}
error_code = taos_write_raw_block(pConn, numOfRows, data, "ntbb");
if(error_code == 0) {
printf(" taos_write_raw_block to ntbb expect failed , but success!\n");
goto END;
}
action("drop database if exists db_raw");
action("create database if not exists db_raw vgroups 2");
action("use db_raw");
// pass NULL return last error code describe
const char* err = tmq_err2str(error_code);
printf("write_raw_block return code =0x%x err=%s\n", error_code, err);
if(strcmp(err, "success") == 0) {
printf("expect failed , but error string is success! err=%s\n", err);
goto END;
}
action(
"CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, "
"`location` VARCHAR(16))");
action("create table d0 using meters tags(1, 'San Francisco')");
action("create table d1 using meters tags(2, 'San Francisco')");
action("create table d2 using meters tags(3, 'San Francisco')");
action("insert into d0 (ts, current) values (now, 120)");
// no exist table
error_code = taos_write_raw_block(pConn, numOfRows, data, "no-exist-table");
if(error_code == 0) {
printf(" taos_write_raw_block to no-exist-table expect failed , but success!\n");
goto END;
}
action("create table ntba(ts timestamp, addr binary(32))");
action("create table ntbb(ts timestamp, addr binary(8))");
action("create table ntbc(ts timestamp, addr binary(8), c2 int)");
err = tmq_err2str(error_code);
printf("write_raw_block no exist table return code =0x%x err=%s\n", error_code, err);
if(strcmp(err, "success") == 0) {
printf("expect failed write no exist table, but error string is success! err=%s\n", err);
goto END;
}
// success
ret = 0;
END:
// free
if(pRes) taos_free_result(pRes);
if(pConn) taos_close(pConn);
return ret;
action("insert into ntba values(now,'123456789abcdefg123456789')");
action("insert into ntbb values(now + 1s,'hello')");
action("insert into ntbc values(now + 13s, 'sdf', 123)");
}
int main(int argc, char* argv[]) {
printf("test write_raw_block...\n");
int ret = init_env();
if (ret < 0) {
printf("test write_raw_block failed.\n");
return ret;
}
printf("test write_raw_block ok.\n");
printf("test write_raw_block start.\n");
init_env();
ASSERT(test_write_raw_block("select * from d0", "d1") == 0); // test schema same
ASSERT(test_write_raw_block("select * from ntbb", "ntba") == 0); // test schema compatible
ASSERT(test_write_raw_block("select * from ntbb", "ntbc") == 0); // test schema small
ASSERT(test_write_raw_block("select * from ntbc", "ntbb") == 0); // test schema bigger
ASSERT(test_write_raw_block("select * from ntba", "ntbb") != 0); // test schema mismatch
ASSERT(test_write_raw_block("select * from ntba", "no-exist-table") != 0); // test no exist table
ASSERT(test_write_raw_block("select addr from ntba", "ntbb") != 0); // test without ts
ASSERT(test_write_raw_block_with_fields("select ts,phase from d0", "d2") == 0); // test with fields
printf("test write_raw_block end.\n");
return 0;
}