diff --git a/cmake/cmake.platform b/cmake/cmake.platform index 849d31f93e..5c6ffd4b10 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -76,15 +76,19 @@ IF ("${CPUTYPE}" STREQUAL "") IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)") MESSAGE(STATUS "The current platform is amd64") SET(PLATFORM_ARCH_STR "amd64") + SET(TD_INTEL_64 TRUE) ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)") MESSAGE(STATUS "The current platform is x86") SET(PLATFORM_ARCH_STR "i386") + SET(TD_INTEL_32 TRUE) ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "armv7l") MESSAGE(STATUS "The current platform is aarch32") SET(PLATFORM_ARCH_STR "arm") + SET(TD_ARM_32 TRUE) ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64") MESSAGE(STATUS "The current platform is aarch64") SET(PLATFORM_ARCH_STR "arm64") + SET(TD_ARM_64 TRUE) ENDIF () ELSE () # if generate ARM version: @@ -92,18 +96,23 @@ ELSE () IF (${CPUTYPE} MATCHES "aarch32") SET(PLATFORM_ARCH_STR "arm") MESSAGE(STATUS "input cpuType: aarch32") + SET(TD_ARM_32 TRUE) ELSEIF (${CPUTYPE} MATCHES "aarch64") SET(PLATFORM_ARCH_STR "arm64") MESSAGE(STATUS "input cpuType: aarch64") + SET(TD_ARM_64 TRUE) ELSEIF (${CPUTYPE} MATCHES "mips64") SET(PLATFORM_ARCH_STR "mips") MESSAGE(STATUS "input cpuType: mips64") + SET(TD_MIPS_64 TRUE) ELSEIF (${CPUTYPE} MATCHES "x64") SET(PLATFORM_ARCH_STR "amd64") MESSAGE(STATUS "input cpuType: x64") + SET(TD_INTEL_64 TRUE) ELSEIF (${CPUTYPE} MATCHES "x86") SET(PLATFORM_ARCH_STR "i386") MESSAGE(STATUS "input cpuType: x86") + SET(TD_INTEL_32 TRUE) ELSE () MESSAGE(STATUS "input cpuType unknown " ${CPUTYPE}) ENDIF () diff --git a/include/common/ttime.h b/include/common/ttime.h index de55b016cd..2f4129f979 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -63,12 +63,13 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : 1000000000; time_t t = taosTime(NULL); - struct tm* tm = taosLocalTime(&t, NULL); - tm->tm_hour = 0; - tm->tm_min = 0; - tm->tm_sec = 0; + struct tm tm; + taosLocalTime(&t, &tm); + tm.tm_hour = 0; + tm.tm_min = 0; + tm.tm_sec = 0; - return (int64_t)taosMktime(tm) * factor; + return (int64_t)taosMktime(&tm) * factor; } int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index eefac77e52..c674728fe6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1661,9 +1661,6 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } */ -#ifdef WINDOWS - if (tt < 0) tt = 0; -#endif if (tt <= 0 && ms < 0) { tt--; if (precision == TSDB_TIME_PRECISION_NANO) { @@ -1674,9 +1671,9 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ms += 1000; } } - - struct tm* ptm = taosLocalTime(&tt, NULL); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); + struct tm ptm = {0}; + taosLocalTime(&tt, &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms); diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index d728bbe49e..944ee6a731 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -902,7 +902,7 @@ const char* fmtts(int64_t ts) { void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) { char ts[40] = {0}; - struct tm* ptm; + struct tm ptm; int32_t fractionLen; char* format = NULL; @@ -939,10 +939,10 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) assert(false); } - ptm = taosLocalTime(", NULL); - int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); + taosLocalTime(", &ptm); + int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", &ptm); length += snprintf(ts + length, fractionLen, format, mod); - length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); + length += (int32_t)strftime(ts + length, 40 - length, "%z", &ptm); tstrncpy(buf, ts, bufLen); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0feb4430bc..026222f83a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -63,7 +63,6 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { - assert(win->skey <= win->ekey); SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 489a11da39..a1d3fa2968 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -194,8 +194,9 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { void static addTimezoneParam(SNodeList* pList) { char buf[6] = {0}; time_t t = taosTime(NULL); - struct tm* tmInfo = taosLocalTime(&t, NULL); - strftime(buf, sizeof(buf), "%z", tmInfo); + struct tm tmInfo; + taosLocalTime(&t, &tmInfo); + strftime(buf, sizeof(buf), "%z", &tmInfo); int32_t len = (int32_t)strlen(buf); SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 55debe51a8..b754c52bbd 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1062,8 +1062,9 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); } - struct tm *tmInfo = taosLocalTime((const time_t *)&timeVal, NULL); - strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", tmInfo); + struct tm tmInfo; + taosLocalTime((const time_t *)&timeVal, &tmInfo); + strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo); int32_t len = (int32_t)strlen(buf); //add timezone string diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 7e6e508817..0cb4228e42 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -357,14 +357,88 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { time_t taosTime(time_t *t) { return time(t); } -time_t taosMktime(struct tm *timep) { return mktime(timep); } +time_t taosMktime(struct tm *timep) { +#ifdef WINDOWS + struct tm tm1 = {0}; + LARGE_INTEGER t; + FILETIME f; + SYSTEMTIME s; + FILETIME ff; + SYSTEMTIME ss; + LARGE_INTEGER offset; + + time_t tt = 0; + localtime_s(&tm1, &tt); + ss.wYear = tm1.tm_year + 1900; + ss.wMonth = tm1.tm_mon + 1; + ss.wDay = tm1.tm_wday; + ss.wHour = tm1.tm_hour; + ss.wMinute = tm1.tm_min; + ss.wSecond = tm1.tm_sec; + ss.wMilliseconds = 0; + SystemTimeToFileTime(&ss, &ff); + offset.QuadPart = ff.dwHighDateTime; + offset.QuadPart <<= 32; + offset.QuadPart |= ff.dwLowDateTime; + + s.wYear = timep->tm_year + 1900; + s.wMonth = timep->tm_mon + 1; + s.wDay = timep->tm_wday; + s.wHour = timep->tm_hour; + s.wMinute = timep->tm_min; + s.wSecond = timep->tm_sec; + s.wMilliseconds = 0; + SystemTimeToFileTime(&s, &f); + t.QuadPart = f.dwHighDateTime; + t.QuadPart <<= 32; + t.QuadPart |= f.dwLowDateTime; + + t.QuadPart -= offset.QuadPart; + return (time_t)(t.QuadPart / 10000000); +#else + return mktime(timep); +#endif + } struct tm *taosLocalTime(const time_t *timep, struct tm *result) { if (result == NULL) { return localtime(timep); } #ifdef WINDOWS - localtime_s(result, timep); + if (*timep < 0) { + SYSTEMTIME ss,s; + FILETIME ff,f; + LARGE_INTEGER offset; + struct tm tm1; + time_t tt = 0; + localtime_s(&tm1, &tt); + ss.wYear = tm1.tm_year + 1900; + ss.wMonth = tm1.tm_mon + 1; + ss.wDay = tm1.tm_mday; + ss.wHour = tm1.tm_hour; + ss.wMinute = tm1.tm_min; + ss.wSecond = tm1.tm_sec; + ss.wMilliseconds = 0; + SystemTimeToFileTime(&ss, &ff); + offset.QuadPart = ff.dwHighDateTime; + offset.QuadPart <<= 32; + offset.QuadPart |= ff.dwLowDateTime; + offset.QuadPart += *timep * 10000000; + f.dwLowDateTime = offset.QuadPart & 0xffffffff; + f.dwHighDateTime = (offset.QuadPart >> 32) & 0xffffffff; + FileTimeToSystemTime(&f, &s); + result->tm_sec = s.wSecond; + result->tm_min = s.wMinute; + result->tm_hour = s.wHour; + result->tm_mday = s.wDay; + result->tm_mon = s.wMonth-1; + result->tm_year = s.wYear-1900; + result->tm_wday = s.wDayOfWeek; + result->tm_yday = 0; + result->tm_isdst = 0; + } else { + localtime_s(result, timep); + } #else localtime_r(timep, result); #endif diff --git a/tests/system-test/7-tmq/stbTagFilter.py b/tests/system-test/7-tmq/stbTagFilter.py index 65609629bc..82596a32ba 100644 --- a/tests/system-test/7-tmq/stbTagFilter.py +++ b/tests/system-test/7-tmq/stbTagFilter.py @@ -5,103 +5,250 @@ import time import socket import os import threading +from enum import Enum from util.log import * from util.sql import * from util.cases import * from util.dnodes import * -from util.common import * sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.vgroups = 4 + self.ctbNum = 1 + self.rowsPerTbl = 10000 + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) - #tdSql.init(conn.cursor(), logSql) # output sql.txt file + tdSql.init(conn.cursor(), False) - def tmqCase1(self): - tdLog.printNoPrefix("======== test case 1: ") - paraDict = {'dbName': 'db2', + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', - 'vgroups': 1, + 'vgroups': 4, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 10, - 'rowsPerTbl': 1000, - 'batchNum': 10, + 'ctbNum': 1, + 'rowsPerTbl': 100000, + 'batchNum': 1200, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 3, 'showMsg': 1, - 'showRow': 1} + 'showRow': 1, + 'snapshot': 0} - topicNameList = ['topic1'] - expectRowsList = [] + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") - tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("insert data") - tmqCom.asyncInsertData(paraDict) - - tdLog.info("create topics from stb with filter") - # queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName']) - queryString = "select * from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1, + 'rowsPerTbl': 100000, + 'batchNum': 3000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + # update to half tables + paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + # queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) + queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - - # start tmq consume processor - tdLog.info("insert consume info to consume processor") - consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 - topicList = topicNameList[0] - ifcheckdata = 0 + tdSql.execute(sqlString) + + # paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + consumerId = 0 + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) + topicList = topicFromStb1 + ifcheckdata = 1 ifManualCommit = 1 - keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:2000, auto.offset.reset:earliest' + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - # tmqCom.getStartCommitNotifyFromTmqsim() - tmqCom.getStartConsumeNotifyFromTmqsim() - tdLog.info("create some new ctb") - paraDict['ctbStartIdx'] = paraDict['ctbStartIdx'] + paraDict['ctbNum'] - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data into new ctb") - pThread = tmqCom.asyncInsertData(paraDict) - - pThread.join() - tdLog.info("wait insert end") - tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) - - tdLog.info("wait the consume result") + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - - if expectRowsList[0] != resultList[0]: - tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) - tdLog.exit("0 tmq consume rows error!") - - time.sleep(10) - for i in range(len(topicNameList)): - tdSql.query("drop topic %s"%topicNameList[i]) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + tdLog.info("run select sql from db") + tdSql.query(queryString) + expectrowcnt = tdSql.getRows() + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 5000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("restart taosd to ensure that the data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + + # update to half tables + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) + paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tmqCom.initConsumerTable() + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + consumerId = 1 + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) + topicList = topicFromStb1 + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) + + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + def run(self): tdSql.prepare() + self.prepareTestEnv() + tdLog.printNoPrefix("=============================================") + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() + # self.tmqCase2() + + self.prepareTestEnv() + tdLog.printNoPrefix("====================================================================") + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.snapshot = 1 + self.tmqCase1() + # self.tmqCase2() + def stop(self): tdSql.close() diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 61c50fb0e8..784b45c92b 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -596,7 +596,8 @@ void printParaIntoFile() { g_fp = pFile; time_t tTime = taosGetTimestampSec(); - struct tm tm = *taosLocalTime(&tTime, NULL); + struct tm tm; + taosLocalTime(&tTime, &tm); taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(pFile, "# configDir: %s\n", configDir); diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 5459e3f159..7030f6fea6 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -171,7 +171,8 @@ static void printHelp() { char* getCurrentTimeString(char* timeString) { time_t tTime = taosGetTimestampSec(); - struct tm tm = *taosLocalTime(&tTime, NULL); + struct tm tm; + taosLocalTime(&tTime, &tm); sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); @@ -420,18 +421,6 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { ms = val % 1000; } - /* - comment out as it make testcases like select_with_tags.sim fail. - but in windows, this may cause the call to localtime crash if tt < 0, - need to find a better solution. - if (tt < 0) { - tt = 0; - } - */ - -#ifdef WINDOWS - if (tt < 0) tt = 0; -#endif if (tt <= 0 && ms < 0) { tt--; if (precision == TSDB_TIME_PRECISION_NANO) { @@ -443,8 +432,9 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { } } - struct tm* ptm = taosLocalTime(&tt, NULL); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); + struct tm ptm; + taosLocalTime(&tt, &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms); diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index aaad76bb53..b993a8dbf1 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -635,7 +635,7 @@ bool simCreateTaosdConnect(SScript *script, char *rest) { bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { char timeStr[30] = {0}; time_t tt; - struct tm *tp; + struct tm tp; SCmdLine *line = &script->lines[script->linePos]; int32_t ret = -1; @@ -768,20 +768,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { } else { tt = (*(int64_t *)row[i]) / 1000000000; } - /* comment out as it make testcases like select_with_tags.sim fail. - but in windows, this may cause the call to localtime crash if tt < 0, - need to find a better solution. - if (tt < 0) { - tt = 0; - } - */ -#ifdef WINDOWS - if (tt < 0) tt = 0; -#endif - - tp = taosLocalTime(&tt, NULL); - strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp); + taosLocalTime(&tt, &tp); + strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", &tp); if (precision == TSDB_TIME_PRECISION_MILLI) { sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000)); } else if (precision == TSDB_TIME_PRECISION_MICRO) { diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 4cfa46bd3c..0982dbc019 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -231,18 +231,6 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) { ms = val % 1000; } - /* - comment out as it make testcases like select_with_tags.sim fail. - but in windows, this may cause the call to localtime crash if tt < 0, - need to find a better solution. - if (tt < 0) { - tt = 0; - } - */ - -#ifdef WINDOWS - if (tt < 0) tt = 0; -#endif if (tt <= 0 && ms < 0) { tt--; if (precision == TSDB_TIME_PRECISION_NANO) { @@ -254,8 +242,9 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) { } } - struct tm *ptm = taosLocalTime(&tt, NULL); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); + struct tm ptm = {0}; + taosLocalTime(&tt, &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms);