Merge remote-tracking branch 'origin/3.0' into enh/TD-19463_2
This commit is contained in:
commit
2ece142aba
|
@ -198,6 +198,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
||||||
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision);
|
||||||
|
|
||||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||||
|
|
||||||
|
|
|
@ -28,8 +28,8 @@
|
||||||
#define QUOTE '"'
|
#define QUOTE '"'
|
||||||
#define SLASH '\\'
|
#define SLASH '\\'
|
||||||
|
|
||||||
#define JUMP_SPACE(sql) \
|
#define JUMP_SPACE(sql, sqlEnd) \
|
||||||
while (*sql != '\0') { \
|
while (sql < sqlEnd) { \
|
||||||
if (*sql == SPACE) \
|
if (*sql == SPACE) \
|
||||||
sql++; \
|
sql++; \
|
||||||
else \
|
else \
|
||||||
|
@ -917,16 +917,17 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
|
static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
|
||||||
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
||||||
JUMP_SPACE(sql)
|
JUMP_SPACE(sql, sqlEnd)
|
||||||
if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
|
if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
|
||||||
elements->measure = sql;
|
elements->measure = sql;
|
||||||
|
|
||||||
// parse measure
|
// parse measure
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
|
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
|
||||||
MOVE_FORWARD_ONE(sql, strlen(sql) + 1);
|
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
|
||||||
|
sqlEnd--;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (IS_COMMA(sql)) {
|
if (IS_COMMA(sql)) {
|
||||||
|
@ -950,7 +951,7 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
||||||
} else {
|
} else {
|
||||||
if (*sql == COMMA) sql++;
|
if (*sql == COMMA) sql++;
|
||||||
elements->tags = sql;
|
elements->tags = sql;
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
if (IS_SPACE(sql)) {
|
if (IS_SPACE(sql)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -961,10 +962,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
||||||
elements->measureTagsLen = sql - elements->measure;
|
elements->measureTagsLen = sql - elements->measure;
|
||||||
|
|
||||||
// parse cols
|
// parse cols
|
||||||
JUMP_SPACE(sql)
|
JUMP_SPACE(sql, sqlEnd)
|
||||||
elements->cols = sql;
|
elements->cols = sql;
|
||||||
bool isInQuote = false;
|
bool isInQuote = false;
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
if (IS_QUOTE(sql)) {
|
if (IS_QUOTE(sql)) {
|
||||||
isInQuote = !isInQuote;
|
isInQuote = !isInQuote;
|
||||||
}
|
}
|
||||||
|
@ -984,10 +985,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse timestamp
|
// parse timestamp
|
||||||
JUMP_SPACE(sql)
|
JUMP_SPACE(sql, sqlEnd)
|
||||||
elements->timestamp = sql;
|
elements->timestamp = sql;
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
if (*sql == SPACE) {
|
if (isspace(*sql)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
sql++;
|
sql++;
|
||||||
|
@ -997,8 +998,8 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) {
|
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
|
||||||
while (**sql != '\0') {
|
while (*sql < sqlEnd) {
|
||||||
if (**sql != SPACE && !(*data)) {
|
if (**sql != SPACE && !(*data)) {
|
||||||
*data = *sql;
|
*data = *sql;
|
||||||
} else if (**sql == SPACE && *data) {
|
} else if (**sql == SPACE && *data) {
|
||||||
|
@ -1009,20 +1010,20 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
|
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
|
||||||
SSmlMsgBuf *msg) {
|
SSmlMsgBuf *msg) {
|
||||||
if(!cols) return TSDB_CODE_OUT_OF_MEMORY;
|
if(!cols) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
const char *sql = data;
|
const char *sql = data;
|
||||||
size_t childTableNameLen = strlen(tsSmlChildTableName);
|
size_t childTableNameLen = strlen(tsSmlChildTableName);
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
JUMP_SPACE(sql)
|
JUMP_SPACE(sql, sqlEnd)
|
||||||
if (*sql == '\0') break;
|
if (*sql == '\0') break;
|
||||||
|
|
||||||
const char *key = sql;
|
const char *key = sql;
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
|
|
||||||
// parse key
|
// parse key
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
if (*sql == SPACE) {
|
if (*sql == SPACE) {
|
||||||
smlBuildInvalidDataMsg(msg, "invalid data", sql);
|
smlBuildInvalidDataMsg(msg, "invalid data", sql);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
@ -1047,7 +1048,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
|
||||||
// parse value
|
// parse value
|
||||||
const char *value = sql;
|
const char *value = sql;
|
||||||
int32_t valueLen = 0;
|
int32_t valueLen = 0;
|
||||||
while (*sql != '\0') {
|
while (sql < sqlEnd) {
|
||||||
// parse value
|
// parse value
|
||||||
if (*sql == SPACE) {
|
if (*sql == SPACE) {
|
||||||
break;
|
break;
|
||||||
|
@ -1092,11 +1093,11 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
|
||||||
}
|
}
|
||||||
|
|
||||||
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
|
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
|
||||||
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) {
|
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) {
|
||||||
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
|
||||||
// parse metric
|
// parse metric
|
||||||
smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
|
smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
|
||||||
if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
|
if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
||||||
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||||
|
@ -1105,7 +1106,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
|
||||||
// parse timestamp
|
// parse timestamp
|
||||||
const char *timestamp = NULL;
|
const char *timestamp = NULL;
|
||||||
int32_t tLen = 0;
|
int32_t tLen = 0;
|
||||||
smlParseTelnetElement(&sql, ×tamp, &tLen);
|
smlParseTelnetElement(&sql, sqlEnd, ×tamp, &tLen);
|
||||||
if (!timestamp || tLen == 0) {
|
if (!timestamp || tLen == 0) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
@ -1120,7 +1121,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
|
||||||
// parse value
|
// parse value
|
||||||
const char *value = NULL;
|
const char *value = NULL;
|
||||||
int32_t valueLen = 0;
|
int32_t valueLen = 0;
|
||||||
smlParseTelnetElement(&sql, &value, &valueLen);
|
smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
|
||||||
if (!value || valueLen == 0) {
|
if (!value || valueLen == 0) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
@ -1138,7 +1139,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse tags
|
// parse tags
|
||||||
ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
|
ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -2073,11 +2074,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
|
||||||
}
|
}
|
||||||
/************* TSDB_SML_JSON_PROTOCOL function end **************/
|
/************* TSDB_SML_JSON_PROTOCOL function end **************/
|
||||||
|
|
||||||
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
|
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
|
||||||
SSmlLineInfo elements = {0};
|
SSmlLineInfo elements = {0};
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
|
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
|
||||||
|
|
||||||
int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
|
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
|
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -2184,7 +2185,7 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
|
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
SSmlTableInfo *tinfo = smlBuildTableInfo();
|
SSmlTableInfo *tinfo = smlBuildTableInfo();
|
||||||
if (!tinfo) {
|
if (!tinfo) {
|
||||||
|
@ -2198,7 +2199,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||||
ret = smlParseTelnetString(info, (const char *)data, tinfo, cols);
|
ret = smlParseTelnetString(info, (const char *)data, (char*)data + len, tinfo, cols);
|
||||||
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
|
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2289,7 +2290,7 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < payloadNum; ++i) {
|
for (int32_t i = 0; i < payloadNum; ++i) {
|
||||||
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
|
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
|
||||||
ret = smlParseTelnetLine(info, dataPoint);
|
ret = smlParseTelnetLine(info, dataPoint, -1);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
|
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -2378,10 +2379,14 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
|
||||||
info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
|
info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
|
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
|
if(lines){
|
||||||
code = smlParseJSON(info, *lines);
|
code = smlParseJSON(info, *lines);
|
||||||
|
}else if(rawLine){
|
||||||
|
code = smlParseJSON(info, rawLine);
|
||||||
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
|
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
|
||||||
return code;
|
return code;
|
||||||
|
@ -2390,28 +2395,46 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numLines; ++i) {
|
for (int32_t i = 0; i < numLines; ++i) {
|
||||||
|
char *tmp = NULL;
|
||||||
|
int len = 0;
|
||||||
|
if(lines){
|
||||||
|
tmp = lines[i];
|
||||||
|
len = strlen(tmp);
|
||||||
|
}else if(rawLine){
|
||||||
|
tmp = rawLine;
|
||||||
|
while(rawLine < rawLineEnd){
|
||||||
|
if(*(rawLine++) == '\n'){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
len++;
|
||||||
|
}
|
||||||
|
if(info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#'){ // this line is comment
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||||
code = smlParseInfluxLine(info, lines[i]);
|
code = smlParseInfluxLine(info, tmp, len);
|
||||||
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||||
code = smlParseTelnetLine(info, lines[i]);
|
code = smlParseTelnetLine(info, tmp, len);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]);
|
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int smlProcess(SSmlHandle *info, char *lines[], int numLines) {
|
static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t retryNum = 0;
|
int32_t retryNum = 0;
|
||||||
|
|
||||||
info->cost.parseTime = taosGetTimestampUs();
|
info->cost.parseTime = taosGetTimestampUs();
|
||||||
|
|
||||||
code = smlParseLine(info, lines, numLines);
|
code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
|
uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
@ -2504,39 +2527,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* taos_schemaless_insert() parse and insert data points into database according to
|
|
||||||
* different protocol.
|
|
||||||
*
|
|
||||||
* @param $lines input array may contain multiple lines, each line indicates a data point.
|
|
||||||
* If protocol=2 is used input array should contain single JSON
|
|
||||||
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
|
|
||||||
* multiple data points in JSON format, should include them in $JSON_string
|
|
||||||
* as a JSON array.
|
|
||||||
* @param $numLines indicates how many data points in $lines.
|
|
||||||
* If protocol = 2 is used this param will be ignored as $lines should
|
|
||||||
* contain single JSON string.
|
|
||||||
* @param $protocol indicates which protocol to use for parsing:
|
|
||||||
* 0 - influxDB line protocol
|
|
||||||
* 1 - OpenTSDB telnet line protocol
|
|
||||||
* 2 - OpenTSDB JSON format protocol
|
|
||||||
* @return return zero for successful insertion. Otherwise return none-zero error code of
|
|
||||||
* failure reason.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
|
||||||
if (NULL == taos) {
|
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
|
||||||
if (!request) {
|
|
||||||
uError("SML:taos_schemaless_insert error request is null");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) {
|
||||||
int batchs = 0;
|
int batchs = 0;
|
||||||
STscObj *pTscObj = request->pTscObj;
|
STscObj *pTscObj = request->pTscObj;
|
||||||
|
|
||||||
|
@ -2560,12 +2552,6 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!lines) {
|
|
||||||
request->code = TSDB_CODE_SML_INVALID_DATA;
|
|
||||||
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
|
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
|
||||||
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
||||||
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
|
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
|
||||||
|
@ -2616,8 +2602,21 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
||||||
info->affectedRows = perBatch;
|
info->affectedRows = perBatch;
|
||||||
info->pRequest->body.queryFp = smlInsertCallback;
|
info->pRequest->body.queryFp = smlInsertCallback;
|
||||||
info->pRequest->body.param = info;
|
info->pRequest->body.param = info;
|
||||||
int32_t code = smlProcess(info, lines, perBatch);
|
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
|
||||||
|
if(lines){
|
||||||
lines += perBatch;
|
lines += perBatch;
|
||||||
|
}
|
||||||
|
if(rawLine){
|
||||||
|
int num = 0;
|
||||||
|
while(rawLine < rawLineEnd){
|
||||||
|
if(*(rawLine++) == '\n'){
|
||||||
|
num++;
|
||||||
|
}
|
||||||
|
if(num == perBatch){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
info->pRequest->body.queryFp(info, req, code);
|
info->pRequest->body.queryFp(info, req, code);
|
||||||
}
|
}
|
||||||
|
@ -2632,3 +2631,80 @@ end:
|
||||||
uDebug("resultend:%s", request->msgBuf);
|
uDebug("resultend:%s", request->msgBuf);
|
||||||
return (TAOS_RES *)request;
|
return (TAOS_RES *)request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* taos_schemaless_insert() parse and insert data points into database according to
|
||||||
|
* different protocol.
|
||||||
|
*
|
||||||
|
* @param $lines input array may contain multiple lines, each line indicates a data point.
|
||||||
|
* If protocol=2 is used input array should contain single JSON
|
||||||
|
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
|
||||||
|
* multiple data points in JSON format, should include them in $JSON_string
|
||||||
|
* as a JSON array.
|
||||||
|
* @param $numLines indicates how many data points in $lines.
|
||||||
|
* If protocol = 2 is used this param will be ignored as $lines should
|
||||||
|
* contain single JSON string.
|
||||||
|
* @param $protocol indicates which protocol to use for parsing:
|
||||||
|
* 0 - influxDB line protocol
|
||||||
|
* 1 - OpenTSDB telnet line protocol
|
||||||
|
* 2 - OpenTSDB JSON format protocol
|
||||||
|
* @return return zero for successful insertion. Otherwise return none-zero error code of
|
||||||
|
* failure reason.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
||||||
|
if (!request) {
|
||||||
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!lines) {
|
||||||
|
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||||
|
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||||
|
return (TAOS_RES *)request;
|
||||||
|
}
|
||||||
|
|
||||||
|
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
||||||
|
if (!request) {
|
||||||
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!lines || len <= 0) {
|
||||||
|
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||||
|
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||||
|
return (TAOS_RES *)request;
|
||||||
|
}
|
||||||
|
|
||||||
|
int numLines = 0;
|
||||||
|
*totalRows = 0;
|
||||||
|
char *tmp = lines;
|
||||||
|
for(int i = 0; i < len; i++){
|
||||||
|
if(lines[i] == '\n' || i == len - 1){
|
||||||
|
numLines++;
|
||||||
|
if(tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL){ //ignore comment
|
||||||
|
(*totalRows)++;
|
||||||
|
}
|
||||||
|
tmp = lines + i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision);
|
||||||
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ TEST(testCase, smlParseInfluxString_Test) {
|
||||||
char *tmp = "\\,st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3";
|
char *tmp = "\\,st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3";
|
||||||
char *sql = (char *)taosMemoryCalloc(256, 1);
|
char *sql = (char *)taosMemoryCalloc(256, 1);
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
int ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
int ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(elements.measure, sql);
|
ASSERT_EQ(elements.measure, sql);
|
||||||
ASSERT_EQ(elements.measureLen, strlen(",st"));
|
ASSERT_EQ(elements.measureLen, strlen(",st"));
|
||||||
|
@ -63,14 +63,14 @@ TEST(testCase, smlParseInfluxString_Test) {
|
||||||
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
|
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
|
|
||||||
// case 3 false
|
// case 3 false
|
||||||
tmp = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
|
tmp = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1);
|
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1);
|
||||||
ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3"));
|
ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3"));
|
||||||
|
@ -79,7 +79,7 @@ TEST(testCase, smlParseInfluxString_Test) {
|
||||||
tmp = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000";
|
tmp = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(elements.measure, sql);
|
ASSERT_EQ(elements.measure, sql);
|
||||||
ASSERT_EQ(elements.measureLen, strlen("st"));
|
ASSERT_EQ(elements.measureLen, strlen("st"));
|
||||||
|
@ -98,7 +98,7 @@ TEST(testCase, smlParseInfluxString_Test) {
|
||||||
tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ";
|
tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(elements.measure, sql + 1);
|
ASSERT_EQ(elements.measure, sql + 1);
|
||||||
ASSERT_EQ(elements.measureLen, strlen("st"));
|
ASSERT_EQ(elements.measureLen, strlen("st"));
|
||||||
|
@ -116,21 +116,21 @@ TEST(testCase, smlParseInfluxString_Test) {
|
||||||
tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 ";
|
tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 ";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
// case 7
|
// case 7
|
||||||
tmp = " st , ";
|
tmp = " st , ";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
// case 8 false
|
// case 8 false
|
||||||
tmp = ", st , ";
|
tmp = ", st , ";
|
||||||
memcpy(sql, tmp, strlen(tmp) + 1);
|
memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
memset(&elements, 0, sizeof(SSmlLineInfo));
|
memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
ret = smlParseInfluxString(sql, &elements, &msgBuf);
|
ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
taosMemoryFree(sql);
|
taosMemoryFree(sql);
|
||||||
}
|
}
|
||||||
|
@ -542,7 +542,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
|
||||||
"sys.procs.running 1479496100 42 host= web01",
|
"sys.procs.running 1479496100 42 host= web01",
|
||||||
};
|
};
|
||||||
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
||||||
int ret = smlParseTelnetLine(info, (void *)sql[i]);
|
int ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -561,7 +561,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
|
||||||
|
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
||||||
ret = smlParseTelnetLine(info, (void *)sql[i]);
|
ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
|
||||||
if (ret != TSDB_CODE_SUCCESS) break;
|
if (ret != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
|
@ -617,7 +617,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) {
|
||||||
|
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
||||||
ret = smlParseTelnetLine(info, (void *)sql[i]);
|
ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -653,7 +653,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
|
||||||
|
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
||||||
ret = smlParseTelnetLine(info, (void *)sql[i]);
|
ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
|
||||||
if (ret != TSDB_CODE_SUCCESS) break;
|
if (ret != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
|
@ -688,7 +688,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
|
||||||
};
|
};
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
||||||
ret = smlParseTelnetLine(info, (void *)sql[i]);
|
ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
|
||||||
if (ret != TSDB_CODE_SUCCESS) break;
|
if (ret != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
|
@ -1002,7 +1002,7 @@ TEST(testCase, sml_col_4096_Test) {
|
||||||
|
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
|
||||||
ret = smlParseInfluxLine(info, sql[i]);
|
ret = smlParseInfluxLine(info, sql[i], strlen(sql[i]));
|
||||||
if (ret != TSDB_CODE_SUCCESS) break;
|
if (ret != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
ASSERT_NE(ret, 0);
|
ASSERT_NE(ret, 0);
|
||||||
|
|
|
@ -51,6 +51,7 @@ static int32_t dmInitMonitor() {
|
||||||
|
|
||||||
static bool dmCheckDiskSpace() {
|
static bool dmCheckDiskSpace() {
|
||||||
osUpdate();
|
osUpdate();
|
||||||
|
// sufficiency
|
||||||
if (!osDataSpaceSufficient()) {
|
if (!osDataSpaceSufficient()) {
|
||||||
dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
||||||
}
|
}
|
||||||
|
@ -60,7 +61,24 @@ static bool dmCheckDiskSpace() {
|
||||||
if (!osTempSpaceSufficient()) {
|
if (!osTempSpaceSufficient()) {
|
||||||
dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
|
||||||
}
|
}
|
||||||
return true;
|
// availability
|
||||||
|
bool ret = true;
|
||||||
|
if (!osDataSpaceAvailable()) {
|
||||||
|
dError("data disk space unavailable, i.e. %s", tsDataDir);
|
||||||
|
terrno = TSDB_CODE_VND_NO_DISKSPACE;
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
if (!osLogSpaceAvailable()) {
|
||||||
|
dError("log disk space unavailable, i.e. %s", tsLogDir);
|
||||||
|
terrno = TSDB_CODE_VND_NO_DISKSPACE;
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
if (!osTempSpaceAvailable()) {
|
||||||
|
dError("temp disk space unavailable, i.e. %s", tsTempDir);
|
||||||
|
terrno = TSDB_CODE_VND_NO_DISKSPACE;
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool dmCheckDataDirVersion() {
|
static bool dmCheckDataDirVersion() {
|
||||||
|
|
|
@ -43,9 +43,7 @@ void Testbase::InitLog(const char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Testbase::Init(const char* path, int16_t port) {
|
void Testbase::Init(const char* path, int16_t port) {
|
||||||
#ifdef _TD_DARWIN_64
|
|
||||||
osDefaultInit();
|
osDefaultInit();
|
||||||
#endif
|
|
||||||
tsServerPort = port;
|
tsServerPort = port;
|
||||||
strcpy(tsLocalFqdn, "localhost");
|
strcpy(tsLocalFqdn, "localhost");
|
||||||
snprintf(tsLocalEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(tsLocalEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
|
@ -258,6 +258,7 @@ enum {
|
||||||
TD_FTYPE_RSMA_QTASKINFO = 0,
|
TD_FTYPE_RSMA_QTASKINFO = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#if 0
|
||||||
struct STFile {
|
struct STFile {
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
STFInfo info;
|
STFInfo info;
|
||||||
|
@ -287,6 +288,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile);
|
||||||
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||||
void tdCloseTFile(STFile *pTFile);
|
void tdCloseTFile(STFile *pTFile);
|
||||||
void tdDestroyTFile(STFile *pTFile);
|
void tdDestroyTFile(STFile *pTFile);
|
||||||
|
#endif
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||||
char *outputName);
|
char *outputName);
|
||||||
|
|
|
@ -17,14 +17,17 @@
|
||||||
|
|
||||||
extern SSmaMgmt smaMgmt;
|
extern SSmaMgmt smaMgmt;
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
|
||||||
|
#endif
|
||||||
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
|
||||||
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
|
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
|
||||||
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
|
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* @brief Only applicable to Rollup SMA
|
* @brief Only applicable to Rollup SMA
|
||||||
*
|
*
|
||||||
|
@ -48,6 +51,7 @@ int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
|
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Only applicable to Rollup SMA
|
* @brief Only applicable to Rollup SMA
|
||||||
|
@ -108,6 +112,7 @@ int32_t smaBegin(SSma *pSma) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* @brief pre-commit for rollup sma(sync commit).
|
* @brief pre-commit for rollup sma(sync commit).
|
||||||
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
||||||
|
@ -169,6 +174,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
|
||||||
#endif
|
#endif
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// SQTaskFile ======================================================
|
// SQTaskFile ======================================================
|
||||||
|
|
||||||
|
@ -230,6 +236,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* @brief post-commit for rollup sma
|
* @brief post-commit for rollup sma
|
||||||
* 1) clean up the outdated qtaskinfo files
|
* 1) clean up the outdated qtaskinfo files
|
||||||
|
@ -249,6 +256,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Rsma async commit implementation(only do some necessary light weighted task)
|
* @brief Rsma async commit implementation(only do some necessary light weighted task)
|
||||||
|
|
|
@ -15,8 +15,6 @@
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
|
||||||
#define RSMA_QTASKINFO_BUFSIZE (32768) // size
|
|
||||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
|
||||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||||
|
@ -48,23 +46,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SR
|
||||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||||
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
|
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
|
||||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
|
||||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
|
||||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
|
|
||||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
|
|
||||||
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
|
||||||
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
||||||
|
|
||||||
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
|
||||||
// adapt accordingly if definition of SRSmaInfo update
|
|
||||||
SRSmaInfo *pResult = NULL;
|
|
||||||
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
|
||||||
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN));
|
|
||||||
ASSERT(pResult->pTSchema->numOfCols > 1);
|
|
||||||
return pResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SRSmaQTaskInfoItem {
|
struct SRSmaQTaskInfoItem {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
@ -104,12 +89,6 @@ void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, con
|
||||||
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
|
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
|
||||||
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
|
|
||||||
|
|
||||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
||||||
// Note: free/kill may in RC
|
// Note: free/kill may in RC
|
||||||
if (!taskHandle || !(*taskHandle)) return;
|
if (!taskHandle || !(*taskHandle)) return;
|
||||||
|
@ -803,6 +782,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
@ -820,6 +800,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief sync mode
|
* @brief sync mode
|
||||||
|
@ -1189,65 +1170,6 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
|
|
||||||
SVnode *pVnode = pSma->pVnode;
|
|
||||||
STFile tFile = {0};
|
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
|
|
||||||
|
|
||||||
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
|
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
|
|
||||||
if (qTaskFileVer > 0) {
|
|
||||||
smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
|
|
||||||
TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
|
|
||||||
type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
STFInfo tFileInfo = {0};
|
|
||||||
if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
SRSmaQTaskInfoIter fIter = {0};
|
|
||||||
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
|
|
||||||
// restored successfully from committed or sync
|
|
||||||
smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
|
|
||||||
type, qTaskFileVer);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
_err:
|
|
||||||
smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
|
|
||||||
TD_VID(pVnode), type, qTaskFileVer, terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief reload ts data from checkpoint
|
* @brief reload ts data from checkpoint
|
||||||
*
|
*
|
||||||
|
@ -1270,19 +1192,12 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
// step 2: reload ts data from checkpoint
|
||||||
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
|
|
||||||
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// step 3: reload ts data from checkpoint
|
|
||||||
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
|
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 4: open SRSmaFS for qTaskFiles
|
// step 3: open SRSmaFS for qTaskFiles
|
||||||
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
|
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -1295,191 +1210,6 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Restore from SRSmaQTaskInfoItem
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param pItem
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
|
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
|
||||||
void *qTaskInfo = NULL;
|
|
||||||
|
|
||||||
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid);
|
|
||||||
if (!pRSmaInfo) {
|
|
||||||
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pItem->type == TSDB_RETENTION_L1) {
|
|
||||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0);
|
|
||||||
} else if (pItem->type == TSDB_RETENTION_L2) {
|
|
||||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!qTaskInfo) {
|
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
|
||||||
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
|
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
|
||||||
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
|
|
||||||
pItem->type, terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
|
|
||||||
pItem->type);
|
|
||||||
|
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) {
|
|
||||||
memset(pIter, 0, sizeof(*pIter));
|
|
||||||
pIter->pTFile = pTFile;
|
|
||||||
pIter->offset = TD_FILE_HEAD_SIZE;
|
|
||||||
|
|
||||||
if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) {
|
|
||||||
pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE;
|
|
||||||
} else {
|
|
||||||
pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
|
|
||||||
pIter->nAlloc = TD_FILE_HEAD_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->pBuf = taosMemoryMalloc(pIter->nAlloc);
|
|
||||||
if (!pIter->pBuf) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
pIter->qBuf = pIter->pBuf;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) {
|
|
||||||
STFile *pTFile = pIter->pTFile;
|
|
||||||
int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;
|
|
||||||
|
|
||||||
if (pIter->offset >= pIter->fsize) {
|
|
||||||
*isFinish = true;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
|
|
||||||
nBytes = pIter->fsize - pIter->offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t infoLen = 0;
|
|
||||||
taosDecodeFixedI32(pIter->pBuf, &infoLen);
|
|
||||||
if (infoLen > nBytes) {
|
|
||||||
if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
|
|
||||||
terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
|
|
||||||
smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
if (pIter->nAlloc < infoLen) {
|
|
||||||
pIter->nAlloc = infoLen;
|
|
||||||
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
|
||||||
if (!pBuf) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
pIter->pBuf = pBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
nBytes = infoLen;
|
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->qBuf = pIter->pBuf;
|
|
||||||
pIter->offset += nBytes;
|
|
||||||
pIter->nBytes = nBytes;
|
|
||||||
pIter->nBufPos = 0;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
|
|
||||||
while (1) {
|
|
||||||
// block iter
|
|
||||||
bool isFinish = false;
|
|
||||||
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
if (isFinish) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// consume the block
|
|
||||||
int32_t qTaskInfoLenWithHead = 0;
|
|
||||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
|
||||||
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
|
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
|
||||||
smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
|
|
||||||
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
|
|
||||||
SRSmaQTaskInfoItem infoItem = {0};
|
|
||||||
pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type);
|
|
||||||
pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid);
|
|
||||||
infoItem.qTaskInfo = pIter->qBuf;
|
|
||||||
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
|
|
||||||
// do the restore job
|
|
||||||
smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma),
|
|
||||||
type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
|
|
||||||
tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
|
|
||||||
|
|
||||||
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
|
|
||||||
pIter->nBufPos += qTaskInfoLenWithHead;
|
|
||||||
|
|
||||||
if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) {
|
|
||||||
// prepare and load next block in the file
|
|
||||||
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// prepare and load next block in the file
|
|
||||||
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
SSma *pSma = pRSmaStat->pSma;
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
@ -1523,148 +1253,6 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|
||||||
SSma *pSma = pRSmaStat->pSma;
|
|
||||||
SVnode *pVnode = pSma->pVnode;
|
|
||||||
int32_t vid = SMA_VID(pSma);
|
|
||||||
int64_t toffset = 0;
|
|
||||||
bool isFileCreated = false;
|
|
||||||
|
|
||||||
if (taosHashGetSize(pInfoHash) <= 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *infoHash = taosHashIterate(pInfoHash, NULL);
|
|
||||||
if (!infoHash) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
|
|
||||||
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
|
|
||||||
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
|
|
||||||
pRSmaStat->commitAppliedVer, fsMaxVer);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
STFile tFile = {0};
|
|
||||||
#if 0
|
|
||||||
if (pRSmaStat->commitAppliedVer > 0) {
|
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
|
|
||||||
isFileCreated = true;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
while (infoHash) {
|
|
||||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
|
||||||
|
|
||||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
|
||||||
infoHash = taosHashIterate(pInfoHash, infoHash);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
|
||||||
#if 0
|
|
||||||
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
|
|
||||||
#endif
|
|
||||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
|
|
||||||
if (!taskInfo) {
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *pOutput = NULL;
|
|
||||||
int32_t len = 0;
|
|
||||||
int8_t type = (int8_t)(i + 1);
|
|
||||||
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
|
|
||||||
smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
|
|
||||||
i + 1, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (!pOutput || len <= 0) {
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64
|
|
||||||
" level %d serialize qTaskInfo success but no output(len %d), not persist",
|
|
||||||
vid, pRSmaInfo->suid, i + 1, len);
|
|
||||||
taosMemoryFreeClear(pOutput);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid,
|
|
||||||
pRSmaInfo->suid, i + 1, len);
|
|
||||||
|
|
||||||
if (!isFileCreated) {
|
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
|
||||||
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
|
||||||
TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
|
|
||||||
isFileCreated = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0};
|
|
||||||
void *pTmpBuf = &tmpBuf;
|
|
||||||
int32_t headLen = 0;
|
|
||||||
headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN);
|
|
||||||
headLen += taosEncodeFixedI8(&pTmpBuf, type);
|
|
||||||
headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid);
|
|
||||||
|
|
||||||
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
|
|
||||||
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
|
|
||||||
pRSmaInfo->suid, i + 1, headLen, toffset);
|
|
||||||
tdAppendTFile(&tFile, pOutput, len, &toffset);
|
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid,
|
|
||||||
pRSmaInfo->suid, i + 1, len, toffset);
|
|
||||||
|
|
||||||
taosMemoryFree(pOutput);
|
|
||||||
}
|
|
||||||
|
|
||||||
infoHash = taosHashIterate(pInfoHash, infoHash);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isFileCreated) {
|
|
||||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
|
||||||
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
|
||||||
tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
|
||||||
}
|
|
||||||
|
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
_err:
|
|
||||||
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
|
|
||||||
if (isFileCreated) {
|
|
||||||
tdRemoveTFile(&tFile);
|
|
||||||
tdDestroyTFile(&tFile);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief trigger to get rsma result in async mode
|
* @brief trigger to get rsma result in async mode
|
||||||
*
|
*
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
|
||||||
// smaFileUtil ================
|
// smaFileUtil ================
|
||||||
|
#if 0
|
||||||
#define TD_FILE_STATE_OK 0
|
#define TD_FILE_STATE_OK 0
|
||||||
#define TD_FILE_STATE_BAD 1
|
#define TD_FILE_STATE_BAD 1
|
||||||
|
|
||||||
|
@ -182,6 +182,8 @@ void tdCloseTFile(STFile *pTFile) {
|
||||||
|
|
||||||
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||||
char *outputName) {
|
char *outputName) {
|
||||||
if (version < 0) {
|
if (version < 0) {
|
||||||
|
@ -221,6 +223,7 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
||||||
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
||||||
TD_TFILE_SET_CLOSED(pTFile);
|
TD_TFILE_SET_CLOSED(pTFile);
|
||||||
|
@ -286,6 +289,8 @@ int32_t tdRemoveTFile(STFile *pTFile) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
// smaXXXUtil ================
|
// smaXXXUtil ================
|
||||||
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
|
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
|
||||||
void *pResult = taosAcquireRef(rsetId, refId);
|
void *pResult = taosAcquireRef(rsetId, refId);
|
||||||
|
|
|
@ -1725,6 +1725,9 @@ static void destroyStateWindowOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
taosMemoryFreeClear(pInfo->stateKey.pData);
|
taosMemoryFreeClear(pInfo->stateKey.pData);
|
||||||
cleanupExprSupp(&pInfo->scalarSup);
|
cleanupExprSupp(&pInfo->scalarSup);
|
||||||
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
|
@ -629,6 +629,8 @@ _return:
|
||||||
|
|
||||||
sclFreeParam(pWhen);
|
sclFreeParam(pWhen);
|
||||||
sclFreeParam(pThen);
|
sclFreeParam(pThen);
|
||||||
|
taosMemoryFree(pWhen);
|
||||||
|
taosMemoryFree(pThen);
|
||||||
|
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -664,6 +666,8 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe
|
||||||
|
|
||||||
sclFreeParam(pWhen);
|
sclFreeParam(pWhen);
|
||||||
sclFreeParam(pThen);
|
sclFreeParam(pThen);
|
||||||
|
taosMemoryFreeClear(pWhen);
|
||||||
|
taosMemoryFreeClear(pThen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pElse) {
|
if (pElse) {
|
||||||
|
@ -688,6 +692,8 @@ _return:
|
||||||
|
|
||||||
sclFreeParam(pWhen);
|
sclFreeParam(pWhen);
|
||||||
sclFreeParam(pThen);
|
sclFreeParam(pThen);
|
||||||
|
taosMemoryFree(pWhen);
|
||||||
|
taosMemoryFree(pThen);
|
||||||
|
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -929,6 +935,10 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
sclFreeParam(&comp);
|
sclFreeParam(&comp);
|
||||||
sclFreeParam(pWhen);
|
sclFreeParam(pWhen);
|
||||||
sclFreeParam(pThen);
|
sclFreeParam(pThen);
|
||||||
|
taosMemoryFree(pCase);
|
||||||
|
taosMemoryFree(pElse);
|
||||||
|
taosMemoryFree(pWhen);
|
||||||
|
taosMemoryFree(pThen);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -940,6 +950,10 @@ _return:
|
||||||
sclFreeParam(pWhen);
|
sclFreeParam(pWhen);
|
||||||
sclFreeParam(pThen);
|
sclFreeParam(pThen);
|
||||||
sclFreeParam(output);
|
sclFreeParam(output);
|
||||||
|
taosMemoryFree(pCase);
|
||||||
|
taosMemoryFree(pElse);
|
||||||
|
taosMemoryFree(pWhen);
|
||||||
|
taosMemoryFree(pThen);
|
||||||
|
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -654,7 +654,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rstart = startIndex >= 0 ? startIndex : 0;
|
int32_t rstart = (startIndex >= 0 && startIndex < pIn->numOfRows) ? startIndex : 0;
|
||||||
int32_t rend = numOfRows > 0 ? rstart + numOfRows - 1 : rstart + pIn->numOfRows - 1;
|
int32_t rend = numOfRows > 0 ? rstart + numOfRows - 1 : rstart + pIn->numOfRows - 1;
|
||||||
SSclVectorConvCtx cCtx = {pIn, pOut, rstart, rend, pInputCol->info.type, pOutputCol->info.type};
|
SSclVectorConvCtx cCtx = {pIn, pOut, rstart, rend, pInputCol->info.type, pOutputCol->info.type};
|
||||||
|
|
||||||
|
|
|
@ -123,8 +123,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
||||||
}
|
}
|
||||||
SWalCkHead* logContent = (SWalCkHead*)candidate;
|
SWalCkHead* logContent = (SWalCkHead*)candidate;
|
||||||
if (walValidHeadCksum(logContent) != 0) {
|
if (walValidHeadCksum(logContent) != 0) {
|
||||||
wError("vgId:%d, failed to validate checksum of wal entry header. offset:% %" PRId64 ", file:%s",
|
wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
|
||||||
((char*)(logContent)-buf), fnameStr);
|
offset + ((char*)(logContent)-buf), fnameStr);
|
||||||
haystack = candidate + 1;
|
haystack = candidate + 1;
|
||||||
if (firstTrial) {
|
if (firstTrial) {
|
||||||
break;
|
break;
|
||||||
|
@ -162,8 +162,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
||||||
}
|
}
|
||||||
if (walValidBodyCksum(logContent) != 0) {
|
if (walValidBodyCksum(logContent) != 0) {
|
||||||
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
|
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
|
||||||
wError("vgId:%d, failed to validate checksum of wal entry body. offset:% %" PRId64 ", file:%s",
|
wWarn("vgId:%d, failed to validate checksum of wal entry body. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
|
||||||
((char*)(logContent)-buf), fnameStr);
|
offset + ((char*)(logContent)-buf), fnameStr);
|
||||||
haystack = candidate + 1;
|
haystack = candidate + 1;
|
||||||
if (firstTrial) {
|
if (firstTrial) {
|
||||||
break;
|
break;
|
||||||
|
@ -481,6 +481,10 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (offset != (idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
|
if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
|
||||||
wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "",
|
wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "",
|
||||||
pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver);
|
pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver);
|
||||||
|
@ -493,6 +497,8 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||||
}
|
}
|
||||||
offset += sizeof(SWalIdxEntry);
|
offset += sizeof(SWalIdxEntry);
|
||||||
|
|
||||||
|
ASSERT(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));
|
||||||
|
|
||||||
// ftruncate idx file
|
// ftruncate idx file
|
||||||
if (offset < fileSize) {
|
if (offset < fileSize) {
|
||||||
if (taosFtruncateFile(pIdxFile, offset) < 0) {
|
if (taosFtruncateFile(pIdxFile, offset) < 0) {
|
||||||
|
|
|
@ -410,25 +410,35 @@ END:
|
||||||
|
|
||||||
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||||
int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
|
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
||||||
|
ASSERT(pFileInfo != NULL);
|
||||||
|
ASSERT(pFileInfo->firstVer >= 0);
|
||||||
|
int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
||||||
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
||||||
idxOffset);
|
idxOffset);
|
||||||
|
|
||||||
int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
|
int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
|
||||||
if (size != sizeof(SWalIdxEntry)) {
|
if (size != sizeof(SWalIdxEntry)) {
|
||||||
|
wError("vgId:%d, failed to write idx entry due to %s. ver:%lld", pWal->cfg.vgId, strerror(errno), ver);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
// TODO truncate
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO gurantee atomicity by truncate failed writing
|
|
||||||
static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
|
static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
|
||||||
const void *body, int32_t bodyLen) {
|
const void *body, int32_t bodyLen) {
|
||||||
int64_t code = 0;
|
int64_t code = 0;
|
||||||
|
|
||||||
int64_t offset = walGetCurFileOffset(pWal);
|
int64_t offset = walGetCurFileOffset(pWal);
|
||||||
|
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
||||||
|
ASSERT(pFileInfo != NULL);
|
||||||
|
|
||||||
|
if (pFileInfo->firstVer == -1) {
|
||||||
|
pFileInfo->firstVer = index;
|
||||||
|
}
|
||||||
pWal->writeHead.head.version = index;
|
pWal->writeHead.head.version = index;
|
||||||
pWal->writeHead.head.bodyLen = bodyLen;
|
pWal->writeHead.head.bodyLen = bodyLen;
|
||||||
pWal->writeHead.head.msgType = msgType;
|
pWal->writeHead.head.msgType = msgType;
|
||||||
|
@ -439,11 +449,14 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
|
|
||||||
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
||||||
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal write log %ld, msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType));
|
wDebug("vgId:%d, wal write log %ld, msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType));
|
||||||
|
|
||||||
|
code = walWriteIndex(pWal, index, offset);
|
||||||
|
if (code < 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
||||||
// TODO ftruncate
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
@ -452,7 +465,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) {
|
if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) {
|
||||||
// TODO ftruncate
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
@ -460,24 +472,31 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walWriteIndex(pWal, index, offset);
|
|
||||||
if (code < 0) {
|
|
||||||
// TODO ftruncate
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index;
|
if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index;
|
||||||
pWal->vers.lastVer = index;
|
pWal->vers.lastVer = index;
|
||||||
pWal->totSize += sizeof(SWalCkHead) + bodyLen;
|
pWal->totSize += sizeof(SWalCkHead) + bodyLen;
|
||||||
if (walGetCurFileInfo(pWal)->firstVer == -1) {
|
pFileInfo->lastVer = index;
|
||||||
walGetCurFileInfo(pWal)->firstVer = index;
|
pFileInfo->fileSize += sizeof(SWalCkHead) + bodyLen;
|
||||||
}
|
|
||||||
walGetCurFileInfo(pWal)->lastVer = index;
|
|
||||||
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen;
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
// recover in a reverse order
|
||||||
|
if (taosFtruncateFile(pWal->pLogFile, offset) < 0) {
|
||||||
|
wFatal("vgId:%d, failed to ftruncate logfile to offset:%lld during recovery due to %s", pWal->cfg.vgId, offset,
|
||||||
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
ASSERT(0 && "failed to recover from error");
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
||||||
|
if (taosFtruncateFile(pWal->pIdxFile, idxOffset) < 0) {
|
||||||
|
wFatal("vgId:%d, failed to ftruncate idxfile to offset:%lld during recovery due to %s", pWal->cfg.vgId, idxOffset,
|
||||||
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
ASSERT(0 && "failed to recover from error");
|
||||||
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -443,10 +443,13 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
|
||||||
static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) {
|
static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) {
|
||||||
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) {
|
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) {
|
||||||
taosUpdateLogNums(level);
|
taosUpdateLogNums(level);
|
||||||
if (tsAsyncLog) {
|
if (tsAsyncLog && level != DEBUG_FATAL) {
|
||||||
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
|
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
|
||||||
} else {
|
} else {
|
||||||
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
|
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
|
||||||
|
if (level == DEBUG_FATAL) {
|
||||||
|
taosFsyncFile(tsLogObj.logHandle->pFile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsLogObj.maxLines > 0) {
|
if (tsLogObj.maxLines > 0) {
|
||||||
|
|
|
@ -63,6 +63,7 @@ int smlProcess_influx_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -86,6 +87,8 @@ int smlProcess_telnet_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,6 +128,8 @@ int smlProcess_json1_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,6 +170,8 @@ int smlProcess_json2_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,6 +240,8 @@ int smlProcess_json3_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,6 +301,8 @@ int smlProcess_json4_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,6 +324,8 @@ int sml_TD15662_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,6 +346,8 @@ int sml_TD15742_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,6 +377,8 @@ int sml_16384_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
code = taos_errno(pRes);
|
code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -781,6 +798,8 @@ int sml_oom_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,6 +844,8 @@ int sml_16368_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -862,6 +883,8 @@ int sml_dup_time_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1068,6 +1091,8 @@ int sml_16960_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
int code = taos_errno(pRes);
|
int code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1097,6 +1122,7 @@ int sml_add_tag_col_Test() {
|
||||||
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
code = taos_errno(pRes);
|
code = taos_errno(pRes);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1151,6 +1177,36 @@ int smlProcess_18784_Test() {
|
||||||
rowIndex++;
|
rowIndex++;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int sml_19221_Test() {
|
||||||
|
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
|
||||||
|
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
const char *sql[] = {
|
||||||
|
"qelhxo,id=pnnqhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000\nqelhxo,id=pnnhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000\n#comment\nqelhxo,id=pnqhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000",
|
||||||
|
};
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "use sml_db");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
char* tmp = (char*)taosMemoryCalloc(1024, 1);
|
||||||
|
memcpy(tmp, sql[0], strlen(sql[0]));
|
||||||
|
*(char*)(tmp+44) = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
|
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
|
||||||
|
|
||||||
|
ASSERT(totalRows == 3);
|
||||||
|
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
|
int code = taos_errno(pRes);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
taos_close(taos);
|
||||||
|
taosMemoryFree(tmp);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1187,5 +1243,7 @@ int main(int argc, char *argv[]) {
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = smlProcess_18784_Test();
|
ret = smlProcess_18784_Test();
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
|
ret = sml_19221_Test();
|
||||||
|
ASSERT(!ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue