Merge branch '3.0' of https://github.com/taosdata/TDengine into test/3.0/TS-5130

This commit is contained in:
zhipz 2024-07-08 09:55:20 +08:00
commit d7ca42f925
17 changed files with 59 additions and 112 deletions

View File

@ -134,7 +134,6 @@ extern uint16_t tsMonitorPort;
extern int32_t tsMonitorMaxLogs;
extern bool tsMonitorComp;
extern bool tsMonitorLogProtocol;
extern int32_t tsMonitorIntervalForBasic;
extern bool tsMonitorForceV2;
// audit

View File

@ -226,7 +226,6 @@ void monSetQmInfo(SMonQmInfo *pInfo);
void monSetSmInfo(SMonSmInfo *pInfo);
void monSetBmInfo(SMonBmInfo *pInfo);
void monGenAndSendReport();
void monGenAndSendReportBasic();
void monSendContent(char *pCont, const char* uri);
void tFreeSMonMmInfo(SMonMmInfo *pInfo);

View File

@ -111,7 +111,6 @@ uint16_t tsMonitorPort = 6043;
int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false;
bool tsMonitorLogProtocol = false;
int32_t tsMonitorIntervalForBasic = 30;
bool tsMonitorForceV2 = true;
// audit
@ -712,7 +711,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -1165,7 +1163,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32;
tsMonitorLogProtocol = cfgGetItem(pCfg, "monitorLogProtocol")->bval;
tsMonitorIntervalForBasic = cfgGetItem(pCfg, "monitorIntervalForBasic")->i32;
tsMonitorForceV2 = cfgGetItem(pCfg, "monitorForceV2")->i32;
tsEnableAudit = cfgGetItem(pCfg, "audit")->bval;

View File

@ -43,7 +43,6 @@ typedef struct SDnodeMgmt {
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
SendMonitorReportFp sendMonitorReportFpBasic;
} SDnodeMgmt;
// dmHandle.c

View File

@ -65,7 +65,6 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp;
pMgmt->sendMonitorReportFpBasic = pInput->sendMonitorReportFpBasic;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;

View File

@ -175,15 +175,6 @@ static void *dmMonitorThreadFp(void *param) {
taosMemoryTrim(0);
}
}
if(tsMonitorForceV2){
if (curTime < lastTimeForBasic) lastTimeForBasic = curTime;
float intervalForBasic = (curTime - lastTimeForBasic) / 1000.0f;
if (intervalForBasic >= tsMonitorIntervalForBasic) {
(*pMgmt->sendMonitorReportFpBasic)();
lastTimeForBasic = curTime;
}
}
}
return NULL;

View File

@ -287,7 +287,8 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
return -1;
}
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;

View File

@ -128,7 +128,6 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c
void dmSendMonitorReport();
void dmSendAuditRecords();
void dmSendMonitorReportBasic();
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo);

View File

@ -394,7 +394,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.processDropNodeFp = dmProcessDropNodeReq,
.sendMonitorReportFp = dmSendMonitorReport,
.sendAuditRecordFp = auditSendRecordsInBatch,
.sendMonitorReportFpBasic = dmSendMonitorReportBasic,
.getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.getMnodeLoadsFp = dmGetMnodeLoads,

View File

@ -123,16 +123,6 @@ void dmSendMonitorReport() {
monGenAndSendReport();
}
void dmSendMonitorReportBasic() {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SDnode *pDnode = dmInstance();
dmGetDmMonitorInfoBasic(pDnode);
dmGetMmMonitorInfo(pDnode);
monGenAndSendReportBasic();
}
//Todo: put this in seperate file in the future
void dmSendAuditRecords() {
auditSendRecordsInBatch();

View File

@ -208,7 +208,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}
pRpc->info.wrapper = pWrapper;
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed
pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));

View File

@ -155,7 +155,6 @@ typedef struct {
ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordFp;
SendMonitorReportFp sendMonitorReportFpBasic;
GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp;

View File

@ -568,6 +568,25 @@ void monSendReport(SMonInfo *pMonitor){
}
}
void monSendReportBasic(SMonInfo *pMonitor) {
char *pCont = tjsonToString(pMonitor->pJson);
if (tsMonitorLogProtocol) {
if (pCont != NULL) {
uInfoL("report cont basic:\n%s", pCont);
} else {
uInfo("report cont basic is null");
}
}
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) !=
0) {
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
}
}
void monGenAndSendReport() {
SMonInfo *pMonitor = monCreateMonitorInfo();
if (pMonitor == NULL) return;
@ -595,38 +614,11 @@ void monGenAndSendReport() {
monGenVnodeRoleTable(pMonitor);
monSendPromReport();
}
monCleanupMonitorInfo(pMonitor);
}
void monSendReportBasic(SMonInfo *pMonitor){
char *pCont = tjsonToString(pMonitor->pJson);
if(tsMonitorLogProtocol){
if(pCont != NULL){
uInfoL("report cont basic:\n%s", pCont);
if (pMonitor->mmInfo.cluster.first_ep_dnode_id != 0) {
monGenBasicJsonBasic(pMonitor);
monGenClusterJsonBasic(pMonitor);
monSendReportBasic(pMonitor);
}
else{
uInfo("report cont basic is null");
}
}
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
}
}
void monGenAndSendReportBasic() {
SMonInfo *pMonitor = monCreateMonitorInfo();
monGenBasicJsonBasic(pMonitor);
monGenClusterJsonBasic(pMonitor);
if (pMonitor->mmInfo.cluster.first_ep_dnode_id != 0) {
monSendReportBasic(pMonitor);
}
monCleanupMonitorInfo(pMonitor);

View File

@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
if (alloced > tsRpcQueueMemoryAllowed) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsRpcQueueMemoryUsed);
tsRpcQueueMemoryAllowed);
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
@ -494,6 +494,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->memOfItems = queue->memOfItems;
qall->unAccessedNumOfItems = queue->numOfItems;
qall->unAccessMemOfItems = queue->memOfItems;
code = qall->numOfItems;
qinfo->ahandle = queue->ahandle;

View File

@ -28,8 +28,7 @@ from frame import *
class TDTestCase(TBase):
# fix
# fix
def FIX_TD_30686(self):
tdLog.info("check bug TD_30686 ...\n")
sqls = [
@ -49,6 +48,32 @@ class TDTestCase(TBase):
]
tdSql.checkDataMem(sql, results)
def FIX_TS_5105(self):
tdLog.info("check bug TS_5105 ...\n")
ts1 = "2024-07-03 10:00:00.000"
ts2 = "2024-07-03 13:00:00.000"
sqls = [
"drop database if exists ts_5105",
"create database ts_5105 cachemodel 'both';",
"use ts_5105;",
"CREATE STABLE meters (ts timestamp, current float) TAGS (location binary(64), groupId int);",
"CREATE TABLE d1001 USING meters TAGS ('California.B', 2);",
"CREATE TABLE d1002 USING meters TAGS ('California.S', 3);",
f"INSERT INTO d1001 VALUES ('{ts1}', 10);",
f"INSERT INTO d1002 VALUES ('{ts2}', 13);",
]
tdSql.executes(sqls)
sql = "select last(ts), last_row(ts) from meters;"
# 执行多次有些时候last_row(ts)会返回错误的值详见TS-5105
for i in range(1, 10):
tdLog.debug(f"{i}th execute sql: {sql}")
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, ts2)
tdSql.checkData(0, 1, ts2)
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")
@ -57,11 +82,10 @@ class TDTestCase(TBase):
self.FIX_TD_30686()
# TS BUGS
self.FIX_TS_5105()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -1,44 +0,0 @@
# -*- coding: utf-8 -*-
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
sqls = [
"drop database if exists ts_5101",
"create database ts_5101 cachemodel 'both';",
"use ts_5101;",
"CREATE STABLE meters (ts timestamp, current float) TAGS (location binary(64), groupId int);",
"CREATE TABLE d1001 USING meters TAGS ('California.B', 2);",
"CREATE TABLE d1002 USING meters TAGS ('California.S', 3);",
"INSERT INTO d1001 VALUES ('2024-07-03 10:00:00.000', 10);",
"INSERT INTO d1002 VALUES ('2024-07-03 13:00:00.000', 13);",
]
tdSql.executes(sqls)
# 执行多次有些时候last_row(ts)会返回错误的值详见TS-5105
for i in range(1, 10):
sql = "select last(ts), last_row(ts) from meters;"
tdLog.debug(f"{i}th execute sql: {sql}")
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, "2024-07-03 13:00:00.000")
tdSql.checkData(0, 1, "2024-07-03 13:00:00.000")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -20,7 +20,6 @@
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py
,,y,army,./pytest.sh python3 ./test.py -f query/query_last_row_repeatly.py
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py