diff --git a/example/src/tmq.c b/example/src/tmq.c
index 832e389a13..fdd26bc95d 100644
--- a/example/src/tmq.c
+++ b/example/src/tmq.c
@@ -54,7 +54,8 @@ int32_t init_env() {
}
taos_free_result(pRes);
- pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)");
+ pRes =
+ taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
@@ -101,7 +102,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
- pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c4 from ct1");
+ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
@@ -144,6 +145,7 @@ void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_
}
tmq_t* build_consumer() {
+#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@@ -152,11 +154,15 @@ tmq_t* build_consumer() {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
+#endif
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
+ tmq_conf_set(conf, "td.connect.user", "root");
+ tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ tmq_conf_set(conf, "td.connect.db", "abc1");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
- tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
+ tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
return tmq;
}
diff --git a/include/client/taos.h b/include/client/taos.h
index 87948e7824..2180903633 100644
--- a/include/client/taos.h
+++ b/include/client/taos.h
@@ -247,10 +247,10 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
-// will be removed in 3.0
+#if 1
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+#endif
-// will replace last one
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 632425cb44..8956c3b7cf 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -332,6 +332,7 @@ int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
typedef struct {
+ int8_t connType;
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
@@ -346,6 +347,7 @@ typedef struct {
int64_t clusterId;
int32_t connId;
int8_t superUser;
+ int8_t connType;
SEpSet epSet;
char sVersion[128];
} SConnectRsp;
diff --git a/include/common/ttime.h b/include/common/ttime.h
index 15450c31ca..3de0b98d85 100644
--- a/include/common/ttime.h
+++ b/include/common/ttime.h
@@ -40,6 +40,7 @@ extern "C" {
* @return timestamp decided by global conf variable, tsTimePrecision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
+ * precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
*/
static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
if (precision == TSDB_TIME_PRECISION_MICRO) {
@@ -51,6 +52,24 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
}
}
+/*
+ * @return timestamp of today at 00:00:00 in given precision
+ * if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
+ * precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
+ * precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
+ */
+static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
+ int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000 :
+ (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : 1000000000;
+ time_t t = taosTime(NULL);
+ struct tm * tm= taosLocalTime(&t, NULL);
+ tm->tm_hour = 0;
+ tm->tm_min = 0;
+ tm->tm_sec = 0;
+
+ return (int64_t)taosMktime(tm) * factor;
+}
+
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
diff --git a/include/os/osTime.h b/include/os/osTime.h
index 766fec0fbd..fd431f6df8 100644
--- a/include/os/osTime.h
+++ b/include/os/osTime.h
@@ -27,9 +27,11 @@ extern "C" {
#ifndef ALLOW_FORBID_FUNC
#define strptime STRPTIME_FUNC_TAOS_FORBID
#define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID
+ #define localtime LOCALTIME_FUNC_TAOS_FORBID
#define localtime_s LOCALTIMES_FUNC_TAOS_FORBID
#define localtime_r LOCALTIMER_FUNC_TAOS_FORBID
#define time TIME_FUNC_TAOS_FORBID
+ #define mktime MKTIME_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
@@ -82,6 +84,8 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
struct tm *taosLocalTime(const time_t *timep, struct tm *result);
+time_t taosTime(time_t *t);
+time_t taosMktime(struct tm *timep);
#ifdef __cplusplus
}
diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h
index f262705d5f..0f12880272 100644
--- a/source/client/inc/clientInt.h
+++ b/source/client/inc/clientInt.h
@@ -45,6 +45,11 @@ extern "C" {
#define HEARTBEAT_INTERVAL 1500 // ms
+enum {
+ CONN_TYPE__QUERY = 1,
+ CONN_TYPE__TMQ,
+};
+
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
@@ -132,9 +137,9 @@ typedef struct STscObj {
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
+ int8_t connType;
int32_t acctId;
uint32_t connId;
- int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
@@ -272,7 +277,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
- uint16_t port);
+ uint16_t port, int connType);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index d389fc34c6..82788b2e11 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -23,6 +23,8 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
+static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
+
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
@@ -297,11 +299,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return TSDB_CODE_SUCCESS;
}
-int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
-
void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle;
+
clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}
@@ -438,7 +439,7 @@ static int32_t hbCreateThread() {
if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
- }
+ }
taosThreadAttrDestroy(&thAttr);
return 0;
}
@@ -568,7 +569,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
SClientHbKey connKey = {
.connId = connId,
- .hbType = HEARTBEAT_TYPE_QUERY,
+ .hbType = hbType,
};
SHbConnInfo info = {0};
@@ -578,16 +579,14 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
*pClusterId = clusterId;
info.param = pClusterId;
- break;
+ return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
}
case HEARTBEAT_TYPE_MQ: {
- break;
+ return 0;
}
default:
- break;
+ return 0;
}
-
- return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
}
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index d9fb65e83d..52262eeaef 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -11,7 +11,7 @@
#include "tref.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
-static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
+static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static bool stringLengthCheck(const char* str, size_t maxsize) {
@@ -40,10 +40,10 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
- SAppInstInfo* pAppInfo);
+ SAppInstInfo* pAppInfo, int connType);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
- uint16_t port) {
+ uint16_t port, int connType) {
if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL;
}
@@ -111,7 +111,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
- return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
+ return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
}
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
@@ -418,7 +418,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
}
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
- SAppInstInfo* pAppInfo) {
+ SAppInstInfo* pAppInfo, int connType) {
STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
@@ -432,7 +432,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return NULL;
}
- SMsgSendInfo* body = buildConnectMsg(pRequest);
+ SMsgSendInfo* body = buildConnectMsg(pRequest, connType);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
@@ -455,7 +455,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj;
}
-static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
+static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
@@ -478,6 +478,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
}
taosMemoryFreeClear(db);
+ connectReq.connType = connType;
connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
@@ -563,7 +564,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return NULL;
}
- return taos_connect_internal(ip, user, NULL, auth, db, port);
+ return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
}
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c
index 07a1199855..6fab4aa4e1 100644
--- a/source/client/src/clientMain.c
+++ b/source/client/src/clientMain.c
@@ -87,7 +87,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS;
}
- return taos_connect_internal(ip, user, pass, NULL, db, port);
+ return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
}
void taos_close(TAOS *taos) {
@@ -124,8 +124,10 @@ const char *taos_errstr(TAOS_RES *res) {
}
void taos_free_result(TAOS_RES *res) {
- SRequestObj *pRequest = (SRequestObj *)res;
- destroyRequest(pRequest);
+ if (TD_RES_QUERY(res)) {
+ SRequestObj *pRequest = (SRequestObj *)res;
+ destroyRequest(pRequest);
+ }
}
int taos_field_count(TAOS_RES *res) {
diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c
index 550a33ecda..67c5679cac 100644
--- a/source/client/src/clientMsgHandler.c
+++ b/source/client/src/clientMsgHandler.c
@@ -69,9 +69,9 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
- pTscObj->connType = HEARTBEAT_TYPE_QUERY;
+ pTscObj->connType = connectRsp.connType;
- hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY);
+ hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, connectRsp.connType);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
@@ -119,13 +119,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (usedbRsp.vgVersion >= 0) {
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
- tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code));
+ tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
+ tstrerror(code));
} else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
}
}
- tFreeSUsedbRsp(&usedbRsp);
+ tFreeSUsedbRsp(&usedbRsp);
}
if (code != TSDB_CODE_SUCCESS) {
@@ -139,7 +140,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
SName name = {0};
- tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
+ tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
SUseDbOutput output = {0};
code = queryBuildUseDbOutput(&output, &usedbRsp);
@@ -151,11 +152,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tscError("failed to build use db output since %s", terrstr());
} else {
- struct SCatalog *pCatalog = NULL;
-
+ struct SCatalog* pCatalog = NULL;
+
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
- tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code));
+ tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
+ tstrerror(code));
} else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
}
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index 2b69b47865..478e328a16 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -357,7 +357,15 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq == NULL) {
return NULL;
}
- pTmq->pTscObj = taos_connect(conf->ip, conf->user, conf->pass, conf->db, conf->port);
+ const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
+ const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
+
+ ASSERT(user);
+ ASSERT(pass);
+ ASSERT(conf->db);
+
+ pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
+ if (pTmq->pTscObj == NULL) return NULL;
pTmq->inWaiting = 0;
pTmq->status = 0;
@@ -783,7 +791,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
}
- struct tm* ptm = localtime(&tt);
+ struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 053a1a1ee6..dad5805c66 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -1368,7 +1368,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
}
- struct tm* ptm = localtime(&tt);
+ struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 9122cbb006..4aeaec5051 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -2532,6 +2532,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
+ if (tEncodeI8(&encoder, pReq->connType) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
@@ -2548,6 +2549,7 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
+ if (tDecodeI8(&decoder, &pReq->connType) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
@@ -2567,6 +2569,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
+ if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
tEndEncode(&encoder);
@@ -2585,6 +2588,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
+ if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
tEndDecode(&decoder);
diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c
index 1baf393e9a..4686d856cc 100644
--- a/source/common/src/ttime.c
+++ b/source/common/src/ttime.c
@@ -70,7 +70,7 @@ void deltaToUtcInitOnce() {
struct tm tm = {0};
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
- m_deltaUtc = (int64_t)mktime(&tm);
+ m_deltaUtc = (int64_t)taosMktime(&tm);
// printf("====delta:%lld\n\n", seconds);
}
@@ -344,7 +344,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
}
/* mktime will be affected by TZ, set by using taos_options */
- int64_t seconds = mktime(&tm);
+ int64_t seconds = taosMktime(&tm);
int64_t fraction = 0;
@@ -539,7 +539,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
- return (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision));
+ return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
}
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) {
@@ -598,7 +598,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
tm.tm_mon = mon % 12;
}
- start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision));
+ start = (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
} else {
int64_t delta = t - pInterval->interval;
int32_t factor = (delta >= 0) ? 1 : -1;
@@ -745,7 +745,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
assert(false);
}
- ptm = localtime(");
+ ptm = taosLocalTime(", NULL);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
length += snprintf(ts + length, fractionLen, format, mod);
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c
index c97d4dd807..15aa55ef83 100644
--- a/source/dnode/mnode/impl/src/mndGrant.c
+++ b/source/dnode/mnode/impl/src/mndGrant.c
@@ -14,12 +14,13 @@
*/
#define _DEFAULT_SOURCE
-#ifndef _GRANT
#include "os.h"
#include "taoserror.h"
#include "mndGrant.h"
#include "mndInt.h"
+#ifndef _GRANT
+
int32_t mndInitGrant(SMnode *pMnode) { return TSDB_CODE_SUCCESS; }
void mndCleanupGrant() {}
void grantParseParameter() { mError("can't parsed parameter k"); }
@@ -30,4 +31,4 @@ void grantRestore(EGrantType grant, uint64_t value) {}
#endif
-void parseGrantParameter() { parseGrantParameter(); }
\ No newline at end of file
+void parseGrantParameter() { grantParseParameter(); }
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c
index 9c8fe9ab0a..556e17ad2d 100644
--- a/source/dnode/mnode/impl/src/mndProfile.c
+++ b/source/dnode/mnode/impl/src/mndProfile.c
@@ -23,13 +23,14 @@
#include "tglobal.h"
#include "version.h"
-#define QUERY_ID_SIZE 20
-#define QUERY_OBJ_ID_SIZE 18
+#define QUERY_ID_SIZE 20
+#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
-#define QUERY_SAVE_SIZE 20
+#define QUERY_SAVE_SIZE 20
typedef struct {
int32_t id;
+ int8_t connType;
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time
@@ -44,8 +45,8 @@ typedef struct {
SQueryDesc *pQueries;
} SConnObj;
-static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
- const char *app, int64_t startTime);
+static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
+ int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
@@ -93,8 +94,8 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
-static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
- const char *app, int64_t startTime) {
+static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
+ int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
@@ -102,6 +103,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
if (startTime == 0) startTime = taosGetTimestampMs();
SConnObj connObj = {.id = connId,
+ .connType = connType,
.appStartTimeMs = startTime,
.pid = pid,
.ip = ip,
@@ -159,8 +161,8 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
}
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
- SConnObj* pConn = NULL;
- bool hasNext = taosCacheIterNext(pIter);
+ SConnObj *pConn = NULL;
+ bool hasNext = taosCacheIterNext(pIter);
if (hasNext) {
size_t dataLen = 0;
pConn = taosCacheIterGetData(pIter, &dataLen);
@@ -210,8 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
}
}
- pConn =
- mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime);
+ pConn = mndCreateConn(pMnode, pReq->user, connReq.connType, pReq->clientIp, pReq->clientPort, connReq.pid,
+ connReq.app, connReq.startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER;
@@ -222,6 +224,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
connectRsp.superUser = pUser->superUser;
connectRsp.clusterId = pMnode->clusterId;
connectRsp.connId = pConn->id;
+ connectRsp.connType = connReq.connType;
snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
gitinfo);
@@ -343,7 +346,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
return -1;
}
-
SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
@@ -916,4 +918,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
int32_t mndGetNumOfConnections(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
return taosCacheGetNumOfObj(pMgmt->cache);
-}
\ No newline at end of file
+}
diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c
index e2b1725d9c..054bff466c 100644
--- a/source/dnode/mnode/impl/src/mndUser.c
+++ b/source/dnode/mnode/impl/src/mndUser.c
@@ -652,24 +652,26 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB
if (pShow->pIter == NULL) break;
cols = 0;
-
- SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
- pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ cols++;
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
const char* src = pUser->superUser? "super":"normal";
char b[10+VARSTR_HEADER_SIZE] = {0};
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
colDataAppend(pColInfo, numOfRows, (const char*) b, false);
- pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ cols++;
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false);
+ cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 68e70da976..afcffa3d28 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -173,12 +173,12 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
- tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
+ tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
mon = (int)(mon + interval);
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
- tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
+ tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
tw->ekey -= 1;
}
diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c
index 81831ab164..9e53eee5c8 100644
--- a/source/libs/parser/src/parInsert.c
+++ b/source/libs/parser/src/parInsert.c
@@ -312,6 +312,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
if (pToken->type == TK_NOW) {
ts = taosGetTimestamp(timePrec);
+ } else if (pToken->type == TK_TODAY) {
+ ts = taosGetTimestampToday(timePrec);
} else if (pToken->type == TK_NK_INTEGER) {
bool isSigned = false;
toInteger(pToken->z, pToken->n, 10, &ts, &isSigned);
@@ -376,8 +378,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
}
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
- if ((pToken->type != TK_NOW && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
- pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) ||
+ if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT &&
+ pToken->type != TK_NK_BOOL && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) ||
(pToken->n == 0) || (pToken->type == TK_NK_RP)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
}
diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c
index db62a6b33d..4d57af822e 100644
--- a/source/libs/scalar/src/sclfunc.c
+++ b/source/libs/scalar/src/sclfunc.c
@@ -841,7 +841,7 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
}
- struct tm *tmInfo = localtime((const time_t *)&timeVal);
+ struct tm *tmInfo = taosLocalTime((const time_t *)&timeVal, NULL);
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S%z", tmInfo);
int32_t len = (int32_t)strlen(buf);
diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c
index 2b0de94880..9ea49b364e 100644
--- a/source/os/src/osTime.c
+++ b/source/os/src/osTime.c
@@ -406,7 +406,18 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
#endif
}
+time_t taosTime(time_t *t) {
+ return time(t);
+}
+
+time_t taosMktime(struct tm *timep) {
+ return mktime(timep);
+}
+
struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
+ if (result == NULL) {
+ return localtime(timep);
+ }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
localtime_s(result, timep);
#else
diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c
index 756893f217..16f6c42d87 100644
--- a/tests/test/c/tmqDemo.c
+++ b/tests/test/c/tmqDemo.c
@@ -13,76 +13,71 @@
* along with this program. If not, see .
*/
-// clang-format off
+#define ALLOW_FORBID_FUNC
#include
+#include
#include
+#include
#include
-#include
#include
#include
+#include
#include
-#include
-#include
#include "taos.h"
#include "taoserror.h"
#include "tlog.h"
-#define GREEN "\033[1;32m"
-#define NC "\033[0m"
+#define GREEN "\033[1;32m"
+#define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))
-#define MAX_SQL_STR_LEN (1024 * 1024)
-#define MAX_ROW_STR_LEN (16 * 1024)
+#define MAX_SQL_STR_LEN (1024 * 1024)
+#define MAX_ROW_STR_LEN (16 * 1024)
-enum _RUN_MODE {
- TMQ_RUN_INSERT_AND_CONSUME,
- TMQ_RUN_ONLY_INSERT,
- TMQ_RUN_ONLY_CONSUME,
- TMQ_RUN_MODE_BUTT
-};
+enum _RUN_MODE { TMQ_RUN_INSERT_AND_CONSUME, TMQ_RUN_ONLY_INSERT, TMQ_RUN_ONLY_CONSUME, TMQ_RUN_MODE_BUTT };
typedef struct {
- char dbName[32];
- char stbName[64];
- char resultFileName[256];
- char vnodeWalPath[256];
- int32_t numOfThreads;
- int32_t numOfTables;
- int32_t numOfVgroups;
- int32_t runMode;
- int32_t numOfColumn;
- double ratio;
- int32_t batchNumOfRow;
- int32_t totalRowsOfPerTbl;
- int64_t startTimestamp;
- int32_t showMsgFlag;
- int32_t simCase;
-
- int32_t totalRowsOfT2;
+ char dbName[32];
+ char stbName[64];
+ char resultFileName[256];
+ char vnodeWalPath[256];
+ int32_t numOfThreads;
+ int32_t numOfTables;
+ int32_t numOfVgroups;
+ int32_t runMode;
+ int32_t numOfColumn;
+ double ratio;
+ int32_t batchNumOfRow;
+ int32_t totalRowsOfPerTbl;
+ int64_t startTimestamp;
+ int32_t showMsgFlag;
+ int32_t simCase;
+
+ int32_t totalRowsOfT2;
} SConfInfo;
static SConfInfo g_stConfInfo = {
"tmqdb",
"stb",
- "./tmqResult.txt", // output_file
+ "./tmqResult.txt", // output_file
"", // /data2/dnode/data/vnode/vnode2/wal",
- 1, // threads
- 1, // tables
- 1, // vgroups
- 0, // run mode
- 1, // columns
- 1, // ratio
- 1, // batch size
- 10000, // total rows for per table
- 0, // 2020-01-01 00:00:00.000
- 0, // show consume msg switch
- 0, // if run in sim case
+ 1, // threads
+ 1, // tables
+ 1, // vgroups
+ 0, // run mode
+ 1, // columns
+ 1, // ratio
+ 1, // batch size
+ 10000, // total rows for per table
+ 0, // 2020-01-01 00:00:00.000
+ 0, // show consume msg switch
+ 0, // if run in sim case
10000,
};
-char* g_pRowValue = NULL;
+char* g_pRowValue = NULL;
TdFilePtr g_fp = NULL;
static void printHelp() {
@@ -125,10 +120,8 @@ static void printHelp() {
exit(EXIT_SUCCESS);
}
-void parseArgument(int32_t argc, char *argv[]) {
-
+void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000
-
for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
@@ -156,7 +149,7 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo.batchNumOfRow = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]);
- } else if (strcmp(argv[i], "-l") == 0) {
+ } else if (strcmp(argv[i], "-l") == 0) {
g_stConfInfo.numOfColumn = atoi(argv[++i]);
} else if (strcmp(argv[i], "-q") == 0) {
g_stConfInfo.ratio = atof(argv[++i]);
@@ -168,7 +161,7 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo.simCase = atol(argv[++i]);
} else {
printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
- exit(-1);
+ exit(-1);
}
}
@@ -191,73 +184,71 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
-#endif
+#endif
}
-static int running = 1;
+static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
// calc dir size (not include itself 4096Byte)
-int64_t getDirectorySize(char *dir)
-{
- TdDirPtr pDir;
- TdDirEntryPtr pDirEntry;
- int64_t totalSize=0;
+int64_t getDirectorySize(char* dir) {
+ TdDirPtr pDir;
+ TdDirEntryPtr pDirEntry;
+ int64_t totalSize = 0;
- if ((pDir = taosOpenDir(dir)) == NULL) {
- fprintf(stderr, "Cannot open dir: %s\n", dir);
- return -1;
+ if ((pDir = taosOpenDir(dir)) == NULL) {
+ fprintf(stderr, "Cannot open dir: %s\n", dir);
+ return -1;
+ }
+
+ // lstat(dir, &statbuf);
+ // totalSize+=statbuf.st_size;
+
+ while ((pDirEntry = taosReadDir(pDir)) != NULL) {
+ char subdir[1024];
+ char* fileName = taosGetDirEntryName(pDirEntry);
+ sprintf(subdir, "%s/%s", dir, fileName);
+
+ // printf("===d_name: %s\n", entry->d_name);
+ if (taosIsDir(subdir)) {
+ if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) {
+ continue;
+ }
+
+ int64_t subDirSize = getDirectorySize(subdir);
+ totalSize += subDirSize;
+ } else if (0 == strcmp(strchr(fileName, '.'), ".log")) { // only calc .log file size, and not include .idx file
+ int64_t file_size = 0;
+ taosStatFile(subdir, &file_size, NULL);
+ totalSize += file_size;
}
+ }
- //lstat(dir, &statbuf);
- //totalSize+=statbuf.st_size;
-
- while ((pDirEntry = taosReadDir(pDir)) != NULL) {
- char subdir[1024];
- char* fileName = taosGetDirEntryName(pDirEntry);
- sprintf(subdir, "%s/%s", dir, fileName);
-
- //printf("===d_name: %s\n", entry->d_name);
- if (taosIsDir(subdir)) {
- if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) {
- continue;
- }
-
- int64_t subDirSize = getDirectorySize(subdir);
- totalSize+=subDirSize;
- } else if (0 == strcmp(strchr(fileName, '.'), ".log")) { // only calc .log file size, and not include .idx file
- int64_t file_size = 0;
- taosStatFile(subdir, &file_size, NULL);
- totalSize+=file_size;
- }
- }
-
- taosCloseDir(pDir);
- return totalSize;
+ taosCloseDir(pDir);
+ return totalSize;
}
-
-int queryDB(TAOS *taos, char *command) {
- TAOS_RES *pRes = taos_query(taos, command);
- int code = taos_errno(pRes);
- //if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
- if (code != 0) {
- pError("failed to reason:%s, sql: %s", tstrerror(code), command);
- taos_free_result(pRes);
- return -1;
- }
- taos_free_result(pRes);
- return 0 ;
+int queryDB(TAOS* taos, char* command) {
+ TAOS_RES* pRes = taos_query(taos, command);
+ int code = taos_errno(pRes);
+ // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
+ if (code != 0) {
+ pError("failed to reason:%s, sql: %s", tstrerror(code), command);
+ taos_free_result(pRes);
+ return -1;
+ }
+ taos_free_result(pRes);
+ return 0;
}
int32_t init_env() {
char sqlStr[1024] = {0};
-
+
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
-
+
sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
@@ -282,19 +273,19 @@ int32_t init_env() {
int32_t dataLen = 0;
int32_t sqlLen = 0;
- sqlLen += sprintf(sqlStr+sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName);
+ sqlLen += sprintf(sqlStr + sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName);
for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) {
- if (i == g_stConfInfo.numOfColumn - 1) {
- sqlLen += sprintf(sqlStr+sqlLen, "c%d int) ", i);
- memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899"));
- dataLen += strlen("66778899");
- } else {
- sqlLen += sprintf(sqlStr+sqlLen, "c%d int, ", i);
- memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, "));
- dataLen += strlen("66778899, ");
- }
+ if (i == g_stConfInfo.numOfColumn - 1) {
+ sqlLen += sprintf(sqlStr + sqlLen, "c%d int) ", i);
+ memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899"));
+ dataLen += strlen("66778899");
+ } else {
+ sqlLen += sprintf(sqlStr + sqlLen, "c%d int, ", i);
+ memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, "));
+ dataLen += strlen("66778899, ");
+ }
}
- sqlLen += sprintf(sqlStr+sqlLen, "tags (t0 int)");
+ sqlLen += sprintf(sqlStr + sqlLen, "tags (t0 int)");
pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
@@ -313,7 +304,7 @@ int32_t init_env() {
taos_free_result(pRes);
}
- //const char* sql = "select * from tu1";
+ // const char* sql = "select * from tu1";
sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr);
@@ -327,6 +318,7 @@ int32_t init_env() {
}
tmq_t* build_consumer() {
+#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
@@ -338,10 +330,15 @@ tmq_t* build_consumer() {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
+#endif
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
- tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
+ tmq_conf_set(conf, "td.connect.user", "root");
+ tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
+ tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
+ tmq_conf_destroy(conf);
return tmq;
}
@@ -396,20 +393,20 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if (tmqmessage) {
batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
- if (0 != g_stConfInfo.showMsgFlag) {
+ if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqmessage);*/
- }
+ }
tmq_message_destroy(tmqmessage);
} else {
break;
}
}
int64_t endTime = taosGetTimestampUs();
- double consumeTime = (double)(endTime - startTime) / 1000000;
+ double consumeTime = (double)(endTime - startTime) / 1000000;
if (batchCnt != totalMsgs) {
- printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
- /*exit(-1);*/
+ printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
+ /*exit(-1);*/
}
if (0 == g_stConfInfo.simCase) {
@@ -417,12 +414,14 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
} else {
printf("{consume success: %d}", totalMsgs);
}
- taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, (double)walLogSize / 1024.0 / batchCnt);
+ taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime,
+ (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime,
+ (double)walLogSize / 1024.0 / batchCnt);
err = tmq_consumer_close(tmq);
if (err) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
- exit(-1);
+ exit(-1);
}
}
@@ -430,7 +429,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
int32_t syncWriteData() {
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pConn == NULL) {
- return -1;
+ return -1;
}
char sqlStr[1024] = {0};
@@ -449,11 +448,11 @@ int32_t syncWriteData() {
}
int32_t totalMsgs = 0;
-
+
int64_t time_counter = g_stConfInfo.startTimestamp;
for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) {
for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
- int inserted = i;
+ int inserted = i;
int64_t tmp_time = time_counter;
int32_t data_len = 0;
@@ -465,22 +464,22 @@ int32_t syncWriteData() {
k++;
if (inserted >= g_stConfInfo.totalRowsOfPerTbl) {
- break;
+ break;
}
- if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
+ if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
break;
- }
+ }
}
int code = queryDB(pConn, buffer);
- if (0 != code){
+ if (0 != code) {
fprintf(stderr, "insert data error!\n");
- taosMemoryFreeClear(buffer);
- return -1;
- }
+ taosMemoryFreeClear(buffer);
+ return -1;
+ }
- totalMsgs++;
+ totalMsgs++;
if (tID == g_stConfInfo.numOfTables - 1) {
i = inserted;
@@ -492,12 +491,11 @@ int32_t syncWriteData() {
return totalMsgs;
}
-
// sync insertion
int32_t syncWriteDataByRatio() {
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pConn == NULL) {
- return -1;
+ return -1;
}
char sqlStr[1024] = {0};
@@ -518,27 +516,27 @@ int32_t syncWriteDataByRatio() {
int32_t totalMsgs = 0;
int32_t insertedOfT1 = 0;
- int32_t insertedOfT2 = 0;
+ int32_t insertedOfT2 = 0;
int64_t tsOfT1 = g_stConfInfo.startTimestamp;
int64_t tsOfT2 = g_stConfInfo.startTimestamp;
int64_t tmp_time;
-
+
for (;;) {
- if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) {
+ if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) {
break;
- }
-
+ }
+
for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
if (0 == tID) {
- tmp_time = tsOfT1;
+ tmp_time = tsOfT1;
if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
- continue;
+ continue;
}
- } else if (1 == tID){
- tmp_time = tsOfT2;
+ } else if (1 == tID) {
+ tmp_time = tsOfT2;
if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
- continue;
+ continue;
}
}
@@ -548,79 +546,86 @@ int32_t syncWriteDataByRatio() {
for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
k++;
- if (0 == tID) {
+ if (0 == tID) {
insertedOfT1++;
- if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
- break;
- }
- } else if (1 == tID){
+ if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
+ break;
+ }
+ } else if (1 == tID) {
insertedOfT2++;
- if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
- break;
- }
- }
+ if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
+ break;
+ }
+ }
- if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
+ if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
break;
- }
+ }
}
int code = queryDB(pConn, buffer);
- if (0 != code){
+ if (0 != code) {
fprintf(stderr, "insert data error!\n");
- taosMemoryFreeClear(buffer);
- return -1;
- }
-
- if (0 == tID) {
- tsOfT1 = tmp_time;
- } else if (1 == tID){
- tsOfT2 = tmp_time;
+ taosMemoryFreeClear(buffer);
+ return -1;
}
- totalMsgs++;
+ if (0 == tID) {
+ tsOfT1 = tmp_time;
+ } else if (1 == tID) {
+ tsOfT2 = tmp_time;
+ }
+
+ totalMsgs++;
}
}
- pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl, g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2);
+ pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl,
+ g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2);
taosMemoryFreeClear(buffer);
return totalMsgs;
}
void printParaIntoFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
- TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
+ TdFilePtr pFile =
+ taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName);
- exit -1;
+ exit - 1;
};
g_fp = pFile;
- time_t tTime = taosGetTimestampSec();
+ time_t tTime = taosGetTimestampSec();
struct tm tm = *localtime(&tTime);
taosFprintfFile(pFile, "###################################################################\n");
- taosFprintfFile(pFile, "# configDir: %s\n", configDir);
- taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
- taosFprintfFile(pFile, "# stbName: %s\n", g_stConfInfo.stbName);
- taosFprintfFile(pFile, "# vnodeWalPath: %s\n", g_stConfInfo.vnodeWalPath);
- taosFprintfFile(pFile, "# numOfTables: %d\n", g_stConfInfo.numOfTables);
- taosFprintfFile(pFile, "# numOfThreads: %d\n", g_stConfInfo.numOfThreads);
- taosFprintfFile(pFile, "# numOfVgroups: %d\n", g_stConfInfo.numOfVgroups);
- taosFprintfFile(pFile, "# runMode: %d\n", g_stConfInfo.runMode);
- taosFprintfFile(pFile, "# ratio: %f\n", g_stConfInfo.ratio);
- taosFprintfFile(pFile, "# numOfColumn: %d\n", g_stConfInfo.numOfColumn);
- taosFprintfFile(pFile, "# batchNumOfRow: %d\n", g_stConfInfo.batchNumOfRow);
- taosFprintfFile(pFile, "# totalRowsOfPerTbl: %d\n", g_stConfInfo.totalRowsOfPerTbl);
- taosFprintfFile(pFile, "# totalRowsOfT2: %d\n", g_stConfInfo.totalRowsOfT2);
+ taosFprintfFile(pFile, "# configDir: %s\n", configDir);
+ taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
+ taosFprintfFile(pFile, "# stbName: %s\n", g_stConfInfo.stbName);
+ taosFprintfFile(pFile, "# vnodeWalPath: %s\n", g_stConfInfo.vnodeWalPath);
+ taosFprintfFile(pFile, "# numOfTables: %d\n", g_stConfInfo.numOfTables);
+ taosFprintfFile(pFile, "# numOfThreads: %d\n", g_stConfInfo.numOfThreads);
+ taosFprintfFile(pFile, "# numOfVgroups: %d\n", g_stConfInfo.numOfVgroups);
+ taosFprintfFile(pFile, "# runMode: %d\n", g_stConfInfo.runMode);
+ taosFprintfFile(pFile, "# ratio: %f\n", g_stConfInfo.ratio);
+ taosFprintfFile(pFile, "# numOfColumn: %d\n", g_stConfInfo.numOfColumn);
+ taosFprintfFile(pFile, "# batchNumOfRow: %d\n", g_stConfInfo.batchNumOfRow);
+ taosFprintfFile(pFile, "# totalRowsOfPerTbl: %d\n", g_stConfInfo.totalRowsOfPerTbl);
+ taosFprintfFile(pFile, "# totalRowsOfT2: %d\n", g_stConfInfo.totalRowsOfT2);
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
- tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
+ tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n");
- taosFprintfFile(pFile, "|-------------------------------insert info-----------------------------|--------------------------------consume info---------------------------------|\n");
- taosFprintfFile(pFile, "|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume time(s) | msgs/s | MB/s | avg msg size(KB) |\n");
+ taosFprintfFile(pFile,
+ "|-------------------------------insert "
+ "info-----------------------------|--------------------------------consume "
+ "info---------------------------------|\n");
+ taosFprintfFile(pFile,
+ "|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume "
+ "time(s) | msgs/s | MB/s | avg msg size(KB) |\n");
taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow);
}
-int main(int32_t argc, char *argv[]) {
+int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv);
printParaIntoFile();
@@ -630,70 +635,70 @@ int main(int32_t argc, char *argv[]) {
code = init_env();
if (code != 0) {
fprintf(stderr, "%% init_env error!\n");
- return -1;
+ return -1;
}
int32_t totalMsgs = 0;
if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) {
-
int64_t startTs = taosGetTimestampUs();
if (1 == g_stConfInfo.ratio) {
- totalMsgs = syncWriteData();
+ totalMsgs = syncWriteData();
} else {
- totalMsgs = syncWriteDataByRatio();
- }
-
- if (totalMsgs <= 0) {
- pError("inset data error!\n");
- return -1;
- }
- int64_t endTs = taosGetTimestampUs();
- int64_t delay = endTs - startTs;
-
- int32_t totalRows = 0;
- if (1 == g_stConfInfo.ratio) {
- totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables;
- } else {
- totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio);
- }
-
- float seconds = delay / 1000000.0;
- float rowsSpeed = totalRows / seconds;
- float msgsSpeed = totalMsgs / seconds;
-
-
- if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) {
- walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
- if (walLogSize <= 0) {
- printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);
- exit(-1);
- } else {
- pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0));
- }
- }
-
- if (0 == g_stConfInfo.simCase) {
- pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed);
+ totalMsgs = syncWriteDataByRatio();
}
- taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, (double)walLogSize/(1024 * 1024.0));
+
+ if (totalMsgs <= 0) {
+ pError("inset data error!\n");
+ return -1;
+ }
+ int64_t endTs = taosGetTimestampUs();
+ int64_t delay = endTs - startTs;
+
+ int32_t totalRows = 0;
+ if (1 == g_stConfInfo.ratio) {
+ totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables;
+ } else {
+ totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio);
+ }
+
+ float seconds = delay / 1000000.0;
+ float rowsSpeed = totalRows / seconds;
+ float msgsSpeed = totalMsgs / seconds;
+
+ if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) {
+ walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
+ if (walLogSize <= 0) {
+ printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);
+ exit(-1);
+ } else {
+ pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize / (1024 * 1024.0));
+ }
+ }
+
+ if (0 == g_stConfInfo.simCase) {
+ pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows,
+ totalMsgs, seconds, rowsSpeed, msgsSpeed);
+ }
+ taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed,
+ (double)walLogSize / (1024 * 1024.0));
}
if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) {
return 0;
}
-
- tmq_t* tmq = build_consumer();
+
+ tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
- if ((NULL == tmq) || (NULL == topic_list)){
+ if ((NULL == tmq) || (NULL == topic_list)) {
return -1;
}
-
+
perf_loop(tmq, topic_list, totalMsgs, walLogSize);
taosMemoryFreeClear(g_pRowValue);
- taosFprintfFile(g_fp, "\n");
- taosCloseFile(&g_fp);
+ taosFprintfFile(g_fp, "\n");
+ taosCloseFile(&g_fp);
return 0;
}
diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c
index 7862788a71..eaca8f151e 100644
--- a/tests/test/c/tmqSim.c
+++ b/tests/test/c/tmqSim.c
@@ -35,8 +35,8 @@
#define MAX_ROW_STR_LEN (16 * 1024)
typedef struct {
- int32_t expectMsgCnt;
- int32_t consumeMsgCnt;
+ int32_t expectMsgCnt;
+ int32_t consumeMsgCnt;
TdThread thread;
} SThreadInfo;
@@ -45,12 +45,12 @@ typedef struct {
char dbName[32];
char topicString[256];
char keyString[1024];
- char topicString1[256];
- char keyString1[1024];
+ char topicString1[256];
+ char keyString1[1024];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t consumeMsgCnt;
- int32_t checkMode;
+ int32_t checkMode;
// save result after parse agrvs
int32_t numOfTopic;
@@ -59,13 +59,13 @@ typedef struct {
int32_t numOfKey;
char key[32][64];
char value[32][64];
-
- int32_t numOfTopic1;
- char topics1[32][64];
-
- int32_t numOfKey1;
- char key1[32][64];
- char value1[32][64];
+
+ int32_t numOfTopic1;
+ char topics1[32][64];
+
+ int32_t numOfKey1;
+ char key1[32][64];
+ char value1[32][64];
} SConfInfo;
static SConfInfo g_stConfInfo;
@@ -186,18 +186,18 @@ void parseInputString() {
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic++;
-
+
token = strtok(NULL, delim);
- }
+ }
token = strtok(g_stConfInfo.topicString1, delim);
- while(token != NULL) {
- //printf("%s\n", token );
- strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
+ while (token != NULL) {
+ // printf("%s\n", token );
+ strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]);
- //printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
- g_stConfInfo.numOfTopic1++;
-
+ // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
+ g_stConfInfo.numOfTopic1++;
+
token = strtok(NULL, delim);
}
@@ -214,28 +214,28 @@ void parseInputString() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++;
}
-
+
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.keyString1, delim);
- while(token != NULL) {
- //printf("%s\n", token );
- {
- char* pstr = token;
- ltrim(pstr);
- char *ret = strchr(pstr, ch);
- memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret-pstr);
- strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret+1);
- //printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]);
- g_stConfInfo.numOfKey1++;
+ while (token != NULL) {
+ // printf("%s\n", token );
+ {
+ char* pstr = token;
+ ltrim(pstr);
+ char* ret = strchr(pstr, ch);
+ memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr);
+ strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1);
+ // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
+ // g_stConfInfo.value[g_stConfInfo.numOfKey]);
+ g_stConfInfo.numOfKey1++;
}
-
+
token = strtok(NULL, delim);
}
}
-
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
@@ -253,6 +253,7 @@ int queryDB(TAOS* taos, char* command) {
}
tmq_t* build_consumer() {
+#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
@@ -266,13 +267,19 @@ tmq_t* build_consumer() {
exit(-1);
}
taos_free_result(pRes);
+#endif
tmq_conf_t* conf = tmq_conf_new();
// tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) {
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]);
}
- tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
+ tmq_conf_set(conf, "td.connect.user", "root");
+ tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
+ tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
+ assert(tmq);
+ tmq_conf_destroy(conf);
return tmq;
}
@@ -285,10 +292,10 @@ tmq_list_t* build_topic_list() {
return topic_list;
}
-
tmq_t* build_consumer_x() {
+#if 0
char sqlStr[1024] = {0};
-
+
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@@ -296,23 +303,29 @@ tmq_t* build_consumer_x() {
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
- taos_free_result(pRes);
- exit(-1);
+ taos_free_result(pRes);
+ exit(-1);
}
taos_free_result(pRes);
+#endif
tmq_conf_t* conf = tmq_conf_new();
- //tmq_conf_set(conf, "group.id", "tg2");
+ // tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) {
tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]);
}
- tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
+ tmq_conf_set(conf, "td.connect.user", "root");
+ tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
+ tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
+ assert(tmq);
+ tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list_x() {
tmq_list_t* topic_list = tmq_list_new();
- //tmq_list_append(topic_list, "test_stb_topic_1");
+ // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics1[i]);
}
@@ -367,9 +380,9 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
if (tmqMsg) {
totalMsgs++;
- //printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
+ // printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
- #if 0
+#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
@@ -396,65 +409,63 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
exit(-1);
}
- //printf("%d", totalMsgs); // output to sim for check result
+ // printf("%d", totalMsgs); // output to sim for check result
return totalMsgs;
}
-
-void *threadFunc(void *param) {
+void* threadFunc(void* param) {
int32_t totalMsgs = 0;
- SThreadInfo *pInfo = (SThreadInfo *)param;
+ SThreadInfo* pInfo = (SThreadInfo*)param;
- tmq_t* tmq = build_consumer_x();
+ tmq_t* tmq = build_consumer_x();
tmq_list_t* topic_list = build_topic_list_x();
- if ((NULL == tmq) || (NULL == topic_list)){
+ if ((NULL == tmq) || (NULL == topic_list)) {
return NULL;
}
-
+
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
- //if (0 == g_stConfInfo.consumeMsgCnt) {
- // loop_consume(tmq);
- //} else {
- pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
+ // if (0 == g_stConfInfo.consumeMsgCnt) {
+ // loop_consume(tmq);
+ // } else {
+ pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
//}
err = tmq_unsubscribe(tmq);
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
- pInfo->consumeMsgCnt = -1;
+ pInfo->consumeMsgCnt = -1;
return NULL;
}
return NULL;
}
-
int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv);
parseInputString();
- int32_t numOfThreads = 1;
+ int32_t numOfThreads = 1;
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
- SThreadInfo *pInfo = (SThreadInfo *)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo));
+ SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo));
if (g_stConfInfo.numOfTopic1) {
// pthread_create one thread to consume
for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].expectMsgCnt = 0;
pInfo[i].consumeMsgCnt = 0;
- taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i));
+ taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i));
}
}
- int32_t totalMsgs = 0;
+ int32_t totalMsgs = 0;
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
if ((NULL == tmq) || (NULL == topic_list)) {
@@ -479,48 +490,48 @@ int main(int32_t argc, char* argv[]) {
exit(-1);
}
- if (g_stConfInfo.numOfTopic1) {
+ if (g_stConfInfo.numOfTopic1) {
for (int32_t i = 0; i < numOfThreads; i++) {
taosThreadJoin(pInfo[i].thread, NULL);
}
- //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
- if (0 == g_stConfInfo.checkMode) {
- if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) {
- printf("success");
+ // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
+ if (0 == g_stConfInfo.checkMode) {
+ if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) {
+ printf("success");
} else {
- printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
- }
- } else if (1 == g_stConfInfo.checkMode) {
+ printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
+ }
+ } else if (1 == g_stConfInfo.checkMode) {
if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) {
- printf("success");
+ printf("success");
} else {
- printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
- }
- } else if (2 == g_stConfInfo.checkMode) {
- if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
- printf("success");
+ printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
+ }
+ } else if (2 == g_stConfInfo.checkMode) {
+ if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
+ printf("success");
} else {
- printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
- }
- } else if (3 == g_stConfInfo.checkMode) {
+ printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
+ }
+ } else if (3 == g_stConfInfo.checkMode) {
if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) {
- printf("success");
+ printf("success");
} else {
- printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
- }
- } else if (4 == g_stConfInfo.checkMode) {
- if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt))
- || ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt))
- || ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt))
- || ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
- printf("success");
+ printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
+ }
+ } else if (4 == g_stConfInfo.checkMode) {
+ if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) ||
+ ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ||
+ ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ||
+ ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
+ printf("success");
} else {
- printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
- }
- } else {
- printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
- }
+ printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
+ }
+ } else {
+ printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
+ }
}
return 0;
diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c
index 8ef5c816c8..d713233362 100644
--- a/tests/tsim/src/simExe.c
+++ b/tests/tsim/src/simExe.c
@@ -678,7 +678,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
if (tt < 0) tt = 0;
#endif
- tp = localtime(&tt);
+ tp = taosLocalTime(&tt, NULL);
strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp);
if (precision == TSDB_TIME_PRECISION_MILLI) {
sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000));
diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c
index b89d517ad3..1809d99209 100644
--- a/tools/shell/src/shellEngine.c
+++ b/tools/shell/src/shellEngine.c
@@ -452,7 +452,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) {
}
}
- struct tm *ptm = localtime(&tt);
+ struct tm *ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {