Merge branch '3.0' into fix/TD-17381

This commit is contained in:
Ganlin Zhao 2022-07-18 17:34:28 +08:00
commit e5aadc9323
13 changed files with 321 additions and 123 deletions

View File

@ -76,15 +76,19 @@ IF ("${CPUTYPE}" STREQUAL "")
IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)") IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)")
MESSAGE(STATUS "The current platform is amd64") MESSAGE(STATUS "The current platform is amd64")
SET(PLATFORM_ARCH_STR "amd64") SET(PLATFORM_ARCH_STR "amd64")
SET(TD_INTEL_64 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)")
MESSAGE(STATUS "The current platform is x86") MESSAGE(STATUS "The current platform is x86")
SET(PLATFORM_ARCH_STR "i386") SET(PLATFORM_ARCH_STR "i386")
SET(TD_INTEL_32 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "armv7l") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "armv7l")
MESSAGE(STATUS "The current platform is aarch32") MESSAGE(STATUS "The current platform is aarch32")
SET(PLATFORM_ARCH_STR "arm") SET(PLATFORM_ARCH_STR "arm")
SET(TD_ARM_32 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
MESSAGE(STATUS "The current platform is aarch64") MESSAGE(STATUS "The current platform is aarch64")
SET(PLATFORM_ARCH_STR "arm64") SET(PLATFORM_ARCH_STR "arm64")
SET(TD_ARM_64 TRUE)
ENDIF () ENDIF ()
ELSE () ELSE ()
# if generate ARM version: # if generate ARM version:
@ -92,18 +96,23 @@ ELSE ()
IF (${CPUTYPE} MATCHES "aarch32") IF (${CPUTYPE} MATCHES "aarch32")
SET(PLATFORM_ARCH_STR "arm") SET(PLATFORM_ARCH_STR "arm")
MESSAGE(STATUS "input cpuType: aarch32") MESSAGE(STATUS "input cpuType: aarch32")
SET(TD_ARM_32 TRUE)
ELSEIF (${CPUTYPE} MATCHES "aarch64") ELSEIF (${CPUTYPE} MATCHES "aarch64")
SET(PLATFORM_ARCH_STR "arm64") SET(PLATFORM_ARCH_STR "arm64")
MESSAGE(STATUS "input cpuType: aarch64") MESSAGE(STATUS "input cpuType: aarch64")
SET(TD_ARM_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "mips64") ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips") SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64") MESSAGE(STATUS "input cpuType: mips64")
SET(TD_MIPS_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "x64") ELSEIF (${CPUTYPE} MATCHES "x64")
SET(PLATFORM_ARCH_STR "amd64") SET(PLATFORM_ARCH_STR "amd64")
MESSAGE(STATUS "input cpuType: x64") MESSAGE(STATUS "input cpuType: x64")
SET(TD_INTEL_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "x86") ELSEIF (${CPUTYPE} MATCHES "x86")
SET(PLATFORM_ARCH_STR "i386") SET(PLATFORM_ARCH_STR "i386")
MESSAGE(STATUS "input cpuType: x86") MESSAGE(STATUS "input cpuType: x86")
SET(TD_INTEL_32 TRUE)
ELSE () ELSE ()
MESSAGE(STATUS "input cpuType unknown " ${CPUTYPE}) MESSAGE(STATUS "input cpuType unknown " ${CPUTYPE})
ENDIF () ENDIF ()

View File

@ -63,12 +63,13 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000
: 1000000000; : 1000000000;
time_t t = taosTime(NULL); time_t t = taosTime(NULL);
struct tm* tm = taosLocalTime(&t, NULL); struct tm tm;
tm->tm_hour = 0; taosLocalTime(&t, &tm);
tm->tm_min = 0; tm.tm_hour = 0;
tm->tm_sec = 0; tm.tm_min = 0;
tm.tm_sec = 0;
return (int64_t)taosMktime(tm) * factor; return (int64_t)taosMktime(&tm) * factor;
} }
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);

View File

@ -1661,9 +1661,6 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
} }
*/ */
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) { if (tt <= 0 && ms < 0) {
tt--; tt--;
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
@ -1674,9 +1671,9 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
ms += 1000; ms += 1000;
} }
} }
struct tm ptm = {0};
struct tm* ptm = taosLocalTime(&tt, NULL); taosLocalTime(&tt, &ptm);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms); sprintf(buf + pos, ".%09d", ms);

View File

@ -902,7 +902,7 @@ const char* fmtts(int64_t ts) {
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) { void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) {
char ts[40] = {0}; char ts[40] = {0};
struct tm* ptm; struct tm ptm;
int32_t fractionLen; int32_t fractionLen;
char* format = NULL; char* format = NULL;
@ -939,10 +939,10 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
assert(false); assert(false);
} }
ptm = taosLocalTime(&quot, NULL); taosLocalTime(&quot, &ptm);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", &ptm);
length += snprintf(ts + length, fractionLen, format, mod); length += snprintf(ts + length, fractionLen, format, mod);
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); length += (int32_t)strftime(ts + length, 40 - length, "%z", &ptm);
tstrncpy(buf, ts, bufLen); tstrncpy(buf, ts, bufLen);
} }

View File

@ -63,7 +63,6 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
assert(win->skey <= win->ekey);
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
masterscan, tableGroupId, pTaskInfo, true, pAggSup); masterscan, tableGroupId, pTaskInfo, true, pAggSup);

View File

@ -194,8 +194,9 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
void static addTimezoneParam(SNodeList* pList) { void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0}; char buf[6] = {0};
time_t t = taosTime(NULL); time_t t = taosTime(NULL);
struct tm* tmInfo = taosLocalTime(&t, NULL); struct tm tmInfo;
strftime(buf, sizeof(buf), "%z", tmInfo); taosLocalTime(&t, &tmInfo);
strftime(buf, sizeof(buf), "%z", &tmInfo);
int32_t len = (int32_t)strlen(buf); int32_t len = (int32_t)strlen(buf);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);

View File

@ -1062,8 +1062,9 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
} }
struct tm *tmInfo = taosLocalTime((const time_t *)&timeVal, NULL); struct tm tmInfo;
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", tmInfo); taosLocalTime((const time_t *)&timeVal, &tmInfo);
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo);
int32_t len = (int32_t)strlen(buf); int32_t len = (int32_t)strlen(buf);
//add timezone string //add timezone string

View File

@ -357,14 +357,88 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
time_t taosTime(time_t *t) { return time(t); } time_t taosTime(time_t *t) { return time(t); }
time_t taosMktime(struct tm *timep) { return mktime(timep); } time_t taosMktime(struct tm *timep) {
#ifdef WINDOWS
struct tm tm1 = {0};
LARGE_INTEGER t;
FILETIME f;
SYSTEMTIME s;
FILETIME ff;
SYSTEMTIME ss;
LARGE_INTEGER offset;
time_t tt = 0;
localtime_s(&tm1, &tt);
ss.wYear = tm1.tm_year + 1900;
ss.wMonth = tm1.tm_mon + 1;
ss.wDay = tm1.tm_wday;
ss.wHour = tm1.tm_hour;
ss.wMinute = tm1.tm_min;
ss.wSecond = tm1.tm_sec;
ss.wMilliseconds = 0;
SystemTimeToFileTime(&ss, &ff);
offset.QuadPart = ff.dwHighDateTime;
offset.QuadPart <<= 32;
offset.QuadPart |= ff.dwLowDateTime;
s.wYear = timep->tm_year + 1900;
s.wMonth = timep->tm_mon + 1;
s.wDay = timep->tm_wday;
s.wHour = timep->tm_hour;
s.wMinute = timep->tm_min;
s.wSecond = timep->tm_sec;
s.wMilliseconds = 0;
SystemTimeToFileTime(&s, &f);
t.QuadPart = f.dwHighDateTime;
t.QuadPart <<= 32;
t.QuadPart |= f.dwLowDateTime;
t.QuadPart -= offset.QuadPart;
return (time_t)(t.QuadPart / 10000000);
#else
return mktime(timep);
#endif
}
struct tm *taosLocalTime(const time_t *timep, struct tm *result) { struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
if (result == NULL) { if (result == NULL) {
return localtime(timep); return localtime(timep);
} }
#ifdef WINDOWS #ifdef WINDOWS
localtime_s(result, timep); if (*timep < 0) {
SYSTEMTIME ss,s;
FILETIME ff,f;
LARGE_INTEGER offset;
struct tm tm1;
time_t tt = 0;
localtime_s(&tm1, &tt);
ss.wYear = tm1.tm_year + 1900;
ss.wMonth = tm1.tm_mon + 1;
ss.wDay = tm1.tm_mday;
ss.wHour = tm1.tm_hour;
ss.wMinute = tm1.tm_min;
ss.wSecond = tm1.tm_sec;
ss.wMilliseconds = 0;
SystemTimeToFileTime(&ss, &ff);
offset.QuadPart = ff.dwHighDateTime;
offset.QuadPart <<= 32;
offset.QuadPart |= ff.dwLowDateTime;
offset.QuadPart += *timep * 10000000;
f.dwLowDateTime = offset.QuadPart & 0xffffffff;
f.dwHighDateTime = (offset.QuadPart >> 32) & 0xffffffff;
FileTimeToSystemTime(&f, &s);
result->tm_sec = s.wSecond;
result->tm_min = s.wMinute;
result->tm_hour = s.wHour;
result->tm_mday = s.wDay;
result->tm_mon = s.wMonth-1;
result->tm_year = s.wYear-1900;
result->tm_wday = s.wDayOfWeek;
result->tm_yday = 0;
result->tm_isdst = 0;
} else {
localtime_s(result, timep);
}
#else #else
localtime_r(timep, result); localtime_r(timep, result);
#endif #endif

View File

@ -5,103 +5,250 @@ import time
import socket import socket
import os import os
import threading import threading
from enum import Enum
from util.log import * from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq") sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 1
self.rowsPerTbl = 10000
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), False)
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self): def prepareTestEnv(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'db2', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
'vgroups': 1, 'vgroups': 4,
'stbName': 'stb', 'stbName': 'stb',
'colPrefix': 'c', 'colPrefix': 'c',
'tagPrefix': 't', 'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}], 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 10, 'ctbNum': 1,
'rowsPerTbl': 1000, 'rowsPerTbl': 100000,
'batchNum': 10, 'batchNum': 1200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10, 'pollDelay': 3,
'showMsg': 1, 'showMsg': 1,
'showRow': 1} 'showRow': 1,
'snapshot': 0}
topicNameList = ['topic1'] paraDict['vgroups'] = self.vgroups
expectRowsList = [] paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb") tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data") tdLog.info("insert data")
tmqCom.asyncInsertData(paraDict) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
tdLog.info("create topics from stb with filter") startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName']) # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
queryString = "select * from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName']) # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
sqlString = "create topic %s as %s" %(topicNameList[0], queryString) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
# queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# start tmq consume processor # paraDict['ctbNum'] = self.ctbNum
tdLog.info("insert consume info to consume processor") paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicNameList[0] topicList = topicFromStb1
ifcheckdata = 0 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:2000, auto.offset.reset:earliest' keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("insert process end, and start to check consume result")
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("create some new ctb")
paraDict['ctbStartIdx'] = paraDict['ctbStartIdx'] + paraDict['ctbNum']
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data into new ctb")
pThread = tmqCom.asyncInsertData(paraDict)
pThread.join()
tdLog.info("wait insert end")
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
if expectRowsList[0] != resultList[0]: for i in range(expectRows):
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) totalConsumeRows += resultList[i]
tdLog.exit("0 tmq consume rows error!")
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.info("run select sql from db")
tdSql.query(queryString)
expectrowcnt = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 5000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.prepareTestEnv()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
# self.tmqCase2()
self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.tmqCase1()
# self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -596,7 +596,8 @@ void printParaIntoFile() {
g_fp = pFile; g_fp = pFile;
time_t tTime = taosGetTimestampSec(); time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm;
taosLocalTime(&tTime, &tm);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(pFile, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir); taosFprintfFile(pFile, "# configDir: %s\n", configDir);

View File

@ -171,7 +171,8 @@ static void printHelp() {
char* getCurrentTimeString(char* timeString) { char* getCurrentTimeString(char* timeString) {
time_t tTime = taosGetTimestampSec(); time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm;
taosLocalTime(&tTime, &tm);
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
tm.tm_min, tm.tm_sec); tm.tm_min, tm.tm_sec);
@ -420,18 +421,6 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
ms = val % 1000; ms = val % 1000;
} }
/*
comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) { if (tt <= 0 && ms < 0) {
tt--; tt--;
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
@ -443,8 +432,9 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
} }
} }
struct tm* ptm = taosLocalTime(&tt, NULL); struct tm ptm;
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); taosLocalTime(&tt, &ptm);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms); sprintf(buf + pos, ".%09d", ms);

View File

@ -635,7 +635,7 @@ bool simCreateTaosdConnect(SScript *script, char *rest) {
bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
char timeStr[30] = {0}; char timeStr[30] = {0};
time_t tt; time_t tt;
struct tm *tp; struct tm tp;
SCmdLine *line = &script->lines[script->linePos]; SCmdLine *line = &script->lines[script->linePos];
int32_t ret = -1; int32_t ret = -1;
@ -768,20 +768,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
} else { } else {
tt = (*(int64_t *)row[i]) / 1000000000; tt = (*(int64_t *)row[i]) / 1000000000;
} }
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS taosLocalTime(&tt, &tp);
if (tt < 0) tt = 0; strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", &tp);
#endif
tp = taosLocalTime(&tt, NULL);
strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp);
if (precision == TSDB_TIME_PRECISION_MILLI) { if (precision == TSDB_TIME_PRECISION_MILLI) {
sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000)); sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000));
} else if (precision == TSDB_TIME_PRECISION_MICRO) { } else if (precision == TSDB_TIME_PRECISION_MICRO) {

View File

@ -231,18 +231,6 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
ms = val % 1000; ms = val % 1000;
} }
/*
comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) { if (tt <= 0 && ms < 0) {
tt--; tt--;
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
@ -254,8 +242,9 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
} }
} }
struct tm *ptm = taosLocalTime(&tt, NULL); struct tm ptm = {0};
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); taosLocalTime(&tt, &ptm);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms); sprintf(buf + pos, ".%09d", ms);