fix create stream and create tsma

This commit is contained in:
wangjiaming0909 2025-02-17 14:57:47 +08:00
parent 44b1bbc6d3
commit b938dbd091
4 changed files with 47 additions and 17 deletions

View File

@ -280,15 +280,16 @@ uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod) {
return 0; return 0;
} }
// bytes, 0, prec, scale
void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale) { void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale) {
*(char *)pBytes = tDataTypes[type].bytes; *pBytes = 0;
*((char *)pBytes + 1) = 0; *pBytes = tDataTypes[type].bytes << 24;
*((char *)pBytes + 2) = precision; *pBytes |= (uint32_t)precision << 8;
*((char *)pBytes + 3) = scale; *pBytes |= scale;
} }
void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale) { void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale) {
*scale = *(uint8_t *)pBytes; *precision = (uint8_t)((*pBytes >> 8) & 0xFF);
*precision = *((uint8_t *)pBytes + 2); *scale = (uint8_t)(*pBytes & 0xFF);
*pBytes &= 0xFF; *pBytes >>= 24;
} }

View File

@ -1595,8 +1595,8 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
if (!pCxt->pCreateStreamReq->pTags) { if (!pCxt->pCreateStreamReq->pTags) {
return terrno; return terrno;
} }
SField f = {0}; SFieldWithOptions f = {0};
int32_t code = 0; int32_t code = 0;
if (pCxt->pSrcStb) { if (pCxt->pSrcStb) {
for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) { for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx]; SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
@ -1631,7 +1631,8 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
f.flags = COL_SMA_ON; f.flags = COL_SMA_ON;
tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN); tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN);
if (IS_DECIMAL_TYPE(f.type)) { if (IS_DECIMAL_TYPE(f.type)) {
fillBytesForDecimalType(&f.bytes, f.type, pExprNode->resType.precision, pExprNode->resType.scale); f.typeMod = decimalCalcTypeMod(pExprNode->resType.precision, pExprNode->resType.scale);
f.flags |= COL_HAS_TYPE_MOD;
} }
if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) { if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
code = terrno; code = terrno;
@ -1800,7 +1801,7 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
} }
} }
if (LIST_LENGTH(pProjects) > 0) { if (LIST_LENGTH(pProjects) > 0) {
createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SField)); createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SFieldWithOptions));
if (!createStreamReq.pCols) { if (!createStreamReq.pCols) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;

View File

@ -1173,7 +1173,7 @@ class DecimalTest : public ::testing::Test {
static constexpr const char* host = "127.0.0.1"; static constexpr const char* host = "127.0.0.1";
static constexpr const char* user = "root"; static constexpr const char* user = "root";
static constexpr const char* passwd = "taosdata"; static constexpr const char* passwd = "taosdata";
static constexpr const char* db = "test_api"; static constexpr const char* db = "test";
public: public:
void SetUp() override { void SetUp() override {
@ -1193,6 +1193,18 @@ TEST_F(DecimalTest, insert) {
} }
TEST(decimal, fillDecimalInfoInBytes) {
auto d = getDecimalType(10, 2);
int32_t bytes = 0;
fillBytesForDecimalType(&bytes, d.type, d.precision, d.scale);
uint8_t prec = 0, scale = 0;
extractDecimalTypeInfoFromBytes(&bytes, &prec, &scale);
ASSERT_EQ(bytes, tDataTypes[d.type].bytes);
ASSERT_EQ(prec, d.precision);
ASSERT_EQ(scale, d.scale);
}
TEST_F(DecimalTest, api_taos_fetch_rows) { TEST_F(DecimalTest, api_taos_fetch_rows) {
const char* host = "127.0.0.1"; const char* host = "127.0.0.1";
const char* user = "root"; const char* user = "root";
@ -1259,9 +1271,10 @@ TEST_F(DecimalTest, api_taos_fetch_rows) {
ASSERT_EQ(t, TSDB_DATA_TYPE_DECIMAL64); ASSERT_EQ(t, TSDB_DATA_TYPE_DECIMAL64);
auto check_type_mod = [](char* pStart, uint8_t prec, uint8_t scale, int32_t bytes) { auto check_type_mod = [](char* pStart, uint8_t prec, uint8_t scale, int32_t bytes) {
ASSERT_EQ(*pStart, bytes); int32_t d = *(int32_t*)pStart;
ASSERT_EQ(*(pStart + 2), prec); ASSERT_EQ(d >> 24, bytes);
ASSERT_EQ(*(pStart + 3), scale); ASSERT_EQ((d & 0xFF00) >> 8, prec);
ASSERT_EQ(d & 0xFF, scale);
}; };
check_type_mod(p + 1, 10, 2, 8); check_type_mod(p + 1, 10, 2, 8);

View File

@ -3,6 +3,7 @@ from re import A
import time import time
import threading import threading
import secrets import secrets
import query
from tag_lite import column from tag_lite import column
from util.log import * from util.log import *
from util.sql import * from util.sql import *
@ -580,9 +581,12 @@ class TDTestCase:
def test_decimal_and_stream(self): def test_decimal_and_stream(self):
create_stream = f'CREATE STREAM {self.stream_name} FILL_HISTORY 1 INTO {self.db_name}.{self.stream_out_stb} AS SELECT _wstart, count(c1), avg(c2), sum(c3) FROM {self.db_name}.{self.stable_name} INTERVAL(10s)' create_stream = f'CREATE STREAM {self.stream_name} FILL_HISTORY 1 INTO {self.db_name}.{self.stream_out_stb} AS SELECT _wstart, count(c1), avg(c2), sum(c3) FROM {self.db_name}.{self.stable_name} INTERVAL(10s)'
tdSql.execute(create_stream, queryTimes=1, show=True) tdSql.execute(create_stream, queryTimes=1, show=True)
self.wait_query_result(f"select count(*) from {self.db_name}.{self.stream_out_stb}", [(50,)], 30)
def test_decimal_and_tsma(self): def test_decimal_and_tsma(self):
pass create_tsma = f"CREATE TSMA {self.tsma_name} ON {self.db_name}.{self.stable_name} FUNCTION(count(c1), min(c2), max(c3), avg(C3)) INTERVAL(1m)"
tdSql.execute(create_tsma, queryTimes=1, show=True)
self.wait_query_result(f"select count(*) from {self.db_name}.{self.tsma_name}_tsma_res_stb_", [(9*self.c_table_num,)], 30)
def run(self): def run(self):
self.test_decimal_ddl() self.test_decimal_ddl()
@ -595,6 +599,17 @@ class TDTestCase:
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
def wait_query_result(self, sql: str, expect_result, times):
for i in range(times):
tdLog.info(f"wait query result for {sql}, times: {i}")
tdSql.query(sql, queryTimes=1)
results = tdSql.queryResult
if results != expect_result:
time.sleep(1)
continue
return True
tdLog.exit(f"wait query result timeout for {sql} failed after {times} time, expect {expect_result}, but got {results}")
event = threading.Event() event = threading.Event()