Merge branch '3.0' into fix/TD-32302-3.0

This commit is contained in:
kailixu 2024-09-25 09:23:20 +08:00
commit 130d808d1b
11 changed files with 104 additions and 51 deletions

View File

@ -437,41 +437,36 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
int32_t* sourceId, SArray* pPageIdList) {
int32_t code = 0;
int32_t lino = 0;
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
taosArrayDestroy(pPageIdList);
return terrno;
}
QUERY_CHECK_NULL(pSource, code, lino, _err, terrno);
pSource->src.pBlock = pBlock;
pSource->pageIdList = pPageIdList;
void* p = taosArrayPush(pAllSources, &pSource);
if (p == NULL) {
taosArrayDestroy(pPageIdList);
return terrno;
}
SSortSource** p = taosArrayPush(pAllSources, &pSource);
QUERY_CHECK_NULL(p, code, lino, _err, terrno);
pSource = NULL;
(*sourceId) += 1;
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
int32_t rowSize = blockDataGetSerialRowSize((*p)->src.pBlock);
// The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
int32_t numOfRows =
(getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
if (numOfRows <= 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
QUERY_CHECK_CONDITION((numOfRows > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
if (code != 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
}
code = blockDataEnsureCapacity((*p)->src.pBlock, numOfRows);
QUERY_CHECK_CODE(code, lino, _err);
return code;
_err:
if (pSource) taosMemoryFree(pSource);
qError("sort failed at %s:%d since %s", __func__, lino, tstrerror(code));
return code;
}
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
@ -554,7 +549,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return code;
}
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
if (code) {
blockDataDestroy(pBlock);
taosArrayDestroy(pPageIdList);
}
return code;
}
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
@ -1023,6 +1023,9 @@ static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32
QUERY_CHECK_CODE(code, lino, _err);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(pBlock);
}
QUERY_CHECK_CODE(code, lino, _err);
}
@ -2144,6 +2147,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
if (code) goto _error;
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(pMemSrcBlk);
goto _error;
}
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
@ -2306,9 +2313,15 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(tBlk);
}
QUERY_CHECK_CODE(code, lino, _err);
void* px = taosArrayPush(aBlkSort, &tBlk);
if (px == NULL) {
blockDataDestroy(tBlk);
}
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
}
}

View File

@ -1634,7 +1634,7 @@ static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tDebug("id %" PRId64 " already released", refId);
tDebug("refId %" PRId64 " already released", refId);
destroyCmsg(pMsg);
return;
}
@ -1646,7 +1646,7 @@ static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) {
if (conn == NULL || conn->refId != refId) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
}
tDebug("do free conn %p by id %" PRId64 "", conn, refId);
tDebug("do free conn %p by refId %" PRId64 "", conn, refId);
int32_t size = transQueueSize(&conn->cliMsgs);
if (size == 0) {
@ -3316,12 +3316,13 @@ int32_t transAllocHandle(int64_t* refId) {
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
tDebug("pre alloc refId %" PRId64 "", exh->refId);
tDebug("pre alloc refId %" PRId64 ", alloc exhandle %p", exh->refId, exh);
*refId = exh->refId;
return 0;
}
int32_t transFreeConnById(void* shandle, int64_t transpointId) {
int32_t code = 0;
SCliMsg* pCli = NULL;
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
return TSDB_CODE_RPC_MODULE_QUIT;
@ -3336,7 +3337,7 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
}
SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg));
pCli = taosMemoryCalloc(1, sizeof(SCliMsg));
if (pCli == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _exception);
}
@ -3349,11 +3350,19 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) {
code = transAsyncSend(pThrd->asyncPool, &pCli->q);
if (code != 0) {
taosMemoryFree(pCli);
taosMemoryFreeClear(pCli);
TAOS_CHECK_GOTO(code, NULL, _exception);
}
_exception:
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
if (code != 0) {
if (transpointId != 0) {
(void)transReleaseExHandle(transGetRefMgt(), transpointId);
(void)transRemoveExHandle(transGetRefMgt(), transpointId);
}
taosMemoryFree(pCli);
}
return code;
}

View File

@ -768,6 +768,7 @@ void transDestroyExHandle(void* handle) {
if (!QUEUE_IS_EMPTY(&eh->q)) {
tDebug("handle %p mem leak", handle);
}
tDebug("free exhandle %p", handle);
taosMemoryFree(handle);
}

View File

@ -1,5 +1,6 @@
import csv
import os
import platform
class TDCsv:
def __init__(self):
@ -25,7 +26,11 @@ class TDCsv:
@property
def file(self):
if self.file_name and self.file_path:
return os.path.join(self.file_path, self.file_name)
print(f"self.file_path {self.file_path}, self.file_name {self.file_name}")
csv_file = os.path.join(self.file_path, self.file_name)
if platform.system().lower() == 'windows':
csv_file = csv_file.replace("\\", "/")
return csv_file
return None

View File

@ -24,26 +24,48 @@ from util.log import *
from util.constant import *
import ctypes
import random
# from datetime import timezone
import datetime
import time
from tzlocal import get_localzone
def _parse_ns_timestamp(timestr):
dt_obj = datetime.datetime.strptime(timestr[:len(timestr)-3], "%Y-%m-%d %H:%M:%S.%f")
tz = int(int((dt_obj-datetime.datetime.fromtimestamp(0,dt_obj.tzinfo)).total_seconds())*1e9) + int(dt_obj.microsecond * 1000) + int(timestr[-3:])
return tz
def _parse_datetime(timestr):
# defined timestr formats
formats = [
'%Y-%m-%d %H:%M:%S.%f%z', # 包含微秒和时区偏移
'%Y-%m-%d %H:%M:%S%z', # 不包含微秒但包含时区偏移
'%Y-%m-%d %H:%M:%S.%f', # 包含微秒
'%Y-%m-%d %H:%M:%S' # 不包含微秒
]
for fmt in formats:
try:
return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S.%f')
# try to parse the string with the current format
dt = datetime.datetime.strptime(timestr, fmt)
# 如果字符串包含时区信息,则返回 aware 对象
# if sting contains timezone info, return aware object
if dt.tzinfo is not None:
return dt
else:
# if sting does not contain timezone info, assume it is in local timezone
# get local timezone
local_timezone = get_localzone()
# print("Timezone:", local_timezone)
return dt.replace(tzinfo=local_timezone)
except ValueError:
pass
try:
return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S')
except ValueError:
pass
continue # if the current format does not match, try the next format
# 如果所有格式都不匹配,返回 None
# if none of the formats match, return
raise ValueError(f"input format does not match. correct formats include: '{', '.join(formats)}'")
class TDSql:
def __init__(self):
self.queryRows = 0
self.queryCols = 0
@ -408,6 +430,7 @@ class TDSql:
if self.queryResult[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"):
# tdLog.debug(f"self.queryResult[row][col]:{self.queryResult[row][col]}, data:{data},len(data):{len(data)}, isinstance(data,str) :{isinstance(data,str)}")
# suppose user want to check nanosecond timestamp if a longer data passed``
if isinstance(data,str) :
if (len(data) >= 28):
@ -419,8 +442,9 @@ class TDSql:
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
else:
# tdLog.info(f"datetime.timezone.utc:{datetime.timezone.utc},data:{data},_parse_datetime(data).astimezone(datetime.timezone.utc):{_parse_datetime(data).astimezone(datetime.timezone.utc)}")
if self.queryResult[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col].astimezone(datetime.timezone.utc)} == expect:{_parse_datetime(data).astimezone(datetime.timezone.utc)}")
if(show):
tdLog.info("check successfully")
else:

View File

@ -133,7 +133,7 @@ class TDTestCase:
def run(self):
self.topic_name_check()
self.db_name_check()
if platform.system().lower() == 'windows':
if platform.system().lower() != 'windows':
self.stream_name_check()
self.table_name_check()
self.view_name_check()

View File

@ -203,7 +203,7 @@ class TDTestCase:
assert str(v) == str(value)
else:
for v in values:
tdLog.debug("Set {} to {}".format(name, v))
tdLog.debug("Set client {} to {}".format(name, v))
tdSql.error(f'alter local "{name} {v}";')
def svr_check(self, item, except_values=False):

View File

@ -4,6 +4,7 @@ from util.log import *
from util.sql import *
from util.cases import *
from util.csv import *
import platform
import os
import taos
import json
@ -56,7 +57,6 @@ class TDTestCase:
tdSql.init(conn.cursor(), True)
self.testcasePath = os.path.split(__file__)[0]
self.testcasePath = self.testcasePath.replace('\\', '//')
self.testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
# tdSql.execute(f"insert into db4096.ctb00 file '{self.testcasePath}//tableColumn4096csvLength64k.csv'")

View File

@ -300,7 +300,7 @@ class TDTestCase:
self.drop_table_with_check()
self.drop_table_with_check_tsma()
self.drop_topic_check()
if platform.system().lower() == 'windows':
if platform.system().lower() != 'windows':
self.drop_stream_check()
pass
def stop(self):

View File

@ -1,6 +1,5 @@
import queue
import random
from fabric2.runners import threading
from pandas._libs import interval
import taos
import sys
@ -9,6 +8,7 @@ from util.common import TDCom
from util.log import *
from util.sql import *
from util.cases import *
import threading

View File

@ -140,14 +140,14 @@ class TDTestCase:
tdsql2 = tdCom.newTdSqlWithTimezone(timezone="UTC")
tdsql2.query(f"select * from {dbname}.tzt")
tdsql2.checkRows(1)
tdsql2.checkData(0, 0, "2018-09-17 01:00:00")
# checkData:The expected date and time is the local time zone of the machine where the test case is executed.
tdsql2.checkData(0, 0, "2018-09-17 09:00:00")
tdsql2.execute(f'insert into {dbname}.tzt values({self.ts + 1000}, 2)')
tdsql2.query(f"select * from {dbname}.tzt order by ts")
tdsql2.checkRows(2)
tdsql2.checkData(0, 0, "2018-09-17 01:00:00")
tdsql2.checkData(1, 0, "2018-09-17 01:00:01")
tdsql2.checkData(0, 0, "2018-09-17 09:00:00")
tdsql2.checkData(1, 0, "2018-09-17 09:00:01")
tdsql2 = tdCom.newTdSqlWithTimezone(timezone="Asia/Shanghai")
tdsql2.query(f"select * from {dbname}.tzt order by ts")
@ -160,7 +160,7 @@ class TDTestCase:
tdSql.prepare()
self.timeZoneTest()
self.inAndNotinTest()
# self.inAndNotinTest()
def stop(self):
@ -168,4 +168,5 @@ class TDTestCase:
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())