diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx index f00e635af9..a33b302a92 100644 --- a/docs/en/14-reference/03-connector/05-go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -7,7 +7,6 @@ title: TDengine Go Connector import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -import Preparition from "./_preparation.mdx" import GoInsert from "../../07-develop/03-insert-data/_go_sql.mdx" import GoInfluxLine from "../../07-develop/03-insert-data/_go_line.mdx" import GoOpenTSDBTelnet from "../../07-develop/03-insert-data/_go_opts_telnet.mdx" @@ -176,6 +175,37 @@ func main() { } ``` + + +_taosRestful_ implements Go's `database/sql/driver` interface via `http client`. You can use the [`database/sql`](https://golang.org/pkg/database/sql/) interface by simply introducing the driver (driver-go minimum version 3.0.2). + +Use `taosWS` as `driverName` and use a correct [DSN](#DSN) as `dataSourceName` with the following parameters supported by the DSN. + +* `writeTimeout` The timeout to send data via WebSocket. +* `readTimeout` The timeout to receive response data via WebSocket. + +For example: + +```go +package main + +import ( + "database/sql" + "fmt" + + _ "github.com/taosdata/driver-go/v3/taosWS" +) + +func main() { + var taosUri = "root:taosdata@ws(localhost:6041)/" + taos, err := sql.Open("taosWS", taosUri) + if err != nil { + fmt.Println("failed to connect TDengine, err:", err) + return + } +} +``` + ## Usage examples @@ -331,7 +361,7 @@ Creates consumer group. * `func (c *Consumer) Subscribe(topics []string) error` -Subscribes to a topic. +Subscribes to topics. * `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` @@ -409,6 +439,30 @@ Close consumer. Closes the parameter binding. +### Subscribe via WebSocket + +* `func NewConsumer(config *Config) (*Consumer, error)` + + Creates consumer group. + +* `func (c *Consumer) Subscribe(topic []string) error` + + Subscribes to topics. + +* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` + + Polling information. + +* `func (c *Consumer) Commit(messageID uint64) error` + + Commit information. + +* `func (c *Consumer) Close() error` + + Close consumer. + +For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go) + ## API Reference Full API see [driver-go documentation](https://pkg.go.dev/github.com/taosdata/driver-go/v3) diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx index 515d1b030b..7a6058db3c 100644 --- a/docs/zh/08-connector/20-go.mdx +++ b/docs/zh/08-connector/20-go.mdx @@ -177,6 +177,37 @@ func main() { } ``` + + +_taosWS_ 通过 `WebSocket` 实现了 Go 的 `database/sql/driver` 接口。只需要引入驱动(driver-go 最低版本 3.0.2)就可以使用[`database/sql`](https://golang.org/pkg/database/sql/)的接口。 + +使用 `taosWS` 作为 `driverName` 并且使用一个正确的 [DSN](#DSN) 作为 `dataSourceName`,DSN 支持的参数: + +* `writeTimeout` 通过 WebSocket 发送数据的超时时间。 +* `readTimeout` 通过 WebSocket 接收响应数据的超时时间。 + +示例: + +```go +package main + +import ( + "database/sql" + "fmt" + + _ "github.com/taosdata/driver-go/v3/taosWS" +) + +func main() { + var taosUri = "root:taosdata@ws(localhost:6041)/" + taos, err := sql.Open("taosWS", taosUri) + if err != nil { + fmt.Println("failed to connect TDengine, err:", err) + return + } +} +``` + ## 使用示例 @@ -410,6 +441,30 @@ func main() { 结束参数绑定。 +### 通过 WebSocket 订阅 + +* `func NewConsumer(config *Config) (*Consumer, error)` + + 创建消费者。 + +* `func (c *Consumer) Subscribe(topic []string) error` + + 订阅主题。 + +* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` + + 轮询消息。 + +* `func (c *Consumer) Commit(messageID uint64) error` + + 提交消息。 + +* `func (c *Consumer) Close() error` + + 关闭消费者。 + +完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go) + ## API 参考 全部 API 见 [driver-go 文档](https://pkg.go.dev/github.com/taosdata/driver-go/v3) diff --git a/include/client/taos.h b/include/client/taos.h index 270b647a77..1182a9c2f7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -198,6 +198,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision); /* --------------------------TMQ INTERFACE------------------------------- */ diff --git a/include/util/tdef.h b/include/util/tdef.h index d9bb558b74..2103dc928e 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -498,6 +498,7 @@ enum { #define MAX_NUM_STR_SIZE 40 #define MAX_META_MSG_IN_BATCH 1048576 +#define MAX_META_BATCH_RSP_SIZE (1 * 1048576 * 1024) #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index df717bda23..45d2de4a7a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -199,7 +199,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { if (TSDB_CODE_SUCCESS != nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { - tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, + tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); destroyRequest(*pRequest); @@ -955,7 +955,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue switch (pQuery->execMode) { case QUERY_EXEC_MODE_LOCAL: if (!pRequest->validateOnly) { - code = execLocalCmd(pRequest, pQuery); + if (NULL == pQuery->pRoot) { + terrno = TSDB_CODE_INVALID_PARA; + code = terrno; + } else { + code = execLocalCmd(pRequest, pQuery); + } } break; case QUERY_EXEC_MODE_RPC: @@ -997,7 +1002,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue handleQueryExecRsp(pRequest); - if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; } @@ -2254,7 +2259,10 @@ void syncQueryFn(void* param, void* res, int32_t code) { void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { if (sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; - fp(param, NULL, terrno); + if (fp) { + fp(param, NULL, terrno); + } + return; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 6126817ece..286fb85373 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -944,7 +944,6 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { if (pResultInfo->completed) { // it is a local executed query, no need to do async fetch if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) { - ASSERT(pResultInfo->numOfRows >= 0); if (pResultInfo->localResultFetched) { pResultInfo->numOfRows = 0; pResultInfo->current = 0; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 8680f93f8c..945562ef36 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -292,8 +292,10 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); struct SCatalog* pCatalog = NULL; - catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); + int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (TSDB_CODE_SUCCESS == code) { + catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); + } } taosMemoryFree(pMsg->pData); @@ -397,6 +399,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); *pRsp = taosMemoryCalloc(1, rspSize); if (NULL == *pRsp) { + blockDataDestroy(pBlock); return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 5c37822222..f2739c9ab8 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -28,8 +28,8 @@ #define QUOTE '"' #define SLASH '\\' -#define JUMP_SPACE(sql) \ - while (*sql != '\0') { \ +#define JUMP_SPACE(sql, sqlEnd) \ + while (sql < sqlEnd) { \ if (*sql == SPACE) \ sql++; \ else \ @@ -917,16 +917,17 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { return TSDB_CODE_TSC_INVALID_VALUE; } -static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) { +static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) { if (!sql) return TSDB_CODE_SML_INVALID_DATA; - JUMP_SPACE(sql) + JUMP_SPACE(sql, sqlEnd) if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA; elements->measure = sql; // parse measure - while (*sql != '\0') { + while (sql < sqlEnd) { if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) { - MOVE_FORWARD_ONE(sql, strlen(sql) + 1); + MOVE_FORWARD_ONE(sql, sqlEnd - sql); + sqlEnd--; continue; } if (IS_COMMA(sql)) { @@ -950,7 +951,7 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm } else { if (*sql == COMMA) sql++; elements->tags = sql; - while (*sql != '\0') { + while (sql < sqlEnd) { if (IS_SPACE(sql)) { break; } @@ -961,10 +962,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm elements->measureTagsLen = sql - elements->measure; // parse cols - JUMP_SPACE(sql) + JUMP_SPACE(sql, sqlEnd) elements->cols = sql; bool isInQuote = false; - while (*sql != '\0') { + while (sql < sqlEnd) { if (IS_QUOTE(sql)) { isInQuote = !isInQuote; } @@ -984,10 +985,10 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm } // parse timestamp - JUMP_SPACE(sql) + JUMP_SPACE(sql, sqlEnd) elements->timestamp = sql; - while (*sql != '\0') { - if (*sql == SPACE) { + while (sql < sqlEnd) { + if (isspace(*sql)) { break; } sql++; @@ -997,8 +998,8 @@ static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSm return TSDB_CODE_SUCCESS; } -static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) { - while (**sql != '\0') { +static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) { + while (*sql < sqlEnd) { if (**sql != SPACE && !(*data)) { *data = *sql; } else if (**sql == SPACE && *data) { @@ -1009,20 +1010,20 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t * } } -static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey, +static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg) { if(!cols) return TSDB_CODE_OUT_OF_MEMORY; const char *sql = data; size_t childTableNameLen = strlen(tsSmlChildTableName); - while (*sql != '\0') { - JUMP_SPACE(sql) + while (sql < sqlEnd) { + JUMP_SPACE(sql, sqlEnd) if (*sql == '\0') break; const char *key = sql; int32_t keyLen = 0; // parse key - while (*sql != '\0') { + while (sql < sqlEnd) { if (*sql == SPACE) { smlBuildInvalidDataMsg(msg, "invalid data", sql); return TSDB_CODE_SML_INVALID_DATA; @@ -1047,7 +1048,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab // parse value const char *value = sql; int32_t valueLen = 0; - while (*sql != '\0') { + while (sql < sqlEnd) { // parse value if (*sql == SPACE) { break; @@ -1092,11 +1093,11 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab } // format: =[ =] -static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) { +static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) { if (!sql) return TSDB_CODE_SML_INVALID_DATA; // parse metric - smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen); + smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen); if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; @@ -1105,7 +1106,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable // parse timestamp const char *timestamp = NULL; int32_t tLen = 0; - smlParseTelnetElement(&sql, ×tamp, &tLen); + smlParseTelnetElement(&sql, sqlEnd, ×tamp, &tLen); if (!timestamp || tLen == 0) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql); return TSDB_CODE_SML_INVALID_DATA; @@ -1120,7 +1121,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable // parse value const char *value = NULL; int32_t valueLen = 0; - smlParseTelnetElement(&sql, &value, &valueLen); + smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen); if (!value || valueLen == 0) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql); return TSDB_CODE_TSC_INVALID_VALUE; @@ -1138,7 +1139,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTable } // parse tags - ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); + ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); return ret; @@ -2073,11 +2074,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * } /************* TSDB_SML_JSON_PROTOCOL function end **************/ -static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { +static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); - int ret = smlParseInfluxString(sql, &elements, &info->msgBuf); + int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id); return ret; @@ -2184,7 +2185,7 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { return TSDB_CODE_SUCCESS; } -static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) { +static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { int ret = TSDB_CODE_SUCCESS; SSmlTableInfo *tinfo = smlBuildTableInfo(); if (!tinfo) { @@ -2198,7 +2199,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) { } if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { - ret = smlParseTelnetString(info, (const char *)data, tinfo, cols); + ret = smlParseTelnetString(info, (const char *)data, (char*)data + len, tinfo, cols); } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) { ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols); } else { @@ -2289,7 +2290,7 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) { for (int32_t i = 0; i < payloadNum; ++i) { cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i); - ret = smlParseTelnetLine(info, dataPoint); + ret = smlParseTelnetLine(info, dataPoint, -1); if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id); goto end; @@ -2378,10 +2379,14 @@ static void smlPrintStatisticInfo(SSmlHandle *info) { info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime); } -static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) { +static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) { int32_t code = TSDB_CODE_SUCCESS; if (info->protocol == TSDB_SML_JSON_PROTOCOL) { - code = smlParseJSON(info, *lines); + if(lines){ + code = smlParseJSON(info, *lines); + }else if(rawLine){ + code = smlParseJSON(info, rawLine); + } if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines); return code; @@ -2390,28 +2395,46 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) { } for (int32_t i = 0; i < numLines; ++i) { + char *tmp = NULL; + int len = 0; + if(lines){ + tmp = lines[i]; + len = strlen(tmp); + }else if(rawLine){ + tmp = rawLine; + while(rawLine < rawLineEnd){ + if(*(rawLine++) == '\n'){ + break; + } + len++; + } + if(info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#'){ // this line is comment + continue; + } + } + if (info->protocol == TSDB_SML_LINE_PROTOCOL) { - code = smlParseInfluxLine(info, lines[i]); + code = smlParseInfluxLine(info, tmp, len); } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { - code = smlParseTelnetLine(info, lines[i]); + code = smlParseTelnetLine(info, tmp, len); } else { ASSERT(0); } if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]); + uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp); return code; } } return code; } -static int smlProcess(SSmlHandle *info, char *lines[], int numLines) { +static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) { int32_t code = TSDB_CODE_SUCCESS; int32_t retryNum = 0; info->cost.parseTime = taosGetTimestampUs(); - code = smlParseLine(info, lines, numLines); + code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines); if (code != 0) { uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code)); return code; @@ -2504,39 +2527,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { smlDestroyInfo(info); } -/** - * taos_schemaless_insert() parse and insert data points into database according to - * different protocol. - * - * @param $lines input array may contain multiple lines, each line indicates a data point. - * If protocol=2 is used input array should contain single JSON - * string(e.g. char *lines[] = {"$JSON_string"}). If need to insert - * multiple data points in JSON format, should include them in $JSON_string - * as a JSON array. - * @param $numLines indicates how many data points in $lines. - * If protocol = 2 is used this param will be ignored as $lines should - * contain single JSON string. - * @param $protocol indicates which protocol to use for parsing: - * 0 - influxDB line protocol - * 1 - OpenTSDB telnet line protocol - * 2 - OpenTSDB JSON format protocol - * @return return zero for successful insertion. Otherwise return none-zero error code of - * failure reason. - * - */ - -TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { - if (NULL == taos) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); - if (!request) { - uError("SML:taos_schemaless_insert error request is null"); - return NULL; - } +TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) { int batchs = 0; STscObj *pTscObj = request->pTscObj; @@ -2560,12 +2552,6 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr goto end; } - if (!lines) { - request->code = TSDB_CODE_SML_INVALID_DATA; - smlBuildInvalidDataMsg(&msg, "lines is null", NULL); - goto end; - } - if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) { request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL); @@ -2616,15 +2602,28 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr info->affectedRows = perBatch; info->pRequest->body.queryFp = smlInsertCallback; info->pRequest->body.param = info; - int32_t code = smlProcess(info, lines, perBatch); - lines += perBatch; + int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch); + if(lines){ + lines += perBatch; + } + if(rawLine){ + int num = 0; + while(rawLine < rawLineEnd){ + if(*(rawLine++) == '\n'){ + num++; + } + if(num == perBatch){ + break; + } + } + } if (code != TSDB_CODE_SUCCESS) { info->pRequest->body.queryFp(info, req, code); } } tsem_wait(¶ms.sem); -end: + end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); // ((STscObj *)taos)->schemalessType = 0; @@ -2632,3 +2631,80 @@ end: uDebug("resultend:%s", request->msgBuf); return (TAOS_RES *)request; } + +/** + * taos_schemaless_insert() parse and insert data points into database according to + * different protocol. + * + * @param $lines input array may contain multiple lines, each line indicates a data point. + * If protocol=2 is used input array should contain single JSON + * string(e.g. char *lines[] = {"$JSON_string"}). If need to insert + * multiple data points in JSON format, should include them in $JSON_string + * as a JSON array. + * @param $numLines indicates how many data points in $lines. + * If protocol = 2 is used this param will be ignored as $lines should + * contain single JSON string. + * @param $protocol indicates which protocol to use for parsing: + * 0 - influxDB line protocol + * 1 - OpenTSDB telnet line protocol + * 2 - OpenTSDB JSON format protocol + * @return return zero for successful insertion. Otherwise return none-zero error code of + * failure reason. + * + */ + +TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); + if (!request) { + uError("SML:taos_schemaless_insert error request is null"); + return NULL; + } + + if (!lines) { + SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; + request->code = TSDB_CODE_SML_INVALID_DATA; + smlBuildInvalidDataMsg(&msg, "lines is null", NULL); + return (TAOS_RES *)request; + } + + return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision); +} + +TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){ + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); + if (!request) { + uError("SML:taos_schemaless_insert error request is null"); + return NULL; + } + + if (!lines || len <= 0) { + SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; + request->code = TSDB_CODE_SML_INVALID_DATA; + smlBuildInvalidDataMsg(&msg, "lines is null", NULL); + return (TAOS_RES *)request; + } + + int numLines = 0; + *totalRows = 0; + char *tmp = lines; + for(int i = 0; i < len; i++){ + if(lines[i] == '\n' || i == len - 1){ + numLines++; + if(tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL){ //ignore comment + (*totalRows)++; + } + tmp = lines + i + 1; + } + } + return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision); +} diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index bf3fd00f14..81d0d616c9 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -152,7 +152,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, pStmt->bInfo.tbType = pTableMeta->tableType; pStmt->bInfo.boundTags = tags; pStmt->bInfo.tagsCached = false; - strcpy(pStmt->bInfo.stbFName, sTableName); + tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName)); return TSDB_CODE_SUCCESS; } diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index ab4442eca7..54778e87a7 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -44,7 +44,7 @@ TEST(testCase, smlParseInfluxString_Test) { char *tmp = "\\,st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3"; char *sql = (char *)taosMemoryCalloc(256, 1); memcpy(sql, tmp, strlen(tmp) + 1); - int ret = smlParseInfluxString(sql, &elements, &msgBuf); + int ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_EQ(ret, 0); ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measureLen, strlen(",st")); @@ -63,14 +63,14 @@ TEST(testCase, smlParseInfluxString_Test) { tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_NE(ret, 0); // case 3 false tmp = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_EQ(ret, 0); ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1); ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3")); @@ -79,7 +79,7 @@ TEST(testCase, smlParseInfluxString_Test) { tmp = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000"; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_EQ(ret, 0); ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measureLen, strlen("st")); @@ -98,7 +98,7 @@ TEST(testCase, smlParseInfluxString_Test) { tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 "; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_EQ(ret, 0); ASSERT_EQ(elements.measure, sql + 1); ASSERT_EQ(elements.measureLen, strlen("st")); @@ -116,21 +116,21 @@ TEST(testCase, smlParseInfluxString_Test) { tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 "; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_EQ(ret, 0); // case 7 tmp = " st , "; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_EQ(ret, 0); // case 8 false tmp = ", st , "; memcpy(sql, tmp, strlen(tmp) + 1); memset(&elements, 0, sizeof(SSmlLineInfo)); - ret = smlParseInfluxString(sql, &elements, &msgBuf); + ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf); ASSERT_NE(ret, 0); taosMemoryFree(sql); } @@ -542,7 +542,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) { "sys.procs.running 1479496100 42 host= web01", }; for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { - int ret = smlParseTelnetLine(info, (void *)sql[i]); + int ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i])); ASSERT_NE(ret, 0); } @@ -561,7 +561,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) { int ret = TSDB_CODE_SUCCESS; for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { - ret = smlParseTelnetLine(info, (void *)sql[i]); + ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i])); if (ret != TSDB_CODE_SUCCESS) break; } ASSERT_NE(ret, 0); @@ -617,7 +617,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) { int ret = TSDB_CODE_SUCCESS; for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { - ret = smlParseTelnetLine(info, (void *)sql[i]); + ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i])); ASSERT_NE(ret, 0); } @@ -653,7 +653,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) { int ret = TSDB_CODE_SUCCESS; for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { - ret = smlParseTelnetLine(info, (void *)sql[i]); + ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i])); if (ret != TSDB_CODE_SUCCESS) break; } ASSERT_NE(ret, 0); @@ -688,7 +688,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) { }; int ret = TSDB_CODE_SUCCESS; for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { - ret = smlParseTelnetLine(info, (void *)sql[i]); + ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i])); if (ret != TSDB_CODE_SUCCESS) break; } ASSERT_NE(ret, 0); @@ -1002,7 +1002,7 @@ TEST(testCase, sml_col_4096_Test) { int ret = TSDB_CODE_SUCCESS; for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { - ret = smlParseInfluxLine(info, sql[i]); + ret = smlParseInfluxLine(info, sql[i], strlen(sql[i])); if (ret != TSDB_CODE_SUCCESS) break; } ASSERT_NE(ret, 0); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 076826ebc2..5a7f149bc6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -51,6 +51,7 @@ static int32_t dmInitMonitor() { static bool dmCheckDiskSpace() { osUpdate(); + // sufficiency if (!osDataSpaceSufficient()) { dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); } @@ -60,7 +61,24 @@ static bool dmCheckDiskSpace() { if (!osTempSpaceSufficient()) { dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); } - return true; + // availability + bool ret = true; + if (!osDataSpaceAvailable()) { + dError("data disk space unavailable, i.e. %s", tsDataDir); + terrno = TSDB_CODE_VND_NO_DISKSPACE; + ret = false; + } + if (!osLogSpaceAvailable()) { + dError("log disk space unavailable, i.e. %s", tsLogDir); + terrno = TSDB_CODE_VND_NO_DISKSPACE; + ret = false; + } + if (!osTempSpaceAvailable()) { + dError("temp disk space unavailable, i.e. %s", tsTempDir); + terrno = TSDB_CODE_VND_NO_DISKSPACE; + ret = false; + } + return ret; } static bool dmCheckDataDirVersion() { diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 699203e8c1..a4d2e46881 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -43,9 +43,7 @@ void Testbase::InitLog(const char* path) { } void Testbase::Init(const char* path, int16_t port) { -#ifdef _TD_DARWIN_64 osDefaultInit(); -#endif tsServerPort = port; strcpy(tsLocalFqdn, "localhost"); snprintf(tsLocalEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 2e01fadbae..3e4c8005b4 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -90,14 +90,39 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { } for (int32_t i = 0; i < msgNum; ++i) { + if (offset >= pMsg->contLen) { + mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + taosArrayDestroy(batchRsp); + return -1; + } + req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); offset += sizeof(req.msgIdx); + if (offset >= pMsg->contLen) { + mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + taosArrayDestroy(batchRsp); + return -1; + } req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); offset += sizeof(req.msgType); + if (offset >= pMsg->contLen) { + mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + taosArrayDestroy(batchRsp); + return -1; + } req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); offset += sizeof(req.msgLen); + if (offset >= pMsg->contLen) { + mError("offset %d is bigger than contLen %d", offset, pMsg->contLen); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + taosArrayDestroy(batchRsp); + return -1; + } req.msg = (char *)pMsg->pCont + offset; offset += req.msgLen; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 10cfb0a660..8a3179b2a9 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2553,12 +2553,17 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc char rollup[160 + VARSTR_HEADER_SIZE] = {0}; int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs); + char *sep = ", "; + int32_t sepLen = strlen(sep); + int32_t rollupLen = sizeof(rollup) - 2; for (int32_t i = 0; i < rollupNum; ++i) { char *funcName = taosArrayGet(pStb->pFuncs, i); if (i) { - strcat(varDataVal(rollup), ", "); + strncat(varDataVal(rollup), sep, rollupLen); + rollupLen -= sepLen; } - strcat(varDataVal(rollup), funcName); + strncat(varDataVal(rollup), funcName, rollupLen); + rollupLen -= strlen(funcName); } varDataSetLen(rollup, strlen(varDataVal(rollup))); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index dade85b12d..50e3c3a9c1 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -258,6 +258,7 @@ enum { TD_FTYPE_RSMA_QTASKINFO = 0, }; +#if 0 struct STFile { uint8_t state; STFInfo info; @@ -287,6 +288,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile); void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); void tdCloseTFile(STFile *pTFile); void tdDestroyTFile(STFile *pTFile); +#endif void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 20e0fcdb5b..2d055acd2f 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -51,7 +51,7 @@ static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema return -1; } - strncpy(pMetaRsp->tbName, tbName, TSDB_TABLE_NAME_LEN); + tstrncpy(pMetaRsp->tbName, tbName, TSDB_TABLE_NAME_LEN); pMetaRsp->numOfColumns = pSchema->nCols; pMetaRsp->tableType = TSDB_NORMAL_TABLE; pMetaRsp->sversion = pSchema->version; diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index db5f4c55b9..6168a00815 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -17,14 +17,17 @@ extern SSmaMgmt smaMgmt; +#if 0 static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); +#endif static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); +#if 0 /** * @brief Only applicable to Rollup SMA * @@ -48,6 +51,7 @@ int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); } * @return int32_t */ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); } +#endif /** * @brief Only applicable to Rollup SMA @@ -108,6 +112,7 @@ int32_t smaBegin(SSma *pSma) { return TSDB_CODE_SUCCESS; } +#if 0 /** * @brief pre-commit for rollup sma(sync commit). * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. @@ -169,6 +174,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { #endif return TSDB_CODE_SUCCESS; } +#endif // SQTaskFile ====================================================== @@ -230,6 +236,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { return TSDB_CODE_SUCCESS; } +#if 0 /** * @brief post-commit for rollup sma * 1) clean up the outdated qtaskinfo files @@ -249,6 +256,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } +#endif /** * @brief Rsma async commit implementation(only do some necessary light weighted task) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 722a3f479e..77c5955098 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,8 +15,6 @@ #include "sma.h" -#define RSMA_QTASKINFO_BUFSIZE (32768) // size -#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_FETCH_DELAY_MAX (120000) // ms @@ -48,23 +46,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SR static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); -static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); -static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); -static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { - // adapt accordingly if definition of SRSmaInfo update - SRSmaInfo *pResult = NULL; - ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2); - pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN)); - ASSERT(pResult->pTSchema->numOfCols > 1); - return pResult; -} - struct SRSmaQTaskInfoItem { int32_t len; int8_t type; @@ -104,12 +89,6 @@ void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, con snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level); } -static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { - return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; -} - -static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } - static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC if (!taskHandle || !(*taskHandle)) return; @@ -803,6 +782,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu return TSDB_CODE_SUCCESS; } +#if 0 static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { SSubmitMsgIter msgIter = {0}; SSubmitBlkIter blkIter = {0}; @@ -820,6 +800,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { } return 0; } +#endif /** * @brief sync mode @@ -1189,65 +1170,6 @@ _err: return TSDB_CODE_FAILED; } -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) { - SVnode *pVnode = pSma->pVnode; - STFile tFile = {0}; - char qTaskInfoFName[TSDB_FILENAME_LEN] = {0}; - - tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName); - if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - goto _err; - } - - if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { - if (qTaskFileVer > 0) { - smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist", - TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile)); - } else { - smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode), - type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile)); - } - return TSDB_CODE_SUCCESS; - } - - if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { - goto _err; - } - - STFInfo tFileInfo = {0}; - if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) { - goto _err; - } - - SRSmaQTaskInfoIter fIter = {0}; - if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { - tdRSmaQTaskInfoIterDestroy(&fIter); - tdCloseTFile(&tFile); - tdDestroyTFile(&tFile); - goto _err; - } - - if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) { - tdRSmaQTaskInfoIterDestroy(&fIter); - tdCloseTFile(&tFile); - tdDestroyTFile(&tFile); - goto _err; - } - - tdRSmaQTaskInfoIterDestroy(&fIter); - tdCloseTFile(&tFile); - tdDestroyTFile(&tFile); - - // restored successfully from committed or sync - smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode), - type, qTaskFileVer); - return TSDB_CODE_SUCCESS; -_err: - smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s", - TD_VID(pVnode), type, qTaskFileVer, terrstr()); - return TSDB_CODE_FAILED; -} - /** * @brief reload ts data from checkpoint * @@ -1270,19 +1192,12 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) return TSDB_CODE_SUCCESS; } -#if 0 - // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore - if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) { - goto _err; - } -#endif - - // step 3: reload ts data from checkpoint + // step 2: reload ts data from checkpoint if (tdRSmaRestoreTSDataReload(pSma) < 0) { goto _err; } - // step 4: open SRSmaFS for qTaskFiles + // step 3: open SRSmaFS for qTaskFiles if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) { goto _err; } @@ -1295,191 +1210,6 @@ _err: return TSDB_CODE_FAILED; } -/** - * @brief Restore from SRSmaQTaskInfoItem - * - * @param pSma - * @param pItem - * @return int32_t - */ -static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) { - SRSmaInfo *pRSmaInfo = NULL; - void *qTaskInfo = NULL; - - pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid); - if (!pRSmaInfo) { - smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid); - return TSDB_CODE_SUCCESS; - } - - if (pItem->type == TSDB_RETENTION_L1) { - qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0); - } else if (pItem->type == TSDB_RETENTION_L2) { - qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1); - } else { - ASSERT(0); - } - - if (!qTaskInfo) { - tdReleaseRSmaInfo(pSma, pRSmaInfo); - smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid); - return TSDB_CODE_SUCCESS; - } - - if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { - tdReleaseRSmaInfo(pSma, pRSmaInfo); - smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, - pItem->type, terrstr()); - return TSDB_CODE_FAILED; - } - smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, - pItem->type); - - tdReleaseRSmaInfo(pSma, pRSmaInfo); - return TSDB_CODE_SUCCESS; -} - -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) { - memset(pIter, 0, sizeof(*pIter)); - pIter->pTFile = pTFile; - pIter->offset = TD_FILE_HEAD_SIZE; - - if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) { - return TSDB_CODE_FAILED; - } - - if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) { - pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE; - } else { - pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE; - } - - if (pIter->nAlloc < TD_FILE_HEAD_SIZE) { - pIter->nAlloc = TD_FILE_HEAD_SIZE; - } - - pIter->pBuf = taosMemoryMalloc(pIter->nAlloc); - if (!pIter->pBuf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - pIter->qBuf = pIter->pBuf; - - return TSDB_CODE_SUCCESS; -} - -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) { - STFile *pTFile = pIter->pTFile; - int64_t nBytes = RSMA_QTASKINFO_BUFSIZE; - - if (pIter->offset >= pIter->fsize) { - *isFinish = true; - return TSDB_CODE_SUCCESS; - } - - if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) { - nBytes = pIter->fsize - pIter->offset; - } - - if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) { - return TSDB_CODE_FAILED; - } - - if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) { - return TSDB_CODE_FAILED; - } - - int32_t infoLen = 0; - taosDecodeFixedI32(pIter->pBuf, &infoLen); - if (infoLen > nBytes) { - if (infoLen <= RSMA_QTASKINFO_BUFSIZE) { - terrno = TSDB_CODE_RSMA_FILE_CORRUPTED; - smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); - return TSDB_CODE_FAILED; - } - if (pIter->nAlloc < infoLen) { - pIter->nAlloc = infoLen; - void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen); - if (!pBuf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - pIter->pBuf = pBuf; - } - - nBytes = infoLen; - - if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) { - return TSDB_CODE_FAILED; - } - - if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) { - return TSDB_CODE_FAILED; - } - } - - pIter->qBuf = pIter->pBuf; - pIter->offset += nBytes; - pIter->nBytes = nBytes; - pIter->nBufPos = 0; - - return TSDB_CODE_SUCCESS; -} - -static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) { - while (1) { - // block iter - bool isFinish = false; - if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) { - return TSDB_CODE_FAILED; - } - if (isFinish) { - return TSDB_CODE_SUCCESS; - } - - // consume the block - int32_t qTaskInfoLenWithHead = 0; - pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); - if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type, - TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); - return TSDB_CODE_FAILED; - } - - while (1) { - if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) { - SRSmaQTaskInfoItem infoItem = {0}; - pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type); - pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid); - infoItem.qTaskInfo = pIter->qBuf; - infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); - // do the restore job - smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma), - type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos); - tdRSmaQTaskInfoItemRestore(pSma, &infoItem); - - pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len); - pIter->nBufPos += qTaskInfoLenWithHead; - - if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) { - // prepare and load next block in the file - pIter->offset -= (pIter->nBytes - pIter->nBufPos); - break; - } - - pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); - continue; - } - // prepare and load next block in the file - pIter->offset -= (pIter->nBytes - pIter->nBufPos); - break; - } - } - - return TSDB_CODE_SUCCESS; -} - int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; @@ -1523,148 +1253,6 @@ _err: return TSDB_CODE_FAILED; } -#if 0 -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { - SSma *pSma = pRSmaStat->pSma; - SVnode *pVnode = pSma->pVnode; - int32_t vid = SMA_VID(pSma); - int64_t toffset = 0; - bool isFileCreated = false; - - if (taosHashGetSize(pInfoHash) <= 0) { - return TSDB_CODE_SUCCESS; - } - - void *infoHash = taosHashIterate(pInfoHash, NULL); - if (!infoHash) { - return TSDB_CODE_SUCCESS; - } - - int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); - if (pRSmaStat->commitAppliedVer <= fsMaxVer) { - smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, - pRSmaStat->commitAppliedVer, fsMaxVer); - return TSDB_CODE_SUCCESS; - } - - STFile tFile = {0}; -#if 0 - if (pRSmaStat->commitAppliedVer > 0) { - char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); - if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr()); - goto _err; - } - if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { - smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); - goto _err; - } - smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile)); - - isFileCreated = true; - } -#endif - - while (infoHash) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - infoHash = taosHashIterate(pInfoHash, infoHash); - continue; - } - - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { -#if 0 - qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i); -#endif - qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); - if (!taskInfo) { - smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); - continue; - } - - char *pOutput = NULL; - int32_t len = 0; - int8_t type = (int8_t)(i + 1); - if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { - smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid, - i + 1, terrstr()); - goto _err; - } - if (!pOutput || len <= 0) { - smaDebug("vgId:%d, rsma, table %" PRIi64 - " level %d serialize qTaskInfo success but no output(len %d), not persist", - vid, pRSmaInfo->suid, i + 1, len); - taosMemoryFreeClear(pOutput); - continue; - } - - smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid, - pRSmaInfo->suid, i + 1, len); - - if (!isFileCreated) { - char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); - if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr()); - goto _err; - } - if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { - smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); - goto _err; - } - smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, - TD_TFILE_FULL_NAME(&tFile)); - - isFileCreated = true; - } - - char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0}; - void *pTmpBuf = &tmpBuf; - int32_t headLen = 0; - headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN); - headLen += taosEncodeFixedI8(&pTmpBuf, type); - headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid); - - ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); - tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); - smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, - pRSmaInfo->suid, i + 1, headLen, toffset); - tdAppendTFile(&tFile, pOutput, len, &toffset); - smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, - pRSmaInfo->suid, i + 1, len, toffset); - - taosMemoryFree(pOutput); - } - - infoHash = taosHashIterate(pInfoHash, infoHash); - } - - if (isFileCreated) { - if (tdUpdateTFileHeader(&tFile) < 0) { - smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), - tstrerror(terrno)); - goto _err; - } else { - smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); - } - - tdCloseTFile(&tFile); - tdDestroyTFile(&tFile); - } - return TSDB_CODE_SUCCESS; -_err: - smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); - if (isFileCreated) { - tdRemoveTFile(&tFile); - tdDestroyTFile(&tFile); - } - return TSDB_CODE_FAILED; -} - -#endif - /** * @brief trigger to get rsma result in async mode * diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 6d7b7df1ee..4d09d690d6 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -16,7 +16,7 @@ #include "sma.h" // smaFileUtil ================ - +#if 0 #define TD_FILE_STATE_OK 0 #define TD_FILE_STATE_BAD 1 @@ -182,6 +182,8 @@ void tdCloseTFile(STFile *pTFile) { void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); } +#endif + void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName) { if (version < 0) { @@ -221,6 +223,7 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e } } +#if 0 int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK); TD_TFILE_SET_CLOSED(pTFile); @@ -286,6 +289,8 @@ int32_t tdRemoveTFile(STFile *pTFile) { return 0; } +#endif + // smaXXXUtil ================ void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) { void *pResult = taosAcquireRef(rsetId, refId); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 8c1f858cbb..64df3aa1eb 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -330,6 +330,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { rspSize += sizeof(int32_t); offset = 0; + if (rspSize > MAX_META_BATCH_RSP_SIZE) { + code = TSDB_CODE_INVALID_MSG_LEN; + goto _exit; + } + pRsp = rpcMallocCont(rspSize); if (pRsp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index b960103d94..218a86ed5c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -302,9 +302,11 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) { _return: - taosMemoryFreeClear(output->tbMeta); - taosMemoryFreeClear(output); - + if (output) { + taosMemoryFreeClear(output->tbMeta); + taosMemoryFreeClear(output); + } + CTG_RET(code); } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 1d2e3640a1..93d36bc4b3 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -252,7 +252,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgIndexCtx* ctx = task.taskCtx; - strcpy(ctx->indexFName, name); + tstrncpy(ctx->indexFName, name, sizeof(ctx->indexFName)); taosArrayPush(pJob->pTasks, &task); @@ -277,7 +277,7 @@ int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgUdfCtx* ctx = task.taskCtx; - strcpy(ctx->udfName, name); + tstrncpy(ctx->udfName, name, sizeof(ctx->udfName)); taosArrayPush(pJob->pTasks, &task); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index b3588898e4..1a7a0057ba 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -660,7 +660,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) } msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->dbId = dbId; op->data = msg; @@ -693,7 +693,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) } msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); op->data = msg; @@ -721,8 +721,8 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, } msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->stbName, stbName, sizeof(msg->stbName)); + tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + tstrncpy(msg->stbName, stbName, sizeof(msg->stbName)); msg->dbId = dbId; msg->suid = suid; @@ -751,8 +751,8 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, } msg->pCtg = pCtg; - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - strncpy(msg->tbName, tbName, sizeof(msg->tbName)); + tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + tstrncpy(msg->tbName, tbName, sizeof(msg->tbName)); msg->dbId = dbId; op->data = msg; @@ -785,7 +785,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId dbFName = p + 1; } - strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->pCtg = pCtg; msg->dbId = dbId; msg->dbInfo = dbInfo; @@ -817,7 +817,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy char *p = strchr(output->dbFName, '.'); if (p && IS_SYS_DBNAME(p + 1)) { - memmove(output->dbFName, p + 1, strlen(p + 1)); + int32_t len = strlen(p + 1); + memmove(output->dbFName, p + 1, len >= TSDB_DB_FNAME_LEN ? TSDB_DB_FNAME_LEN - 1 : len); } msg->pCtg = pCtg; @@ -852,7 +853,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp } msg->pCtg = pCtg; - strcpy(msg->dbFName, dbFName); + tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->vgId = vgId; msg->epSet = *pEpSet; @@ -1215,7 +1216,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { CTG_CACHE_STAT_INC(numOfDb, 1); SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; - strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId); @@ -1331,8 +1332,8 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uin metaRent.smaVer = pCache->pIndex->version; } - strcpy(metaRent.dbFName, dbFName); - strcpy(metaRent.stbName, tbName); + tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName)); + tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName)); CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); @@ -1418,8 +1419,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); - CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache)); - + if (pCache) { + CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache)); + } + return TSDB_CODE_SUCCESS; } @@ -1590,7 +1593,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { dbCache = NULL; - strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); @@ -1680,9 +1683,9 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) { int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta); - CTG_ERR_JRET( - ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize)); + code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize); pMeta->tbMeta = NULL; + CTG_ERR_JRET(code); } if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) { @@ -1697,10 +1700,8 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { _return: - if (pMeta) { - taosMemoryFreeClear(pMeta->tbMeta); - taosMemoryFreeClear(pMeta); - } + taosMemoryFreeClear(pMeta->tbMeta); + taosMemoryFreeClear(pMeta); taosMemoryFreeClear(msg); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 33e5b0e1e4..095d2b093d 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -361,7 +361,12 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) { SArray* pTagVals = NULL; STag* pTag = (STag*)pCfg->pTags; - if (pCfg->pTags && tTagIsJson(pTag)) { + if (NULL == pCfg->pTags || pCfg->numOfTags <= 0) { + qError("tag missed in table cfg, pointer:%p, numOfTags:%d", pCfg->pTags, pCfg->numOfTags); + return TSDB_CODE_APP_ERROR; + } + + if (tTagIsJson(pTag)) { char* pJson = parseTagDatatoJson(pTag); if (pJson) { *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 0b5c8372f1..ffdcf48d48 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -143,9 +143,15 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM); - if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { + if (NULL == pBuf) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } + + if (!allocBuf(pDispatcher, pInput, pBuf)) { + taosFreeQitem(pBuf); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 4c4ba59fa9..ed455e5e75 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -323,7 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); if (code) { - destroyDataSinker((SDataSinkHandle*)pInserterNode); + destroyDataSinker((SDataSinkHandle*)inserter); return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d99945e7a9..1f2b394b9c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1725,6 +1725,9 @@ static void destroyStateWindowOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); cleanupExprSupp(&pInfo->scalarSup); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + cleanupAggSup(&pInfo->aggSup); + cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index a917397d02..618f597d72 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -357,8 +357,7 @@ char* parseTagDatatoJson(void* p) { for (int j = 0; j < nCols; ++j) { STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); // json key encode by binary - memset(tagJsonKey, 0, sizeof(tagJsonKey)); - memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey)); + tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey)); // json value char type = pTagVal->type; if (type == TSDB_DATA_TYPE_NULL) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index a6f26088de..fadd07a9f3 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -173,7 +173,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t } SDbCfgReq dbCfgReq = {0}; - strcpy(dbCfgReq.db, input); + strncpy(dbCfgReq.db, input, sizeof(dbCfgReq.db) - 1); int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq); void *pBuf = (*mallcFp)(bufLen); @@ -191,7 +191,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t } SUserIndexReq indexReq = {0}; - strcpy(indexReq.indexFName, input); + strncpy(indexReq.indexFName, input, sizeof(indexReq.indexFName) - 1); int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq); void *pBuf = (*mallcFp)(bufLen); @@ -233,7 +233,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32 } SGetUserAuthReq req = {0}; - strncpy(req.user, input, sizeof(req.user)); + strncpy(req.user, input, sizeof(req.user) - 1); int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req); void *pBuf = (*mallcFp)(bufLen); @@ -251,7 +251,7 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_ } STableIndexReq indexReq = {0}; - strcpy(indexReq.tbFName, input); + strncpy(indexReq.tbFName, input, sizeof(indexReq.tbFName) - 1); int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq); void *pBuf = (*mallcFp)(bufLen); @@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t SBuildTableInput *pInput = input; STableCfgReq cfgReq = {0}; cfgReq.header.vgId = pInput->vgId; - strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName)); - strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName)); + strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName) - 1); + strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName) - 1); int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq); void *pBuf = (*mallcFp)(bufLen); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 3038b87930..e9ded9b269 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -412,7 +412,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { while (true) { paramIdx = atomic_load_32(&gQwMgmt.paramIdx); if (paramIdx == tListLen(gQwMgmt.param)) { - newParamIdx = 0; + newParamIdx = 1; } else { newParamIdx = paramIdx + 1; } @@ -422,6 +422,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { } } + if (paramIdx == tListLen(gQwMgmt.param)) { + paramIdx = 0; + } + gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef; gQwMgmt.param[paramIdx].refId = refId; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0ff40f3b9a..3df16563e2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -398,7 +398,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); - break; } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 1f09fd4799..cc1949f17d 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -629,6 +629,8 @@ _return: sclFreeParam(pWhen); sclFreeParam(pThen); + taosMemoryFree(pWhen); + taosMemoryFree(pThen); SCL_RET(code); } @@ -664,6 +666,8 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe sclFreeParam(pWhen); sclFreeParam(pThen); + taosMemoryFreeClear(pWhen); + taosMemoryFreeClear(pThen); } if (pElse) { @@ -688,6 +692,8 @@ _return: sclFreeParam(pWhen); sclFreeParam(pThen); + taosMemoryFree(pWhen); + taosMemoryFree(pThen); SCL_RET(code); } @@ -929,6 +935,10 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp sclFreeParam(&comp); sclFreeParam(pWhen); sclFreeParam(pThen); + taosMemoryFree(pCase); + taosMemoryFree(pElse); + taosMemoryFree(pWhen); + taosMemoryFree(pThen); return TSDB_CODE_SUCCESS; @@ -940,6 +950,10 @@ _return: sclFreeParam(pWhen); sclFreeParam(pThen); sclFreeParam(output); + taosMemoryFree(pCase); + taosMemoryFree(pElse); + taosMemoryFree(pWhen); + taosMemoryFree(pThen); SCL_RET(code); } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 22e9fb4305..f089bad04e 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -654,7 +654,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, return TSDB_CODE_APP_ERROR; } - int32_t rstart = startIndex >= 0 ? startIndex : 0; + int32_t rstart = (startIndex >= 0 && startIndex < pIn->numOfRows) ? startIndex : 0; int32_t rend = numOfRows > 0 ? rstart + numOfRows - 1 : rstart + pIn->numOfRows - 1; SSclVectorConvCtx cCtx = {pIn, pOut, rstart, rend, pInputCol->info.type, pOutputCol->info.type}; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index c641b88152..b585373b0a 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -430,7 +430,8 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 if (SCH_IS_DATA_BIND_TASK(pTask)) { if (NULL == pData->pEpSet) { SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); - SCH_ERR_JRET(rspCode); + code = rspCode; + goto _return; } } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 9e17f50dce..8f11fb88a2 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1086,6 +1086,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const // fetch next ofp, a new ofp and make it dirty ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); if (ret < 0) { + tdbFree(pBuf); return -1; } } diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index dc403ff0c4..a2002a3ac4 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -106,7 +106,7 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerBegin(pPager, pTxn); if (ret < 0) { - tdbError("failed to begin pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to begin pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId); return -1; } } @@ -121,7 +121,7 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerCommit(pPager, pTxn); if (ret < 0) { - tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to commit pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId); return -1; } } @@ -151,7 +151,7 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerAbort(pPager, pTxn); if (ret < 0) { - tdbError("failed to abort pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to abort pager since %s. dbName:%s, txnId:%ld", tstrerror(terrno), pDb->dbName, pTxn->txnId); return -1; } } diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index c73ddce74c..732a57639f 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -268,7 +268,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) // 4. Try a create new page if (!pPage) { ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg); - if (ret < 0 && pPage != NULL) { + if (ret < 0 || pPage == NULL) { // TODO ASSERT(0); return NULL; diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 2f89e4a49d..58f6cedc22 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -553,7 +553,7 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize); if (ret < 0) { - tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, + tdbError("failed to write page data due to %s. file:%s, pageSize:%d", strerror(errno), pPager->jFileName, pPage->pageSize); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -582,7 +582,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize); if (ret < 0) { - tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName, + tdbError("failed to write page data due to %s. file:%s, pageSize:%d", strerror(errno), pPager->dbFileName, pPage->pageSize); terrno = TAOS_SYSTEM_ERROR(errno); return -1; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 279f4dc656..fa22805df2 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -123,8 +123,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } SWalCkHead* logContent = (SWalCkHead*)candidate; if (walValidHeadCksum(logContent) != 0) { - wError("vgId:%d, failed to validate checksum of wal entry header. offset:% %" PRId64 ", file:%s", - ((char*)(logContent)-buf), fnameStr); + wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, + offset + ((char*)(logContent)-buf), fnameStr); haystack = candidate + 1; if (firstTrial) { break; @@ -162,8 +162,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } if (walValidBodyCksum(logContent) != 0) { terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; - wError("vgId:%d, failed to validate checksum of wal entry body. offset:% %" PRId64 ", file:%s", - ((char*)(logContent)-buf), fnameStr); + wWarn("vgId:%d, failed to validate checksum of wal entry body. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, + offset + ((char*)(logContent)-buf), fnameStr); haystack = candidate + 1; if (firstTrial) { break; @@ -481,6 +481,10 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { continue; } + if (offset != (idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry)) { + continue; + } + if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) { wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "", pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver); @@ -493,6 +497,8 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { } offset += sizeof(SWalIdxEntry); + ASSERT(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)); + // ftruncate idx file if (offset < fileSize) { if (taosFtruncateFile(pIdxFile, offset) < 0) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 91fa49fce0..0562bbad27 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -410,25 +410,35 @@ END: static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { SWalIdxEntry entry = {.ver = ver, .offset = offset}; - int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END); + SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); + ASSERT(pFileInfo != NULL); + ASSERT(pFileInfo->firstVer >= 0); + int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry); wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset, idxOffset); + int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry)); if (size != sizeof(SWalIdxEntry)) { + wError("vgId:%d, failed to write idx entry due to %s. ver:%lld", pWal->cfg.vgId, strerror(errno), ver); terrno = TAOS_SYSTEM_ERROR(errno); - // TODO truncate return -1; } + + ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); return 0; } -// TODO gurantee atomicity by truncate failed writing static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) { int64_t code = 0; int64_t offset = walGetCurFileOffset(pWal); + SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); + ASSERT(pFileInfo != NULL); + if (pFileInfo->firstVer == -1) { + pFileInfo->firstVer = index; + } pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; @@ -439,11 +449,14 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - wDebug("vgId:%d, wal write log %ld, msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType)); + code = walWriteIndex(pWal, index, offset); + if (code < 0) { + goto END; + } + if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { - // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); @@ -452,7 +465,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy } if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) { - // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); @@ -460,24 +472,31 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy goto END; } - code = walWriteIndex(pWal, index, offset); - if (code < 0) { - // TODO ftruncate - goto END; - } - // set status if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index; pWal->vers.lastVer = index; pWal->totSize += sizeof(SWalCkHead) + bodyLen; - if (walGetCurFileInfo(pWal)->firstVer == -1) { - walGetCurFileInfo(pWal)->firstVer = index; - } - walGetCurFileInfo(pWal)->lastVer = index; - walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen; + pFileInfo->lastVer = index; + pFileInfo->fileSize += sizeof(SWalCkHead) + bodyLen; return 0; + END: + // recover in a reverse order + if (taosFtruncateFile(pWal->pLogFile, offset) < 0) { + wFatal("vgId:%d, failed to ftruncate logfile to offset:%lld during recovery due to %s", pWal->cfg.vgId, offset, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + ASSERT(0 && "failed to recover from error"); + } + + int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry); + if (taosFtruncateFile(pWal->pIdxFile, idxOffset) < 0) { + wFatal("vgId:%d, failed to ftruncate idxfile to offset:%lld during recovery due to %s", pWal->cfg.vgId, idxOffset, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + ASSERT(0 && "failed to recover from error"); + } return -1; } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 2e2300ba14..9d97cf7ab2 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -443,10 +443,13 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) { static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) { if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) { taosUpdateLogNums(level); - if (tsAsyncLog) { + if (tsAsyncLog && level != DEBUG_FATAL) { taosPushLogBuffer(tsLogObj.logHandle, buffer, len); } else { taosWriteFile(tsLogObj.logHandle->pFile, buffer, len); + if (level == DEBUG_FATAL) { + taosFsyncFile(tsLogObj.logHandle->pFile); + } } if (tsLogObj.maxLines > 0) { diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index 0b26d685fd..c34b22b691 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -101,11 +101,8 @@ void shellInsertChar(SShellCmd *cmd, char *c, int32_t size) { /* update the values */ cmd->commandSize += size; cmd->cursorOffset += size; - for (int i = 0; i < size; i++) { - taosMbToWchar(&wc, c + i, size); - cmd->screenOffset += taosWcharWidth(wc); - cmd->endOffset += taosWcharWidth(wc); - } + cmd->screenOffset += taosWcharWidth(wc); + cmd->endOffset += taosWcharWidth(wc); #ifdef WINDOWS #else shellShowOnScreen(cmd); diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index b6d8d75ba0..f7fd59cd7b 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -63,6 +63,7 @@ int smlProcess_influx_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); return code; } @@ -86,6 +87,8 @@ int smlProcess_telnet_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -125,6 +128,8 @@ int smlProcess_json1_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -165,6 +170,8 @@ int smlProcess_json2_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -233,6 +240,8 @@ int smlProcess_json3_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -292,6 +301,8 @@ int smlProcess_json4_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -313,6 +324,8 @@ int sml_TD15662_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -333,6 +346,8 @@ int sml_TD15742_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -362,6 +377,8 @@ int sml_16384_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -781,6 +798,8 @@ int sml_oom_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -825,6 +844,8 @@ int sml_16368_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -862,6 +883,8 @@ int sml_dup_time_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -1068,6 +1091,8 @@ int sml_16960_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); + return code; } @@ -1097,6 +1122,7 @@ int sml_add_tag_col_Test() { printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); code = taos_errno(pRes); taos_free_result(pRes); + taos_close(taos); return code; } @@ -1151,6 +1177,36 @@ int smlProcess_18784_Test() { rowIndex++; } taos_free_result(pRes); + taos_close(taos); + + return code; +} + +int sml_19221_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1"); + taos_free_result(pRes); + + const char *sql[] = { + "qelhxo,id=pnnqhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000\nqelhxo,id=pnnhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000\n#comment\nqelhxo,id=pnqhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000", + }; + + pRes = taos_query(taos, "use sml_db"); + taos_free_result(pRes); + + char* tmp = (char*)taosMemoryCalloc(1024, 1); + memcpy(tmp, sql[0], strlen(sql[0])); + *(char*)(tmp+44) = 0; + int32_t totalRows = 0; + pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); + + ASSERT(totalRows == 3); + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + int code = taos_errno(pRes); + taos_free_result(pRes); + taos_close(taos); + taosMemoryFree(tmp); return code; } @@ -1187,5 +1243,7 @@ int main(int argc, char *argv[]) { ASSERT(!ret); ret = smlProcess_18784_Test(); ASSERT(!ret); + ret = sml_19221_Test(); + ASSERT(!ret); return ret; }