Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-31890-18

This commit is contained in:
Hongze Cheng 2024-09-26 08:57:25 +08:00
commit 74b0d803ad
42 changed files with 703 additions and 370 deletions

View File

@ -83,7 +83,6 @@ async fn main() -> anyhow::Result<()> {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
println!("Succed to execute insert 1 row");
}
});
});

View File

@ -181,7 +181,7 @@ INTERVAL(interval_val [, interval_offset])
- FILL用于指定窗口区间数据缺失的情况下数据的填充模式。
对于时间窗口interval_val 和 sliding_val 都表示时间段, 语法上支持三种方式。例如:
1. INTERVAL(1s, 500a) SLIDING(1s),带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微 w (周), y (年);
1. INTERVAL(1s, 500a) SLIDING(1s),带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微 w (周), y (年);
2. INTERVAL(1000, 500) SLIDING(1000),不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库;
3. INTERVAL('1s', '500a') SLIDING('1s'),带时间单位的字符串形式,字符串内部不能有任何空格等其它字符。

View File

@ -14,7 +14,8 @@ description: 让开发者能够快速上手的指南
7. 在很多场景下(如车辆管理),应用需要获取每个数据采集点的最新状态,那么建议你采用 TDengine 的 Cache 功能,而不用单独部署 Redis 等缓存软件。
8. 如果你发现 TDengine 的函数无法满足你的要求那么你可以使用用户自定义函数UDF来解决问题。
本部分内容就是按照上述顺序组织的。为便于理解TDengine 为每个功能和每个支持的编程语言都提供了示例代码。如果你希望深入了解 SQL 的使用,需要查看[SQL 手册](../reference/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../reference/connector/)。如果还希望想将 TDengine 与第三方系统集成起来,比如 Grafana, 请参考[第三方工具](../third-party/)。
本部分内容就是按照上述顺序组织的。为便于理解TDengine 为每个功能和每个支持的编程语言都提供了示例代码,位于 [示例代码](https://github.com/taosdata/TDengine/tree/main/docs/examples)。所有示例代码都会有 CI 保证正确性,脚本位于 [示例代码 CI](https://github.com/taosdata/TDengine/tree/main/tests/docs-examples-test)。
如果你希望深入了解 SQL 的使用,需要查看[SQL 手册](../reference/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../reference/connector/)。如果还希望想将 TDengine 与第三方系统集成起来,比如 Grafana, 请参考[第三方工具](../third-party/)。
如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在 GitHub 上直接递交 Issue。

View File

@ -80,7 +80,6 @@ database_option: {
```sql
create database if not exists db vgroups 10 buffer 10
```
以上示例创建了一个有 10 个 vgroup 名为 db 的数据库, 其中每个 vnode 分配 10MB 的写入缓存

View File

@ -1214,7 +1214,7 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
- 如果没有指定完整的时间,那么默认时间值为指定或默认时区的 `1970-01-01 00:00:00`, 未指定部分使用该默认值中的对应部分. 暂不支持只指定年日而不指定月日的格式, 如'yyyy-mm-DDD', 支持'yyyy-mm-DD'.
- 如果格式串中有`AM`, `PM`等, 那么小时必须是12小时制, 范围必须是01-12.
- `to_timestamp`转换具有一定的容错机制, 在格式串和时间戳串不完全对应时, 有时也可转换, 如: `to_timestamp('200101/2', 'yyyyMM1/dd')`, 格式串中多出来的1会被丢弃. 格式串与时间戳串中多余的空格字符(空格, tab等)也会被 自动忽略. 如`to_timestamp(' 23 年 - 1 月 - 01 日 ', 'yy 年-MM月-dd日')` 可以被成功转换. 虽然`MM`等字段需要两个数字对应(只有一位时前面补0), 在`to_timestamp`时, 一个数字也可以成功转换.
- 输出时间戳的精度与查询表的精度相同, 若查询未指定表, 则输出精度为毫秒. 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')`的输出将会把微和纳秒进行截断. 如果指定一张纳秒表, 那么就不会发生截断, 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`.
- 输出时间戳的精度与查询表的精度相同, 若查询未指定表, 则输出精度为毫秒. 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')`的输出将会把微和纳秒进行截断. 如果指定一张纳秒表, 那么就不会发生截断, 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`.
### 时间和日期函数

View File

@ -42,9 +42,10 @@ TDengine 支持 `UNION ALL` 和 `UNION` 操作符。UNION ALL 将查询返回的
| 6 | [NOT] BETWEEN AND | 除 BOOL、BLOB、MEDIUMBLOB、JSON 和 GEOMETRY 外的所有类型 | 闭区间比较 |
| 7 | IN | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 与列表内的任意值相等 |
| 8 | NOT IN | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 与列表内的任意值都不相等 |
| 9 | LIKE | BINARY、NCHAR 和 VARCHAR | 通配符匹配 |
| 10 | MATCH, NMATCH | BINARY、NCHAR 和 VARCHAR | 正则表达式匹配 |
| 11 | CONTAINS | JSON | JSON 中是否存在某键 |
| 9 | LIKE | BINARY、NCHAR 和 VARCHAR | 通配符匹配所指定的模式串 |
| 10 | NOT LIKE | BINARY、NCHAR 和 VARCHAR | 通配符不匹配所指定的模式串 |
| 11 | MATCH, NMATCH | BINARY、NCHAR 和 VARCHAR | 正则表达式匹配 |
| 12 | CONTAINS | JSON | JSON 中是否存在某键 |
LIKE 条件使用通配符字符串进行匹配检查,规则如下:

View File

@ -28,7 +28,7 @@ TSMA只能基于超级表和普通表创建, 不能基于子表创建.
由于TSMA输出为一张超级表, 因此输出表的行长度受最大行长度限制, 不同函数的`中间结果`大小各异, 一般都大于原始数据大小, 若输出表的行长度大于最大行长度限制, 将会报`Row length exceeds max length`错误. 此时需要减少函数个数或者将常用的函数进行分组拆分到多个TSMA中.
窗口大小的限制为[1m ~ 1y/12n]. INTERVAL 的单位与查询中INTERVAL子句相同, 如 a (毫秒), b (纳秒), h (小时), m (分钟), s (秒), u (微), d (天), w(周), n(月), y(年).
窗口大小的限制为[1m ~ 1y/12n]. INTERVAL 的单位与查询中INTERVAL子句相同, 如 a (毫秒), b (纳秒), h (小时), m (分钟), s (秒), u (微), d (天), w(周), n(月), y(年).
TSMA为库内对象, 但名字全局唯一. 集群内一共可创建TSMA个数受参数`maxTsmaNum`限制, 参数默认值为3, 范围: [0-3]. 注意, 由于TSMA后台计算使用流计算, 因此每创建一条TSMA, 将会创建一条流, 因此能够创建的TSMA条数也受当前已经存在的流条数和最大可创建流条数限制.

View File

@ -32,7 +32,6 @@ extern int32_t tsS3PageCacheSize;
extern int32_t tsS3UploadDelaySec;
int32_t s3Init();
void s3CleanUp();
int32_t s3CheckCfg();
int32_t s3PutObjectFromFile(const char *file, const char *object);
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp);

View File

@ -72,8 +72,8 @@ extern "C" {
#ifdef TD_TSZ
extern bool lossyFloat;
extern bool lossyDouble;
int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor);
void tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals,
int32_t ifAdtFse, const char *compressor);
void tsCompressExit();

View File

@ -26,10 +26,10 @@ extern "C" {
#include "query.h"
#include "taos.h"
#include "tcommon.h"
#include "tmisce.h"
#include "tdef.h"
#include "thash.h"
#include "tlist.h"
#include "tmisce.h"
#include "tmsg.h"
#include "tmsgtype.h"
#include "trpc.h"
@ -86,7 +86,7 @@ typedef struct {
int8_t threadStop;
int8_t quitByKill;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary;
SHashObj* appHbHash; // key: clusterId
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
@ -95,11 +95,11 @@ typedef struct {
} SClientHbMgr;
typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us
int64_t ctgStart; // start to parse, us
int64_t execStart; // start to parse, us
int64_t start; // start timestamp, us
int64_t ctgStart; // start to parse, us
int64_t execStart; // start to parse, us
int64_t parseCostUs;
int64_t parseCostUs;
int64_t ctgCostUs;
int64_t analyseCostUs;
int64_t planCostUs;
@ -193,7 +193,7 @@ typedef struct SReqResultInfo {
char** convertBuf;
TAOS_ROW row;
SResultColumn* pCol;
uint64_t numOfRows; // from int32_t change to int64_t
uint64_t numOfRows; // from int32_t change to int64_t
uint64_t totalRows;
uint64_t current;
bool localResultFetched;
@ -319,12 +319,14 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source);
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int8_t source);
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid);
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser, SParseSqlRes* pRes);
void syncQueryFn(void* param, void* res, int32_t code);
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param);
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
SParseSqlRes* pRes);
void syncQueryFn(void* param, void* res, int32_t code);
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
@ -333,7 +335,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
return (SReqResultInfo*)&msg->common.resInfo;
}
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo);
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo);
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
return tmqGetCurResInfo(res);
@ -349,8 +351,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
STscObj **p);
int32_t createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo,
STscObj** p);
void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid);
void releaseTscObj(int64_t rid);
@ -358,7 +360,7 @@ void destroyAppInst(void* pAppInfo);
uint64_t generateRequestId();
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest);
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj** pRequest);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
@ -372,9 +374,9 @@ void resetConnectDB(STscObj* pTscObj);
int taos_options_imp(TSDB_OPTION option, const char* str);
int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void **pDnodeConn);
void tscStopCrashReport();
void cleanupAppInfo();
int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void** pDnodeConn);
void tscStopCrashReport();
void cleanupAppInfo();
typedef struct AsyncArg {
SRpcMsg msg;
@ -402,17 +404,17 @@ int32_t hbMgrInit();
void hbMgrCleanUp();
// cluster level
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr);
void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
void destroyAllRequests(SHashObj* pRequests);
void stopAllRequests(SHashObj* pRequests);
int32_t appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key, SAppHbMgr** pAppHbMgr);
void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
void destroyAllRequests(SHashObj* pRequests);
void stopAllRequests(SHashObj* pRequests);
//SAppInstInfo* getAppInstInfo(const char* clusterKey);
// SAppInstInfo* getAppInstInfo(const char* clusterKey);
// conn level
int32_t hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey);
void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey);
typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx;
@ -421,9 +423,9 @@ typedef struct SSqlCallbackWrapper {
void* pPlanInfo;
} SSqlCallbackWrapper;
void setQueryRequest(int64_t rId);
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void setQueryRequest(int64_t rId);
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
@ -431,20 +433,21 @@ void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
int32_t qnodeRequired(SRequestObj* pRequest, bool *required);
int32_t qnodeRequired(SRequestObj* pRequest, bool* required);
void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest);
void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper);
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code);
void restartAsyncQuery(SRequestObj *pRequest, int32_t code);
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest);
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce);
void handleQueryAnslyseRes(SSqlCallbackWrapper* pWrapper, SMetaData* pResultMeta, int32_t code);
void restartAsyncQuery(SRequestObj* pRequest, int32_t code);
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest);
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper** ppWrapper, SRequestObj* pRequest, bool updateMetaForce);
void returnToUser(SRequestObj* pRequest);
void stopAllQueries(SRequestObj *pRequest);
void stopAllQueries(SRequestObj* pRequest);
void doRequestCallback(SRequestObj* pRequest, int32_t code);
void freeQueryParam(SSyncQueryParam* param);
#ifdef TD_ENTERPRISE
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes);
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser,
SParseSqlRes* pRes);
#endif
#define TSC_ERR_RET(c) \
@ -474,13 +477,9 @@ int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, boo
void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost);
enum {
MONITORSQLTYPESELECT = 0,
MONITORSQLTYPEINSERT = 1,
MONITORSQLTYPEDELETE = 2
};
enum { MONITORSQLTYPESELECT = 0, MONITORSQLTYPEINSERT = 1, MONITORSQLTYPEDELETE = 2 };
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
void tmqMgmtClose(void);

View File

@ -1249,7 +1249,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
}
}
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
int32_t code = 0;
if (pQuery->pRoot) {
@ -1335,8 +1335,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
*res = pRequest->body.resInfo.execRes.res;
pRequest->body.resInfo.execRes.res = NULL;
}
return pRequest;
}
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
@ -2934,8 +2932,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s
return NULL;
}
code = tsem_destroy(&param->sem);
if(TSDB_CODE_SUCCESS != code) {
tscError("failed to destroy semaphore since %s", tstrerror(code));
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to destroy semaphore since %s", tstrerror(code));
}
SRequestObj* pRequest = NULL;

View File

@ -394,7 +394,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
uint8_t tagNum = pCreateReq->ctb.tagNum;
int32_t code = 0;
cJSON* tags = NULL;
cJSON* tableName = cJSON_CreateString(name);
cJSON* tableName = cJSON_CreateString(name);
RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
cJSON* using = cJSON_CreateString(sname);
@ -417,7 +417,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
}
char* pJson = NULL;
parseTagDatatoJson(pTag, &pJson);
if(pJson == NULL) {
if (pJson == NULL) {
uError("parseTagDatatoJson failed, pJson == NULL");
goto end;
}
@ -731,7 +731,7 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
goto end;
}
parseTagDatatoJson(vAlterTbReq.pTagVal, &buf);
if(buf == NULL) {
if (buf == NULL) {
uError("parseTagDatatoJson failed, buf == NULL");
goto end;
}
@ -978,7 +978,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
taosMemoryFree(pCmdMsg.pMsg);
@ -1082,7 +1082,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
launchQueryImpl(pRequest, &pQuery, true, NULL); // ignore, because return value is pRequest
taosMemoryFree(pCmdMsg.pMsg);
if (pRequest->code == TSDB_CODE_SUCCESS) {
// ignore the error code
@ -1236,7 +1236,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS) {
RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
}
@ -1365,7 +1365,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
if (TSDB_CODE_SUCCESS != code) goto end;
RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS) {
RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
}
@ -1510,7 +1510,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
if (TSDB_CODE_SUCCESS != code) goto end;
RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
pVgData = NULL;
pArray = NULL;
@ -1587,7 +1587,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0));
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
@ -1647,7 +1647,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0));
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
@ -1766,7 +1766,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
@ -1935,7 +1935,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
(void)launchQueryImpl(pRequest, pQuery, true, NULL);
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:

View File

@ -112,7 +112,7 @@ static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo *conn, const char
SUserAuthInfo pAuth = {0};
(void)snprintf(pAuth.user, sizeof(pAuth.user), "%s", info->taos->user);
if (NULL == pTabName) {
if (tNameSetDbName(&pAuth.tbName, info->taos->acctId, info->pRequest->pDb, strlen(info->pRequest->pDb)) != 0){
if (tNameSetDbName(&pAuth.tbName, info->taos->acctId, info->pRequest->pDb, strlen(info->pRequest->pDb)) != 0) {
return TSDB_CODE_SML_INVALID_DATA;
}
} else {
@ -165,7 +165,7 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
}
int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo** tInfo) {
int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo **tInfo) {
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
if (!tag) {
return terrno;
@ -203,13 +203,13 @@ static void smlDestroySTableMeta(void *para) {
taosMemoryFree(meta);
}
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta** sMeta) {
int32_t code = TSDB_CODE_SUCCESS;
char *measure = currElement->measure;
int measureLen = currElement->measureLen;
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta **sMeta) {
int32_t code = TSDB_CODE_SUCCESS;
char *measure = currElement->measure;
int measureLen = currElement->measureLen;
if (currElement->measureEscaped) {
measure = (char *)taosMemoryMalloc(measureLen);
if (measure == NULL){
if (measure == NULL) {
return terrno;
}
(void)memcpy(measure, currElement->measure, measureLen);
@ -233,7 +233,7 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml
}
(*sMeta)->tableMeta = pTableMeta;
code = taosHashPut(info->superTables, currElement->measure, currElement->measureLen, sMeta, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
smlDestroySTableMeta(*sMeta);
return code;
}
@ -250,11 +250,11 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml
}
if (i < pTableMeta->tableInfo.numOfColumns) {
if(taosArrayPush((*sMeta)->cols, &kv) == NULL){
if (taosArrayPush((*sMeta)->cols, &kv) == NULL) {
return terrno;
}
} else {
if(taosArrayPush((*sMeta)->tags, &kv) == NULL){
if (taosArrayPush((*sMeta)->tags, &kv) == NULL) {
return terrno;
}
}
@ -277,7 +277,7 @@ bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv) {
goto END;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt);
if (maxKV == NULL){
if (maxKV == NULL) {
goto END;
}
if (unlikely(!IS_SAME_KEY)) {
@ -336,9 +336,9 @@ int32_t smlJoinMeasureTag(SSmlLineInfo *elements) {
return TSDB_CODE_SUCCESS;
}
static bool smlIsPKTable(STableMeta *pTableMeta){
for(int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){
if(pTableMeta->schema[i].flags & COL_IS_KEY){
static bool smlIsPKTable(STableMeta *pTableMeta) {
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
if (pTableMeta->schema[i].flags & COL_IS_KEY) {
return true;
}
}
@ -368,14 +368,14 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
info->maxTagKVs = sMeta->tags;
info->maxColKVs = sMeta->cols;
if(smlIsPKTable(sMeta->tableMeta)){
if (smlIsPKTable(sMeta->tableMeta)) {
return TSDB_CODE_SML_NOT_SUPPORT_PK;
}
return 0;
}
int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen);
SSmlTableInfo *tinfo = NULL;
@ -385,19 +385,19 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
return code;
}
code = taosHashPut(info->childTables, elements->measureTag, elements->measureTagsLen, &tinfo, POINTER_BYTES);
if(code != 0){
if (code != 0) {
smlDestroyTableInfo(&tinfo);
return code;
}
tinfo->tags = taosArrayDup(info->preLineTagKV, NULL);
if(tinfo->tags == NULL){
if (tinfo->tags == NULL) {
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (size_t i = 0; i < taosArrayGetSize(info->preLineTagKV); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(info->preLineTagKV, i);
if(kv == NULL){
if (kv == NULL) {
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -406,12 +406,12 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
}
code = smlSetCTableName(tinfo, info->tbnameKey);
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
smlDestroyTableInfo(&tinfo);
return code;
}
code = getTableUid(info, elements, tinfo);
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
smlDestroyTableInfo(&tinfo);
return code;
}
@ -458,10 +458,10 @@ int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *
return terrno;
}
}
if (taosArrayPush(elements->colArray, kvTs) == NULL){
if (taosArrayPush(elements->colArray, kvTs) == NULL) {
return terrno;
}
if (taosArrayPush(elements->colArray, kv) == NULL){
if (taosArrayPush(elements->colArray, kv) == NULL) {
return terrno;
}
}
@ -495,11 +495,11 @@ int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs)
static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnameKey) {
bool autoChildName = false;
size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter);
if(delimiter > 0 && tbnameKey == NULL){
if (delimiter > 0 && tbnameKey == NULL) {
size_t totalNameLen = delimiter * (taosArrayGetSize(tags) - 1);
for (int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
if(tag == NULL){
if (tag == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
totalNameLen += tag->length;
@ -512,7 +512,7 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
(void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
for (int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
if(tag == NULL){
if (tag == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
(void)strncat(childTableName, tag->value, tag->length);
@ -523,8 +523,8 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
if (tsSmlDot2Underline) {
smlStrReplace(childTableName, strlen(childTableName));
}
}else{
if (tbnameKey == NULL){
} else {
if (tbnameKey == NULL) {
tbnameKey = tsSmlChildTableName;
}
size_t childTableNameLen = strlen(tbnameKey);
@ -532,13 +532,14 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
for (int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
if(tag == NULL){
if (tag == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
// handle child table name
if (childTableNameLen == tag->keyLen && strncmp(tag->key, tbnameKey, tag->keyLen) == 0) {
(void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
(void)strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
(void)strncpy(childTableName, tag->value,
(tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
if (tsSmlDot2Underline) {
smlStrReplace(childTableName, strlen(childTableName));
}
@ -553,7 +554,7 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
int32_t code = smlParseTableName(oneTable->tags, oneTable->childTableName, tbnameKey);
if(code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -562,7 +563,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
if (dst == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if(oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN){
if (oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN) {
uError("SML:smlSetCTableName super table name is too long");
taosArrayDestroy(dst);
return TSDB_CODE_SML_INTERNAL_ERROR;
@ -578,7 +579,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
}
code = buildChildTableName(&rName);
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosArrayDestroy(dst);
@ -906,13 +907,13 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
return result;
}
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *checkDumplicateCols,
ESchemaAction *action, bool isTag) {
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *checkDumplicateCols, ESchemaAction *action, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
if (j == 0 && !isTag) continue;
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
if (kv == NULL){
if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
@ -923,10 +924,10 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
for (int j = 0; j < taosArrayGetSize(checkDumplicateCols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(checkDumplicateCols, j);
if (kv == NULL){
if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
if(taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL){
if (taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
}
}
@ -934,16 +935,16 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
}
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (hashTmp == NULL) {
code = terrno;
goto END;
}
int32_t i = 0;
int32_t i = 0;
for (; i < length; i++) {
code = taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
if (code != 0){
if (code != 0) {
goto END;
}
}
@ -955,7 +956,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
}
for (; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){
if (kv == NULL) {
code = TSDB_CODE_SML_INVALID_DATA;
goto END;
}
@ -982,8 +983,8 @@ static int32_t getBytes(uint8_t type, int32_t length) {
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *results, int32_t numOfCols, bool isTag) {
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
if (kv == NULL){
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
ESchemaAction action = SCHEMA_ACTION_NULL;
@ -996,7 +997,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
field.type = kv->type;
field.bytes = getBytes(kv->type, kv->length);
(void)memcpy(field.name, kv->key, kv->keyLen);
if (taosArrayPush(results, &field) == NULL){
if (taosArrayPush(results, &field) == NULL) {
return terrno;
}
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
@ -1008,7 +1009,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
uint16_t newIndex = *index;
if (isTag) newIndex -= numOfCols;
SField *field = (SField *)taosArrayGet(results, newIndex);
if (field == NULL){
if (field == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
field->bytes = getBytes(kv->type, kv->length);
@ -1019,7 +1020,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
int32_t len = 0;
for (int j = 0; j < taosArrayGetSize(results); ++j) {
SField *field = taosArrayGet(results, j);
if (field == NULL){
if (field == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
len += field->bytes;
@ -1051,14 +1052,14 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
}
for (int32_t i = 0; i < pReq.numOfColumns; ++i) {
SField *pField = taosArrayGet(pColumns, i);
if (pField == NULL){
if (pField == NULL) {
code = TSDB_CODE_SML_INVALID_DATA;
goto end;
}
SFieldWithOptions fieldWithOption = {0};
setFieldWithOptions(&fieldWithOption, pField);
setDefaultOptionsForField(&fieldWithOption);
if (taosArrayPush(pReq.pColumns, &fieldWithOption) == NULL){
if (taosArrayPush(pReq.pColumns, &fieldWithOption) == NULL) {
code = terrno;
goto end;
}
@ -1105,7 +1106,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
field.type = TSDB_DATA_TYPE_NCHAR;
field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
if (taosArrayPush(pReq.pTags, &field) == NULL){
if (taosArrayPush(pReq.pTags, &field) == NULL) {
code = terrno;
goto end;
}
@ -1121,7 +1122,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
pCmdMsg.msgType = TDMT_MND_CREATE_STB;
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
if (pCmdMsg.msgLen < 0){
if (pCmdMsg.msgLen < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
@ -1131,7 +1132,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
goto end;
}
if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) < 0){
if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pCmdMsg.pMsg);
goto end;
@ -1144,11 +1145,11 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
(void)launchQueryImpl(pRequest, &pQuery, true, NULL); // no need to check return value
launchQueryImpl(pRequest, &pQuery, true, NULL); // no need to check return value
if (pRequest->code == TSDB_CODE_SUCCESS) {
code = catalogRemoveTableMeta(info->pCatalog, pName);
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
}
@ -1187,7 +1188,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
size_t superTableLen = 0;
void *superTable = taosHashGetKey(tmp, &superTableLen);
char *measure = taosMemoryMalloc(superTableLen);
if (measure == NULL){
if (measure == NULL) {
code = terrno;
goto end;
}
@ -1246,28 +1247,28 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end;
}
} else if (code == TSDB_CODE_SUCCESS) {
if(smlIsPKTable(pTableMeta)){
if (smlIsPKTable(pTableMeta)) {
code = TSDB_CODE_SML_NOT_SUPPORT_PK;
goto end;
}
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
HASH_NO_LOCK);
if (hashTmp == NULL){
if (hashTmp == NULL) {
code = terrno;
goto end;
}
for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
code = taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
if (code != 0){
if (code != 0) {
goto end;
}
}
ESchemaAction action = SCHEMA_ACTION_NULL;
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true);
code =
smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1280,13 +1281,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
action);
SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
if (pColumns == NULL){
if (pColumns == NULL) {
code = terrno;
goto end;
}
SArray *pTags =
taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
if (pTags == NULL){
if (pTags == NULL) {
taosArrayDestroy(pColumns);
code = terrno;
goto end;
@ -1297,14 +1298,14 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
field.bytes = pTableMeta->schema[i].bytes;
tstrncpy(field.name, pTableMeta->schema[i].name, sizeof(field.name));
if (i < pTableMeta->tableInfo.numOfColumns) {
if (taosArrayPush(pColumns, &field) == NULL){
if (taosArrayPush(pColumns, &field) == NULL) {
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
code = terrno;
goto end;
}
} else {
if (taosArrayPush(pTags, &field) == NULL){
if (taosArrayPush(pTags, &field) == NULL) {
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
code = terrno;
@ -1363,7 +1364,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
}
action = SCHEMA_ACTION_NULL;
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false);
code =
smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1376,13 +1378,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
action);
SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
if (pColumns == NULL){
if (pColumns == NULL) {
code = terrno;
goto end;
}
SArray *pTags =
taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
if (pTags == NULL){
if (pTags == NULL) {
taosArrayDestroy(pColumns);
code = terrno;
goto end;
@ -1393,14 +1395,14 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
field.bytes = pTableMeta->schema[i].bytes;
tstrncpy(field.name, pTableMeta->schema[i].name, sizeof(field.name));
if (i < pTableMeta->tableInfo.numOfColumns) {
if (taosArrayPush(pColumns, &field) == NULL){
if (taosArrayPush(pColumns, &field) == NULL) {
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
code = terrno;
goto end;
}
} else {
if (taosArrayPush(pTags, &field) == NULL){
if (taosArrayPush(pTags, &field) == NULL) {
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
code = terrno;
@ -1483,7 +1485,7 @@ end:
taosHashCancelIterate(info->superTables, tmp);
taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta);
(void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error
(void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code,
tstrerror(code), info->dataFormat, info->needModifySchema);
@ -1494,34 +1496,35 @@ static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
terrno = 0;
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){
if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
if (ret == 0) {
if (taosArrayPush(metaArray, kv) == NULL){
if (taosArrayPush(metaArray, kv) == NULL) {
return terrno;
}
if(taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
}
}else if(terrno == TSDB_CODE_DUP_KEY){
} else if (terrno == TSDB_CODE_DUP_KEY) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg, SHashObj* checkDuplicate) {
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg,
SHashObj *checkDuplicate) {
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){
if (kv == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
if (index) {
SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
if (value == NULL){
if (value == NULL) {
return TSDB_CODE_SML_INVALID_DATA;
}
@ -1549,13 +1552,13 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
int16_t size = tmp;
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
if (ret == 0) {
if(taosArrayPush(metaArray, kv) == NULL){
if (taosArrayPush(metaArray, kv) == NULL) {
return terrno;
}
if(taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
}
}else{
} else {
return ret;
}
}
@ -1586,7 +1589,7 @@ void freeSSmlKv(void *data) {
void smlDestroyInfo(SSmlHandle *info) {
if (!info) return;
// qDestroyQuery(info->pQuery);
// qDestroyQuery(info->pQuery);
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables);
@ -1657,7 +1660,7 @@ int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) {
info->id = smlGenId();
code = smlInitHandle(&info->pQuery);
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
goto FAILED;
}
info->dataFormat = true;
@ -1688,7 +1691,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
}
for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (kv == NULL){
if (kv == NULL) {
taosHashCleanup(kvHash);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -1698,7 +1701,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
taosHashCleanup(kvHash);
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
}
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
taosHashCleanup(kvHash);
return code;
}
@ -1759,9 +1762,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
if (tableMeta) { // update meta
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat,
info->lineNum);
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf, (*tableMeta)->tagHash);
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf,
(*tableMeta)->tagHash);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf, (*tableMeta)->colHash);
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf,
(*tableMeta)->colHash);
}
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed, ret:%d", info->id, ret);
@ -1801,17 +1806,17 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (info->pRequest->dbList == NULL) {
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
if (info->pRequest->dbList == NULL){
if (info->pRequest->dbList == NULL) {
return terrno;
}
}
char *data = (char *)taosArrayReserve(info->pRequest->dbList, 1);
if (data == NULL){
if (data == NULL) {
return terrno;
}
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
(void)tNameGetFullDbName(&pName, data); //ignore
(void)tNameGetFullDbName(&pName, data); // ignore
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) {
@ -1819,7 +1824,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
int measureLen = tableData->sTableNameLen;
char *measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
if (measure == NULL){
if (measure == NULL) {
return terrno;
}
(void)memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
@ -1830,11 +1835,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (info->pRequest->tableList == NULL) {
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
if (info->pRequest->tableList == NULL){
if (info->pRequest->tableList == NULL) {
return terrno;
}
}
if (taosArrayPush(info->pRequest->tableList, &pName) == NULL){
if (taosArrayPush(info->pRequest->tableList, &pName) == NULL) {
return terrno;
}
@ -1862,7 +1867,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
return code;
}
code = taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
if (code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " taosHashPut failed. table name: %s", info->id, tableData->childTableName);
taosMemoryFree(measure);
taosHashCancelIterate(info->childTables, oneTable);
@ -1904,9 +1909,9 @@ static int32_t smlInsertData(SSmlHandle *info) {
info->cost.insertRpcTime = taosGetTimestampUs();
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); // no need to check return code
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); // no need to check return code
(void)launchQueryImpl(info->pRequest, info->pQuery, true, NULL); // no need to check return code
launchQueryImpl(info->pRequest, info->pQuery, true, NULL); // no need to check return code
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code,
tstrerror(info->pRequest->code));
@ -1975,12 +1980,12 @@ static bool getLine(SSmlHandle *info, char *lines[], char **rawLine, char *rawLi
if (*rawLine != NULL && (uDebugFlag & DEBUG_DEBUG)) {
char *print = taosMemoryCalloc(*len + 1, 1);
if (print != NULL){
if (print != NULL) {
(void)memcpy(print, *tmp, *len);
uDebug("SML:0x%" PRIx64 " smlParseLine is raw, numLines:%d, protocol:%d, len:%d, data:%s", info->id, numLines,
info->protocol, *len, print);
taosMemoryFree(print);
} else{
} else {
uError("SML:0x%" PRIx64 " smlParseLine taosMemoryCalloc failed", info->id);
}
} else {
@ -2228,7 +2233,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
uInfo("SML:%" PRIx64 " retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code,
tstrerror(code));
code = refreshMeta(request->pTscObj, request);
if (code != 0){
if (code != 0) {
uInfo("SML:%" PRIx64 " refresh meta error code:%d, msg:%s", info->id, code, tstrerror(code));
}
smlDestroyInfo(info);
@ -2266,7 +2271,7 @@ end:
*/
TAOS_RES *taos_schemaless_insert_ttl_with_reqid_tbname_key(TAOS *taos, char *lines[], int numLines, int protocol,
int precision, int32_t ttl, int64_t reqid, char *tbnameKey){
int precision, int32_t ttl, int64_t reqid, char *tbnameKey) {
return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid, tbnameKey);
}
@ -2306,14 +2311,17 @@ static void getRawLineLen(char *lines, int len, int32_t *totalRows, int protocol
}
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows,
int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey){
int protocol, int precision, int32_t ttl, int64_t reqid,
char *tbnameKey) {
getRawLineLen(lines, len, totalRows, protocol);
return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid, tbnameKey);
return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid,
tbnameKey);
}
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision, int32_t ttl, int64_t reqid) {
return taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(taos, lines, len, totalRows, protocol, precision, ttl, reqid, NULL);
return taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(taos, lines, len, totalRows, protocol, precision, ttl,
reqid, NULL);
}
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,

View File

@ -25,7 +25,7 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
return terrno;
}
if(taosArrayPush(pTblBuf->pBufList, &buff) == NULL){
if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
return terrno;
}
@ -224,8 +224,8 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt;
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(tbName, tbFName);
if (code != 0){
int32_t code = tNameExtractFullName(tbName, tbFName);
if (code != 0) {
return code;
}
@ -772,7 +772,7 @@ void* stmtBindThreadFunc(void* param) {
}
int ret = stmtAsyncOutput(pStmt, asyncParam);
if (ret != 0){
if (ret != 0) {
qError("stmtAsyncOutput failed, reason:%s", tstrerror(ret));
}
}
@ -821,7 +821,7 @@ int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
return terrno;
}
if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL){
if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
return terrno;
}
@ -967,7 +967,7 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
}
int stmtSetDbName(TAOS_STMT* stmt, const char* dbName) {
STscStmt *pStmt = (STscStmt *) stmt;
STscStmt* pStmt = (STscStmt*)stmt;
STMT_DLOG("start to set dbName: %s", dbName);
@ -1045,7 +1045,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
SBoundColInfo *tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
tscWarn("no tags bound in sql, will not bound tags");
return TSDB_CODE_SUCCESS;
@ -1192,7 +1192,7 @@ static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray**
return terrno;
}
if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL){
if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
return terrno;
}
}
@ -1216,7 +1216,6 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
@ -1256,7 +1255,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
if (pStmt->sql.pQuery->haveResultSet) {
STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols));
pStmt->sql.pQuery->numOfResCols));
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
}
@ -1549,7 +1548,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else {
if (pStmt->sql.stbInterlaceMode) {
int64_t startTs = taosGetTimestampUs();
@ -1571,7 +1570,7 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
}
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
}
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {

View File

@ -1645,7 +1645,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
__taos_async_fn_t fp = pStmt->options.asyncExecFn;
if (!fp) {
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);

View File

@ -63,13 +63,10 @@ int32_t s3Begin() {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void s3End() { (void)S3_deinitialize(); }
void s3End() { S3_deinitialize(); }
int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ }
void s3CleanUp() { /*s3End();*/
}
static int32_t s3ListBucket(char const *bucketname);
static void s3DumpCfgByEp(int8_t epIndex) {
@ -506,7 +503,9 @@ S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
}
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) {
(void)responsePropertiesCallbackNull(properties, callbackData);
if (S3StatusOK != responsePropertiesCallbackNull(properties, callbackData)) {
uError("%s failed at line %d to process null callback.", __func__, __LINE__);
}
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
@ -517,7 +516,9 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti
}
S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) {
(void)responsePropertiesCallbackNull(properties, callbackData);
if (S3StatusOK != responsePropertiesCallbackNull(properties, callbackData)) {
uError("%s failed at line %d to process null callback.", __func__, __LINE__);
}
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
@ -897,8 +898,6 @@ upload:
if (partData.put_object_data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
//(void)cos_cp_dump(&cp);
}
if (!manager.etags[seq - 1]) {
@ -952,7 +951,9 @@ _exit:
}
if (cp.thefile) {
(void)cos_cp_close(cp.thefile);
if (cos_cp_close(cp.thefile)) {
uError("%s failed at line %d to close cp file.", __func__, lino);
}
}
if (cp.parts) {
taosMemoryFree(cp.parts);
@ -1293,8 +1294,8 @@ void s3DeleteObjectsByPrefix(const char *prefix) {
SArray *objectArray = getListByPrefix(prefix);
if (objectArray == NULL) return;
int32_t code = s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
if (code) {
uError("failed to delete objects with prefix %s", prefix);
if (!code) {
uError("%s failed at line %d since %s.", __func__, __LINE__, tstrerror(code));
}
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
}
@ -1542,7 +1543,7 @@ int32_t s3Init() {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void s3CleanUp() { cos_http_io_deinitialize(); }
// void s3CleanUp() { cos_http_io_deinitialize(); }
static void log_status(cos_status_t *s) {
cos_warn_log("status->code: %d", s->code);
@ -1748,20 +1749,20 @@ bool s3Get(const char *object_name, const char *path) {
cos_table_t *headers = NULL;
int traffic_limit = 0;
//创建内存池
// 创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
// 初始化请求选项
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
if (traffic_limit) {
//限速值设置范围为819200 - 838860800即100KB/s - 100MB/s如果超出该范围将返回400错误
// 限速值设置范围为819200 - 838860800即100KB/s - 100MB/s如果超出该范围将返回400错误
headers = cos_table_make(p, 1);
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
}
//下载对象
// 下载对象
cos_str_set(&file, path);
cos_str_set(&object, object_name);
s = cos_get_object_to_file(options, &bucket, &object, headers, NULL, &file, &resp_headers);
@ -1772,7 +1773,7 @@ bool s3Get(const char *object_name, const char *path) {
cos_warn_log("get object failed\n");
}
//销毁内存池
// 销毁内存池
cos_pool_destroy(p);
return ret;
@ -1794,10 +1795,10 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_
// int traffic_limit = 0;
char range_buf[64];
//创建内存池
// 创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
// 初始化请求选项
options = cos_request_options_create(p);
// init_test_request_options(options, is_cname);
s3InitRequestOptions(options, is_cname);
@ -1846,7 +1847,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_
// cos_warn_log("Download data=%s", buf);
_exit:
//销毁内存池
// 销毁内存池
cos_pool_destroy(p);
*ppBlock = buf;
@ -1935,15 +1936,15 @@ long s3Size(const char *object_name) {
cos_string_t object;
cos_table_t *resp_headers = NULL;
//创建内存池
// 创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
// 初始化请求选项
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
//获取对象元数据
// 获取对象元数据
cos_str_set(&object, object_name);
s = cos_head_object(options, &bucket, &object, NULL, &resp_headers);
// print_headers(resp_headers);
@ -1957,7 +1958,7 @@ long s3Size(const char *object_name) {
cos_warn_log("head object failed\n");
}
//销毁内存池
// 销毁内存池
cos_pool_destroy(p);
return size;
@ -1966,7 +1967,6 @@ long s3Size(const char *object_name) {
#else
int32_t s3Init() { return 0; }
void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { return 0; }

View File

@ -309,7 +309,7 @@ int32_t cos_cp_dump(SCheckpoint* cp) {
if (!item) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
(void)cJSON_AddItemToArray(ajson, item);
if (!cJSON_AddItemToArray(ajson, item)) goto _exit;
if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);

View File

@ -81,11 +81,21 @@ static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert
static void dmStopDnode(int signum, void *sigInfo, void *context) {
// taosIgnSignal(SIGUSR1);
// taosIgnSignal(SIGUSR2);
(void)taosIgnSignal(SIGTERM);
(void)taosIgnSignal(SIGHUP);
(void)taosIgnSignal(SIGINT);
(void)taosIgnSignal(SIGABRT);
(void)taosIgnSignal(SIGBREAK);
if (taosIgnSignal(SIGTERM) != 0) {
dWarn("failed to ignore signal SIGTERM");
}
if (taosIgnSignal(SIGHUP) != 0) {
dWarn("failed to ignore signal SIGHUP");
}
if (taosIgnSignal(SIGINT) != 0) {
dWarn("failed to ignore signal SIGINT");
}
if (taosIgnSignal(SIGABRT) != 0) {
dWarn("failed to ignore signal SIGABRT");
}
if (taosIgnSignal(SIGBREAK) != 0) {
dWarn("failed to ignore signal SIGBREAK");
}
dInfo("shut down signal is %d", signum);
#ifndef WINDOWS
@ -103,11 +113,19 @@ void dmLogCrash(int signum, void *sigInfo, void *context) {
// taosIgnSignal(SIGBREAK);
#ifndef WINDOWS
(void)taosIgnSignal(SIGBUS);
if (taosIgnSignal(SIGBUS) != 0) {
dWarn("failed to ignore signal SIGBUS");
}
#endif
(void)taosIgnSignal(SIGABRT);
(void)taosIgnSignal(SIGFPE);
(void)taosIgnSignal(SIGSEGV);
if (taosIgnSignal(SIGABRT) != 0) {
dWarn("failed to ignore signal SIGABRT");
}
if (taosIgnSignal(SIGFPE) != 0) {
dWarn("failed to ignore signal SIGABRT");
}
if (taosIgnSignal(SIGSEGV) != 0) {
dWarn("failed to ignore signal SIGABRT");
}
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
@ -136,24 +154,31 @@ _return:
}
static void dmSetSignalHandle() {
(void)taosSetSignal(SIGUSR1, dmSetDebugFlag);
(void)taosSetSignal(SIGUSR2, dmSetAssert);
(void)taosSetSignal(SIGTERM, dmStopDnode);
(void)taosSetSignal(SIGHUP, dmStopDnode);
(void)taosSetSignal(SIGINT, dmStopDnode);
(void)taosSetSignal(SIGBREAK, dmStopDnode);
if (taosSetSignal(SIGUSR1, dmSetDebugFlag) != 0) {
dWarn("failed to set signal SIGUSR1");
}
if (taosSetSignal(SIGUSR2, dmSetAssert) != 0) {
dWarn("failed to set signal SIGUSR1");
}
if (taosSetSignal(SIGTERM, dmStopDnode) != 0) {
dWarn("failed to set signal SIGUSR1");
}
if (taosSetSignal(SIGHUP, dmStopDnode) != 0) {
dWarn("failed to set signal SIGUSR1");
}
if (taosSetSignal(SIGINT, dmStopDnode) != 0) {
dWarn("failed to set signal SIGUSR1");
}
if (taosSetSignal(SIGBREAK, dmStopDnode) != 0) {
dWarn("failed to set signal SIGUSR1");
}
#ifndef WINDOWS
(void)taosSetSignal(SIGTSTP, dmStopDnode);
(void)taosSetSignal(SIGQUIT, dmStopDnode);
#endif
#if 0
#ifndef WINDOWS
(void)taosSetSignal(SIGBUS, dmLogCrash);
#endif
(void)taosSetSignal(SIGABRT, dmLogCrash);
(void)taosSetSignal(SIGFPE, dmLogCrash);
(void)taosSetSignal(SIGSEGV, dmLogCrash);
if (taosSetSignal(SIGTSTP, dmStopDnode) != 0) {
dWarn("failed to set signal SIGUSR1");
}
if (taosSetSignal(SIGQUIT, dmStopDnode) != 0) {
dWarn("failed to set signal SIGUSR1");
}
#endif
}

View File

@ -45,7 +45,9 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
if (pMgmt->pData->ipWhiteVer == ver) {
if (ver == 0) {
dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
(void)rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL);
if (rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL) != 0) {
dError("failed to disable ip white list on dnode");
}
}
return;
}
@ -91,7 +93,9 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
pMgmt->statusSeq);
pMgmt->pData->dropped = 1;
(void)dmWriteEps(pMgmt->pData);
if (dmWriteEps(pMgmt->pData) != 0) {
dError("failed to write dnode file");
}
dInfo("dnode will exit since it is in the dropped state");
(void)raise(SIGINT);
}
@ -147,7 +151,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.clusterCfg.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if (taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0) != 0) {
dError("failed to parse time since %s", tstrerror(code));
}
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
@ -243,7 +249,9 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
(void)rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL);
if (rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL) != 0) {
dError("failed to send notify req");
}
}
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {

View File

@ -305,11 +305,16 @@ int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->notifyThread)) {
(void)tsem_post(&dmNotifyHdl.sem);
if (tsem_post(&dmNotifyHdl.sem) != 0) {
dError("failed to post notify sem");
}
(void)taosThreadJoin(pMgmt->notifyThread, NULL);
taosThreadClear(&pMgmt->notifyThread);
}
(void)tsem_destroy(&dmNotifyHdl.sem);
if (tsem_destroy(&dmNotifyHdl.sem) != 0) {
dError("failed to destroy notify sem");
}
}
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {

View File

@ -17,7 +17,9 @@
#include "mmInt.h"
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
(void)mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant);
if (mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant) != 0) {
dError("failed to get monitor info");
}
}
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {

View File

@ -23,7 +23,7 @@ static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) {
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
(void)tmsgSendRsp(&rsp);
tmsgSendRsp(&rsp);
}
static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {

View File

@ -23,15 +23,15 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
(void)tmsgSendRsp(&rsp);
tmsgSendRsp(&rsp);
}
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
for (int32_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = NULL;
(void)taosGetQitem(qall, (void **)&pMsg);
SRpcMsg *pMsg = NULL;
int32_t num = taosGetQitem(qall, (void **)&pMsg);
const STraceId *trace = &pMsg->info.traceId;
dTrace("msg:%p, get from snode-write queue", pMsg);

View File

@ -35,10 +35,14 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
SVnodeObj *pVnode = *ppVnode;
SVnodeLoad vload = {.vgId = pVnode->vgId};
if (!pVnode->failed) {
(void)vnodeGetLoad(pVnode->pImpl, &vload);
if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
dError("failed to get vnode load");
}
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
}
(void)taosArrayPush(pInfo->pVloads, &vload);
if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
dError("failed to push vnode load");
}
pIter = taosHashIterate(pMgmt->hash, pIter);
}
@ -116,7 +120,9 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
(void)tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
dError("failed to get tfs monitor info");
}
taosArrayDestroy(pVloads);
}
@ -845,7 +851,9 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
vmCloseVnode(pMgmt, pVnode, false);
(void)vmWriteVnodeListToFile(pMgmt);
if (vmWriteVnodeListToFile(pMgmt) != 0) {
dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
}
dInfo("vgId:%d, is dropped", vgId);
return 0;

View File

@ -233,8 +233,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
if (commitAndRemoveWal) {
dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
(void)vnodeSyncCommit(pVnode->pImpl);
(void)vnodeBegin(pVnode->pImpl);
if (vnodeSyncCommit(pVnode->pImpl) != 0) {
dError("vgId:%d, failed to commit data", pVnode->vgId);
}
if (vnodeBegin(pVnode->pImpl) != 0) {
dError("vgId:%d, failed to begin", pVnode->vgId);
}
dInfo("vgId:%d, commit data finished", pVnode->vgId);
}
@ -248,8 +252,12 @@ _closed:
if (commitAndRemoveWal) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
(void)tfsRmdir(pMgmt->pTfs, path);
(void)tfsMkdir(pMgmt->pTfs, path);
if (tfsRmdir(pMgmt->pTfs, path) != 0) {
dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
}
if (tfsMkdir(pMgmt->pTfs, path) != 0) {
dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
}
}
if (pVnode->dropped) {

View File

@ -187,7 +187,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmSendResponse(SRpcMsg *pMsg) {
if (pMsg->info.handle) {
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
(void)rpcSendResponse(&rsp);
if (rpcSendResponse(&rsp) != 0) {
dError("failed to send response since %s", terrstr());
}
}
}
@ -389,10 +391,28 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
(void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg);
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
code = tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
if (code) {
return code;
}
code = tMultiWorkerInit(&pVnode->pSyncW, &scfg);
if (code) {
tMultiWorkerCleanup(&pVnode->pWriteW);
return code;
}
code = tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg);
if (code) {
tMultiWorkerCleanup(&pVnode->pWriteW);
tMultiWorkerCleanup(&pVnode->pSyncW);
return code;
}
code = tMultiWorkerInit(&pVnode->pApplyW, &acfg);
if (code) {
tMultiWorkerCleanup(&pVnode->pWriteW);
tMultiWorkerCleanup(&pVnode->pSyncW);
tMultiWorkerCleanup(&pVnode->pSyncRdW);
return code;
}
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);

View File

@ -47,8 +47,14 @@ static int32_t dmCheckRepeatInit(SDnode *pDnode) {
}
static int32_t dmInitSystem() {
(void)taosIgnSIGPIPE();
(void)taosBlockSIGPIPE();
if (taosIgnSIGPIPE() != 0) {
dError("failed to ignore SIGPIPE");
}
if (taosBlockSIGPIPE() != 0) {
dError("failed to block SIGPIPE");
}
taosResolveCRC();
return 0;
}
@ -204,7 +210,9 @@ void dmCleanup() {
auditCleanup();
syncCleanUp();
walCleanUp();
(void)udfcClose();
if (udfcClose() != 0) {
dError("failed to close udfc");
}
udfStopUdfd();
taosStopCacheRefreshWorker();
(void)dmDiskClose();

View File

@ -47,8 +47,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
}
// compress module init
(void)tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse,
tsCompressor);
tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
@ -226,7 +225,10 @@ void dmClearVars(SDnode *pDnode) {
(void)taosThreadRwlockDestroy(&pWrapper->lock);
}
if (pDnode->lockfile != NULL) {
(void)taosUnLockFile(pDnode->lockfile);
if (taosUnLockFile(pDnode->lockfile) != 0) {
dError("failed to unlock file");
}
(void)taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL;
}
@ -343,7 +345,9 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
rsp.contLen = pMsg->contLen;
}
(void)rpcSendResponse(&rsp);
if (rpcSendResponse(&rsp) != 0) {
dError("failed to send response, msg:%p", &rsp);
}
rpcFreeCont(pMsg->pCont);
}
@ -360,11 +364,16 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
} else {
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont != NULL) {
(void)tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp);
rsp.contLen = contLen;
if (tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp) < 0) {
rsp.code = TSDB_CODE_APP_ERROR;
} else {
rsp.contLen = contLen;
}
}
}
(void)rpcSendResponse(&rsp);
if (rpcSendResponse(&rsp) != 0) {
dError("failed to send response, msg:%p", &rsp);
}
rpcFreeCont(pMsg->pCont);
}

View File

@ -18,7 +18,11 @@
#include "qworker.h"
#include "tversion.h"
static inline void dmSendRsp(SRpcMsg *pMsg) { (void)rpcSendResponse(pMsg); }
static inline void dmSendRsp(SRpcMsg *pMsg) {
if (rpcSendResponse(pMsg) != 0) {
dError("failed to send response, msg:%p", pMsg);
}
}
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
@ -113,7 +117,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
int32_t svrVer = 0;
(void)taosVersionStrToInt(version, &svrVer);
code = taosVersionStrToInt(version, &svrVer);
if (code != 0) {
dError("failed to convert version string:%s to int, code:%d", version, code);
goto _OVER;
}
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
pRpc->info.conn.clientIp);
@ -253,7 +261,9 @@ _OVER:
if (pWrapper != NULL) {
dmSendRsp(&rsp);
} else {
(void)rpcSendResponse(&rsp);
if (rpcSendResponse(&rsp) != 0) {
dError("failed to send response, msg:%p", &rsp);
}
}
}
@ -310,7 +320,9 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code;
} else {
pMsg->info.handle = 0;
(void)rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
if (rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL) != 0) {
dError("failed to send rpc msg");
}
return 0;
}
}
@ -396,7 +408,9 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.notWaitAvaliableConn = 0;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
}
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
@ -440,7 +454,10 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
}
pTrans->statusRpc = rpcOpen(&rpcInit);
if (pTrans->statusRpc == NULL) {
@ -485,7 +502,9 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
}
pTrans->syncRpc = rpcOpen(&rpcInit);
if (pTrans->syncRpc == NULL) {
@ -536,7 +555,11 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.compressSize = tsCompressMsgSize;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
}
pTrans->serverRpc = rpcOpen(&rpcInit);
if (pTrans->serverRpc == NULL) {
dError("failed to init dnode rpc server since:%s", tstrerror(terrno));

View File

@ -259,7 +259,9 @@ _OVER:
if (taosArrayGetSize(pData->dnodeEps) == 0) {
SDnodeEp dnodeEp = {0};
dnodeEp.isMnode = 1;
(void)taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
if (taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep) != 0) {
dError("failed to get fqdn and port from ep:%s", tsFirst);
}
if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
return terrno;
}
@ -370,11 +372,19 @@ int32_t dmGetDnodeSize(SDnodeData *pData) {
}
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
(void)taosThreadRwlockWrlock(&pData->lock);
if (taosThreadRwlockWrlock(&pData->lock) != 0) {
dError("failed to lock dnode lock");
}
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
dmResetEps(pData, eps);
(void)dmWriteEps(pData);
(void)taosThreadRwlockUnlock(&pData->lock);
if (dmWriteEps(pData) != 0) {
dError("failed to write dnode file");
}
if (taosThreadRwlockUnlock(&pData->lock) != 0) {
dError("failed to unlock dnode lock");
}
}
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
@ -590,7 +600,9 @@ void dmRemoveDnodePairs(SDnodeData *pData) {
snprintf(file, sizeof(file), "%s%sdnode%sep.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
snprintf(bak, sizeof(bak), "%s%sdnode%sep.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
dInfo("dnode file:%s is rename to bak file", file);
(void)taosRenameFile(file, bak);
if (taosRenameFile(file, bak) != 0) {
dError("failed to rename dnode file:%s to bak file:%s since %s", file, bak, tstrerror(terrno));
}
}
static int32_t dmReadDnodePairs(SDnodeData *pData) {

View File

@ -375,7 +375,7 @@ struct STsdb {
struct {
SVHashTable *ht;
SArray *arr;
} *commitInfo;
} * commitInfo;
};
struct TSDBKEY {
@ -949,7 +949,7 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle);
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

View File

@ -3682,8 +3682,7 @@ int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHan
return code;
}
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
int32_t code = 0;
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
char key[128] = {0};
int keyLen = 0;
LRUHandle *handle = NULL;
@ -3696,7 +3695,7 @@ int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_
_taos_lru_deleter_t deleter = deleteBCache;
uint8_t *pPg = taosMemoryMalloc(charge);
if (!pPg) {
TAOS_RETURN(terrno);
return; // ignore error with s3 cache and leave error untouched
}
memcpy(pPg, pPage, charge);
@ -3710,6 +3709,4 @@ int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_
(void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
TAOS_RETURN(code);
}

View File

@ -483,7 +483,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
int nPage = pgnoEnd - pgno + 1;
for (int i = 0; i < nPage; ++i) {
if (pFD->szFile != pgno) { // DONOT cache last volatile page
(void)tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
}
if (szHint > 0 && n >= size) {

View File

@ -137,8 +137,12 @@ int32_t indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
TAOS_CHECK_GOTO(terrno, NULL, END);
}
TAOS_UNUSED(taosThreadMutexInit(&idx->mtx, NULL));
TAOS_UNUSED(tsem_init(&idx->sem, 0, 0));
if (taosThreadMutexInit(&idx->mtx, NULL) != 0) {
TAOS_CHECK_GOTO(terrno, NULL, END);
}
if (tsem_init(&idx->sem, 0, 0) != 0) {
TAOS_CHECK_GOTO(terrno, NULL, END);
}
idx->refId = idxAddRef(idx);
idx->opts = *opts;
@ -213,7 +217,10 @@ void idxReleaseRef(int64_t ref) {
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range
int32_t code = 0;
TAOS_UNUSED(taosThreadMutexLock(&index->mtx));
if (taosThreadMutexLock(&index->mtx) != 0) {
indexError("failed to lock index mutex");
}
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
@ -231,7 +238,9 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
}
}
}
TAOS_UNUSED(taosThreadMutexUnlock(&index->mtx));
if (taosThreadMutexUnlock(&index->mtx) != 0) {
indexError("failed to unlock index mutex");
}
if (code != 0) {
return code;
@ -463,7 +472,10 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu
int32_t sz = idxSerialCacheKey(&key, buf);
TAOS_UNUSED(taosThreadMutexLock(&sIdx->mtx));
if (taosThreadMutexLock(&sIdx->mtx) != 0) {
indexError("failed to lock index mutex");
}
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
cache = (pCache == NULL) ? NULL : *pCache;
TAOS_UNUSED(taosThreadMutexUnlock(&sIdx->mtx));
@ -757,7 +769,9 @@ static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
IndexTFile* tf = (IndexTFile*)(sIdx->tindex);
TAOS_UNUSED(taosThreadMutexLock(&tf->mtx));
if (taosThreadMutexLock(&tf->mtx) != 0) {
indexError("failed to lock tfile mutex");
}
TFileReader* rd = tfileCacheGet(tf->cache, &key);
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
@ -801,9 +815,15 @@ static int32_t idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
TFileHeader* header = &reader->header;
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
TAOS_UNUSED(taosThreadMutexLock(&tf->mtx));
if (taosThreadMutexLock(&tf->mtx) != 0) {
indexError("failed to lock tfile mutex");
}
code = tfileCachePut(tf->cache, &key, reader);
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
if (taosThreadMutexUnlock(&tf->mtx) != 0) {
indexError("failed to unlock tfile mutex");
}
return code;

View File

@ -398,8 +398,17 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
cache->suid = suid;
cache->occupiedMem = 0;
TAOS_UNUSED(taosThreadMutexInit(&cache->mtx, NULL));
TAOS_UNUSED(taosThreadCondInit(&cache->finished, NULL));
if (taosThreadMutexInit(&cache->mtx, NULL) != 0) {
indexError("failed to create mutex for index cache");
taosMemoryFree(cache);
return NULL;
}
if (taosThreadCondInit(&cache->finished, NULL) != 0) {
indexError("failed to create cond for index cache");
taosMemoryFree(cache);
return NULL;
}
idxCacheRef(cache);
if (idx != NULL) {
@ -410,10 +419,16 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
void idxCacheDebug(IndexCache* cache) {
MemTable* tbl = NULL;
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
if ((taosThreadMutexLock(&cache->mtx)) != 0) {
indexError("failed to lock cache mutex");
}
tbl = cache->mem;
idxMemRef(tbl);
TAOS_UNUSED(taosThreadMutexUnlock(&cache->mtx));
if (taosThreadMutexUnlock(&cache->mtx) != 0) {
indexError("failed to unlock cache mutex");
}
{
SSkipList* slt = tbl->mem;
@ -432,7 +447,9 @@ void idxCacheDebug(IndexCache* cache) {
}
{
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
if (taosThreadMutexLock(&cache->mtx) != 0) {
indexError("failed to lock cache mutex");
}
tbl = cache->imm;
idxMemRef(tbl);
TAOS_UNUSED(taosThreadMutexUnlock(&cache->mtx));
@ -480,7 +497,9 @@ void idxCacheDestroyImm(IndexCache* cache) {
return;
}
MemTable* tbl = NULL;
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
if (taosThreadMutexLock(&cache->mtx) != 0) {
indexError("failed to lock cache mutex");
}
tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread
@ -517,7 +536,11 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
if (iter == NULL) {
return NULL;
}
TAOS_UNUSED(taosThreadMutexLock(&cache->mtx));
if (taosThreadMutexLock(&cache->mtx) != 0) {
indexError("failed to lock cache mutex");
taosMemoryFree(iter);
return NULL;
}
idxMemRef(cache->imm);
@ -615,7 +638,9 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
// ugly code, refactor later
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
TAOS_UNUSED(taosThreadMutexLock(&pCache->mtx));
if (taosThreadMutexLock(&pCache->mtx) != 0) {
indexError("failed to lock cache mutex");
}
pCache->occupiedMem += estimate;
idxCacheMakeRoomForWrite(pCache);
MemTable* tbl = pCache->mem;
@ -623,7 +648,9 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
TAOS_UNUSED(tSkipListPut(tbl->mem, (char*)ct));
idxMemUnRef(tbl);
TAOS_UNUSED(taosThreadMutexUnlock(&pCache->mtx));
if (taosThreadMutexUnlock(&pCache->mtx) != 0) {
indexError("failed to unlock cache mutex");
}
idxCacheUnRef(pCache);
return 0;
}
@ -631,13 +658,17 @@ void idxCacheForceToMerge(void* cache) {
IndexCache* pCache = cache;
idxCacheRef(pCache);
TAOS_UNUSED(taosThreadMutexLock(&pCache->mtx));
if (taosThreadMutexLock(&pCache->mtx) != 0) {
indexError("failed to lock cache mutex");
}
indexInfo("%p is forced to merge into tfile", pCache);
pCache->occupiedMem += MEM_SIGNAL_QUIT;
idxCacheMakeRoomForWrite(pCache);
TAOS_UNUSED(taosThreadMutexUnlock(&pCache->mtx));
if (taosThreadMutexUnlock(&pCache->mtx) != 0) {
indexError("failed to unlock cache mutex");
}
idxCacheUnRef(pCache);
return;
}
@ -668,12 +699,16 @@ int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STerm
IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL;
TAOS_UNUSED(taosThreadMutexLock(&pCache->mtx));
if (taosThreadMutexLock(&pCache->mtx) != 0) {
indexError("failed to lock cache mutex");
}
mem = pCache->mem;
imm = pCache->imm;
idxMemRef(mem);
idxMemRef(imm);
TAOS_UNUSED(taosThreadMutexUnlock(&pCache->mtx));
if (taosThreadMutexUnlock(&pCache->mtx) != 0) {
indexError("failed to unlock cache mutex");
}
int64_t st = taosGetTimestampUs();

View File

@ -994,7 +994,9 @@ Fst* fstCreate(FstSlice* slice) {
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1);
fst->data = s;
TAOS_UNUSED(taosThreadMutexInit(&fst->mtx, NULL));
if (taosThreadMutexInit(&fst->mtx, NULL) != 0) {
goto FST_CREAT_FAILED;
}
return fst;
FST_CREAT_FAILED:

View File

@ -739,7 +739,11 @@ IndexTFile* idxTFileCreate(SIndex* idx, const char* path) {
tfileCacheDestroy(cache);
return NULL;
}
TAOS_UNUSED(taosThreadMutexInit(&tfile->mtx, NULL));
if (taosThreadMutexInit(&tfile->mtx, NULL) != 0) {
taosMemoryFree(tfile);
tfileCacheDestroy(cache);
return NULL;
}
tfile->cache = cache;
return tfile;
}
@ -764,9 +768,16 @@ int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
SIndexTerm* term = query->term;
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TAOS_UNUSED(taosThreadMutexLock(&pTfile->mtx));
if (taosThreadMutexLock(&pTfile->mtx) != 0) {
indexError("failed to lock tfile mutex");
}
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
TAOS_UNUSED(taosThreadMutexUnlock(&pTfile->mtx));
if (taosThreadMutexUnlock(&pTfile->mtx) != 0) {
indexError("failed to unlock tfile mutex");
}
if (reader == NULL) {
return 0;
}
@ -883,9 +894,13 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
TFileReader* rd = NULL;
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
TAOS_UNUSED(taosThreadMutexLock(&tf->mtx));
if (taosThreadMutexLock(&tf->mtx) != 0) {
indexError("failed to lock tfile mutex");
}
rd = tfileCacheGet(tf->cache, &key);
TAOS_UNUSED(taosThreadMutexUnlock(&tf->mtx));
if (taosThreadMutexUnlock(&tf->mtx) != 0) {
indexError("failed to unlock tfile mutex");
}
return rd;
}

View File

@ -43,7 +43,6 @@
// }
// strcpy(tsSnodeAddress, "127.0.0.1");
// int ret = RUN_ALL_TESTS();
// s3CleanUp();
// return ret;
// }

View File

@ -324,8 +324,8 @@ bool lossyFloat = false;
bool lossyDouble = false;
// init call
int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
void tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals,
int32_t ifAdtFse, const char *compressor) {
// config
lossyFloat = strstr(lossyColumns, "float") != NULL;
lossyDouble = strstr(lossyColumns, "double") != NULL;
@ -333,7 +333,7 @@ int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision,
tdszInit(fPrecision, dPrecision, maxIntervals, intervals, ifAdtFse, compressor);
if (lossyFloat) uTrace("lossy compression float is opened. ");
if (lossyDouble) uTrace("lossy compression double is opened. ");
return 0;
return;
}
// exit call
void tsCompressExit() { tdszExit(); }

View File

@ -153,14 +153,16 @@ static void getDay(char* buf){
time_t t = taosTime(NULL);
struct tm tmInfo;
if (taosLocalTime(&t, &tmInfo, buf) != NULL) {
(void)strftime(buf, LOG_FILE_DAY_LEN, "%Y-%m-%d", &tmInfo);
TAOS_UNUSED(strftime(buf, LOG_FILE_DAY_LEN, "%Y-%m-%d", &tmInfo));
}
}
static int64_t getTimestampToday() {
time_t t = taosTime(NULL);
struct tm tm;
(void) taosLocalTime(&t, &tm, NULL);
if (taosLocalTime(&t, &tm, NULL) == NULL) {
return 0;
}
tm.tm_hour = 0;
tm.tm_min = 0;
tm.tm_sec = 0;
@ -203,7 +205,7 @@ int32_t taosInitSlowLog() {
tsLogObj.slowHandle = taosLogBuffNew(LOG_SLOW_BUF_SIZE);
if (tsLogObj.slowHandle == NULL) return terrno;
(void)taosUmaskFile(0);
TAOS_UNUSED(taosUmaskFile(0));
tsLogObj.slowHandle->pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (tsLogObj.slowHandle->pFile == NULL) {
(void)printf("\nfailed to open slow log file:%s, reason:%s\n", name, strerror(errno));
@ -281,7 +283,10 @@ static void taosUnLockLogFile(TdFilePtr pFile) {
if (pFile == NULL) return;
if (tsLogObj.fileNum > 1) {
(void)taosUnLockFile(pFile);
int32_t code = taosUnLockFile(pFile);
if (code != 0) {
TAOS_UNUSED(printf("failed to unlock log file:%p, reason:%s\n", pFile, tstrerror(code)));
}
}
}
@ -310,7 +315,10 @@ static void taosKeepOldLog(char *oldName) {
char compressFileName[PATH_MAX + 20];
snprintf(compressFileName, PATH_MAX + 20, "%s.gz", oldName);
if (taosCompressFile(oldName, compressFileName) == 0) {
(void)taosRemoveFile(oldName);
int32_t code = taosRemoveFile(oldName);
if (code != 0) {
TAOS_UNUSED(printf("failed to remove file:%s, reason:%s\n", oldName, tstrerror(code)));
}
}
}
@ -331,7 +339,7 @@ static OldFileKeeper *taosOpenNewFile() {
char name[PATH_MAX + 20];
sprintf(name, "%s.%d", tsLogObj.logName, tsLogObj.flag);
(void)taosUmaskFile(0);
TAOS_UNUSED(taosUmaskFile(0));
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
@ -341,8 +349,10 @@ static OldFileKeeper *taosOpenNewFile() {
return NULL;
}
(void)taosLockLogFile(pFile);
(void)taosLSeekFile(pFile, 0, SEEK_SET);
TAOS_UNUSED(taosLockLogFile(pFile));
if (taosLSeekFile(pFile, 0, SEEK_SET) < 0) {
uWarn("failed to seek file:%s, reason:%s", name, tstrerror(terrno));
}
TdFilePtr pOldFile = tsLogObj.logHandle->pFile;
tsLogObj.logHandle->pFile = pFile;
@ -384,7 +394,14 @@ static int32_t taosOpenNewLogFile() {
(void)taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_DETACHED);
OldFileKeeper *oldFileKeeper = taosOpenNewFile();
(void)taosThreadCreate(&thread, &attr, taosThreadToCloseOldFile, oldFileKeeper);
if (!oldFileKeeper) {
TAOS_UNUSED(taosThreadMutexUnlock(&tsLogObj.logMutex));
return terrno;
}
if (taosThreadCreate(&thread, &attr, taosThreadToCloseOldFile, oldFileKeeper) != 0) {
uError("failed to create thread to close old log file");
taosMemoryFreeClear(oldFileKeeper);
}
(void)taosThreadAttrDestroy(&attr);
}
@ -404,7 +421,7 @@ static void taosOpenNewSlowLogFile() {
for (int32_t i = 1; atomic_val_compare_exchange_32(&tsLogObj.slowHandle->lock, 0, 1) == 1; ++i) {
if (i % 1000 == 0) {
(void)sched_yield();
TAOS_UNUSED(sched_yield());
}
}
tsLogObj.slowHandle->lastDuration = LOG_MAX_WAIT_MSEC; // force write
@ -435,7 +452,10 @@ void taosResetLog() {
tsLogObj.lines = tsNumOfLogLines + 10;
if (tsLogObj.logHandle) {
(void)taosOpenNewLogFile();
int32_t code = taosOpenNewLogFile();
if(code != 0){
uError("failed to open new log file, reason:%s", tstrerror(code));
}
uInfo("==================================");
uInfo(" reset log file ");
}
@ -521,6 +541,7 @@ static void processLogFileName(const char* logName , int32_t maxFileNum){
}
static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
int32_t code = 0;
#ifdef WINDOWS_STASH
/*
* always set maxFileNum to 1
@ -535,7 +556,7 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
(void)sprintf(name, "%s.%d", tsLogObj.logName, tsLogObj.flag);
(void)taosThreadMutexInit(&tsLogObj.logMutex, NULL);
(void)taosUmaskFile(0);
TAOS_UNUSED(taosUmaskFile(0));
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
if (tsLogObj.logHandle == NULL) return terrno;
@ -544,24 +565,41 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
(void)printf("\nfailed to open log file:%s, reason:%s\n", name, strerror(errno));
return terrno;
}
(void)taosLockLogFile(tsLogObj.logHandle->pFile);
TAOS_UNUSED(taosLockLogFile(tsLogObj.logHandle->pFile));
// only an estimate for number of lines
int64_t filesize = 0;
if (taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL) != 0) {
(void)printf("\nfailed to fstat log file:%s, reason:%s\n", name, strerror(errno));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
tsLogObj.lines = (int32_t)(filesize / 60);
(void)taosLSeekFile(tsLogObj.logHandle->pFile, 0, SEEK_END);
if ((code = taosLSeekFile(tsLogObj.logHandle->pFile, 0, SEEK_END)) < 0) {
TAOS_UNUSED(printf("failed to seek to the end of log file:%s, reason:%s\n", name, tstrerror(code)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return code;
}
(void)sprintf(name, "==================================================\n");
(void)taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name));
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
(void)sprintf(name, " new log file \n");
(void)taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name));
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
(void)sprintf(name, "==================================================\n");
(void)taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name));
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
return 0;
}
@ -569,17 +607,17 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
static void taosUpdateLogNums(ELogLevel level) {
switch (level) {
case DEBUG_ERROR:
(void)atomic_add_fetch_64(&tsNumOfErrorLogs, 1);
TAOS_UNUSED(atomic_add_fetch_64(&tsNumOfErrorLogs, 1));
break;
case DEBUG_INFO:
(void)atomic_add_fetch_64(&tsNumOfInfoLogs, 1);
TAOS_UNUSED(atomic_add_fetch_64(&tsNumOfInfoLogs, 1));
break;
case DEBUG_DEBUG:
(void)atomic_add_fetch_64(&tsNumOfDebugLogs, 1);
TAOS_UNUSED(atomic_add_fetch_64(&tsNumOfDebugLogs, 1));
break;
case DEBUG_DUMP:
case DEBUG_TRACE:
(void)atomic_add_fetch_64(&tsNumOfTraceLogs, 1);
TAOS_UNUSED(atomic_add_fetch_64(&tsNumOfTraceLogs, 1));
break;
default:
break;
@ -590,7 +628,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
struct tm Tm, *ptm;
struct timeval timeSecs;
(void)taosGetTimeOfDay(&timeSecs);
TAOS_UNUSED(taosGetTimeOfDay(&timeSecs));
time_t curTime = timeSecs.tv_sec;
ptm = taosLocalTime(&curTime, &Tm, NULL);
@ -603,15 +641,18 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceSufficient()) {
taosUpdateLogNums(level);
if (tsAsyncLog) {
(void)taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
TAOS_UNUSED(taosPushLogBuffer(tsLogObj.logHandle, buffer, len));
} else {
(void)taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
TAOS_UNUSED(taosWriteFile(tsLogObj.logHandle->pFile, buffer, len));
}
if (tsNumOfLogLines > 0) {
(void)atomic_add_fetch_32(&tsLogObj.lines, 1);
TAOS_UNUSED(atomic_add_fetch_32(&tsLogObj.lines, 1));
if ((tsLogObj.lines > tsNumOfLogLines) && (tsLogObj.openInProgress == 0)) {
(void)taosOpenNewLogFile();
int32_t code = taosOpenNewLogFile();
if (code != 0) {
uError("failed to open new log file, reason:%s", tstrerror(code));
}
}
}
}
@ -619,7 +660,9 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
if (dflag & DEBUG_SCREEN) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
(void)write(1, buffer, (uint32_t)len);
if (write(1, buffer, (uint32_t)len) < 0) {
TAOS_UNUSED(printf("failed to write log to screen, reason:%s\n", strerror(errno)));
}
#pragma GCC diagnostic pop
}
}
@ -690,12 +733,12 @@ void taosPrintSlowLog(const char *format, ...) {
buffer[len++] = '\n';
buffer[len] = 0;
(void)atomic_add_fetch_64(&tsNumOfSlowLogs, 1);
TAOS_UNUSED(atomic_add_fetch_64(&tsNumOfSlowLogs, 1));
if (tsAsyncLog) {
(void)taosPushLogBuffer(tsLogObj.slowHandle, buffer, len);
TAOS_UNUSED(taosPushLogBuffer(tsLogObj.slowHandle, buffer, len));
} else {
(void)taosWriteFile(tsLogObj.slowHandle->pFile, buffer, len);
TAOS_UNUSED(taosWriteFile(tsLogObj.slowHandle->pFile, buffer, len));
}
taosMemoryFree(buffer);
@ -714,7 +757,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
pos += 3;
if (c >= 16) {
temp[pos++] = '\n';
(void)taosWriteFile(tsLogObj.logHandle->pFile, temp, (uint32_t)pos);
TAOS_UNUSED((taosWriteFile(tsLogObj.logHandle->pFile, temp, (uint32_t)pos) <= 0));
c = 0;
pos = 0;
}
@ -722,7 +765,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
temp[pos++] = '\n';
(void)taosWriteFile(tsLogObj.logHandle->pFile, temp, (uint32_t)pos);
TAOS_UNUSED(taosWriteFile(tsLogObj.logHandle->pFile, temp, (uint32_t)pos));
}
static void taosCloseLogByFd(TdFilePtr pFile) {
@ -855,12 +898,12 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
pLogBuf->lastDuration = 0;
if (start < end) {
(void)taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize);
TAOS_UNUSED(taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize));
} else {
int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start;
(void)taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize);
TAOS_UNUSED(taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize));
(void)taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end);
TAOS_UNUSED(taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end));
}
dbgWN++;
@ -981,11 +1024,14 @@ void taosLogCrashInfo(char *nodeType, char *pMsg, int64_t msgLen, int signum, vo
goto _return;
}
(void)taosLockFile(pFile);
if (taosLockFile(pFile) < 0) {
taosPrintLog(flags, level, dflag, "failed to lock file:%s since %s", filepath, terrstr());
goto _return;
}
int64_t writeSize = taosWriteFile(pFile, &msgLen, sizeof(msgLen));
if (sizeof(msgLen) != writeSize) {
(void)taosUnLockFile(pFile);
TAOS_UNUSED(taosUnLockFile(pFile));
taosPrintLog(flags, level, dflag, "failed to write len to file:%s,%p wlen:%" PRId64 " tlen:%lu since %s",
filepath, pFile, writeSize, sizeof(msgLen), terrstr());
goto _return;
@ -993,13 +1039,13 @@ void taosLogCrashInfo(char *nodeType, char *pMsg, int64_t msgLen, int signum, vo
writeSize = taosWriteFile(pFile, pMsg, msgLen);
if (msgLen != writeSize) {
(void)taosUnLockFile(pFile);
TAOS_UNUSED(taosUnLockFile(pFile));
taosPrintLog(flags, level, dflag, "failed to write file:%s,%p wlen:%" PRId64 " tlen:%" PRId64 " since %s",
filepath, pFile, writeSize, msgLen, terrstr());
goto _return;
}
(void)taosUnLockFile(pFile);
TAOS_UNUSED(taosUnLockFile(pFile));
}
_return:
@ -1054,7 +1100,7 @@ void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr
return;
}
(void)taosLockFile(pFile);
TAOS_UNUSED(taosLockFile(pFile));
} else {
pFile = *pFd;
}
@ -1093,10 +1139,10 @@ void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr
_return:
if (truncateFile) {
(void)taosFtruncateFile(pFile, 0);
TAOS_UNUSED(taosFtruncateFile(pFile, 0));
}
(void)taosUnLockFile(pFile);
(void)taosCloseFile(&pFile);
TAOS_UNUSED(taosUnLockFile(pFile));
TAOS_UNUSED(taosCloseFile(&pFile));
taosMemoryFree(buf);
*pMsg = NULL;
@ -1106,11 +1152,11 @@ _return:
void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile) {
if (truncateFile) {
(void)taosFtruncateFile(pFile, 0);
TAOS_UNUSED(taosFtruncateFile(pFile, 0));
}
(void)taosUnLockFile(pFile);
(void)taosCloseFile(&pFile);
TAOS_UNUSED(taosUnLockFile(pFile));
TAOS_UNUSED(taosCloseFile(&pFile));
}
#ifdef NDEBUG

View File

@ -0,0 +1,87 @@
#!/bin/bash
set -e
check_transactions() {
for i in {1..30}
do
output=$(taos -s "show transactions;")
if [[ $output == *"Query OK, 0 row(s)"* ]]; then
echo "Success: No transactions are in progress."
return 0
fi
sleep 1
done
echo "Error: Transactions are still in progress after 30 attempts."
return 1
}
reset_cache() {
response=$(curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' --data 'reset query cache')
if [[ $response == \{\"code\":0* ]]; then
echo "Success: Query cache reset successfully."
else
echo "Error: Failed to reset query cache. Response: $response"
return 1
fi
}
taosd >>/dev/null 2>&1 &
taosadapter >>/dev/null 2>&1 &
sleep 5
cd ../../docs/examples/rust/nativeexample
cargo run --example connect
cargo run --example createdb
cargo run --example insert
cargo run --example query
taos -s "drop database if exists power"
check_transactions || exit 1
reset_cache || exit 1
cargo run --example schemaless
taos -s "drop database if exists power"
check_transactions || exit 1
reset_cache || exit 1
cargo run --example stmt
cargo run --example tmq
cd ../restexample
cargo run --example connect
cargo run --example createdb
cargo run --example insert
cargo run --example query
taos -s "drop database if exists power"
check_transactions || exit 1
reset_cache || exit 1
taos -s "create database if not exists power"
cargo run --example schemaless
taos -s "drop database if exists power"
check_transactions || exit 1
reset_cache || exit 1
cargo run --example stmt
cargo run --example tmq

View File

@ -1557,5 +1557,6 @@
#,,n,docs-examples-test,bash node.sh
,,n,docs-examples-test,bash csharp.sh
,,n,docs-examples-test,bash jdbc.sh
,,n,docs-examples-test,bash rust.sh
,,n,docs-examples-test,bash go.sh
,,n,docs-examples-test,bash test_R.sh