other: merge rows.

Haojun Liao 2024-05-28 11:13:09 +08:00
commit bd514fe4d3
20 changed files with 343 additions and 68 deletions

View File

@@ -44,6 +44,11 @@ int32_t dmRun();
  */
 void dmStop();
 
+/**
+ * for tests
+ */
+bool dmReadyForTest();
+
 #ifdef __cplusplus
 }
 #endif

View File

@@ -415,6 +415,7 @@ typedef struct SSelectStmt {
   int32_t       returnRows;  // EFuncReturnRows
   ETimeLineMode timeLineCurMode;
   ETimeLineMode timeLineResMode;
+  bool          timeLineFromOrderBy;
   bool          isEmptyResult;
   bool          isSubquery;
   bool          hasAggFuncs;
@@ -453,6 +454,7 @@ typedef struct SSetOperator {
   char          stmtName[TSDB_TABLE_NAME_LEN];
   uint8_t       precision;
   ETimeLineMode timeLineResMode;
+  bool          timeLineFromOrderBy;
   bool          joinContains;
 } SSetOperator;

View File

@@ -2908,7 +2908,7 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
       }
     } else {
       if (varDataTLen(data + offset) > bytes) {
-        uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
+        uError("var data length invalid, varDataTLen(data + offset):%d >= bytes:%d", (int)varDataTLen(data + offset),
               bytes);
        code = TSDB_CODE_PAR_VALUE_TOO_LONG;
        goto _exit;

View File

@@ -415,3 +415,7 @@ void dmReportStartup(const char *pName, const char *pDesc) {
 }
 
 int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
+
+bool dmReadyForTest() {
+  return dmInstance()->data.dnodeVer > 0;
+}

View File

@@ -20,10 +20,10 @@ class TestServer {
  public:
   bool Start();
   void Stop();
-  bool runnning;
+  bool running;
 
  private:
   TdThread threadId;
 };
 
 #endif /* _TD_TEST_SERVER_H_ */

View File

@@ -17,13 +17,11 @@
 void* serverLoop(void* param) {
   TestServer* server = (TestServer*)param;
-  server->runnning = false;
   if (dmInit() != 0) {
     return NULL;
   }
 
-  server->runnning = true;
   if (dmRun() != 0) {
     return NULL;
   }
@@ -33,13 +31,18 @@ void* serverLoop(void* param) {
 }
 
 bool TestServer::Start() {
+  tstrncpy(tsVersionName, "trial", strlen("trial"));
+  running = false;
   TdThreadAttr thAttr;
   taosThreadAttrInit(&thAttr);
   taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
   taosThreadCreate(&threadId, &thAttr, serverLoop, this);
   taosThreadAttrDestroy(&thAttr);
-  taosMsleep(10000);
-  return runnning;
+  while (!dmReadyForTest()) {
+    taosMsleep(500);
+  }
+  running = true;
+  return running;
 }
 
 void TestServer::Stop() {
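Note on the hunk above: the fixed taosMsleep(10000) is replaced by polling dmReadyForTest() every 500 ms, so the test proceeds as soon as the dnode reports readiness instead of always waiting ten seconds. A minimal standalone C++ sketch of the same wait-until-ready pattern, with a bounded timeout added for safety (waitUntilReady and its parameters are illustrative, not part of this commit):

#include <chrono>
#include <functional>
#include <thread>

// Poll a readiness predicate every pollMs milliseconds until it returns true
// or until timeoutMs has elapsed; returns the final readiness state.
static bool waitUntilReady(const std::function<bool()>& isReady, int pollMs = 500, int timeoutMs = 60000) {
  int waited = 0;
  while (!isReady()) {
    if (waited >= timeoutMs) {
      return false;  // give up instead of hanging the test forever
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(pollMs));
    waited += pollMs;
  }
  return true;
}

// Hypothetical usage in a test fixture: running = waitUntilReady([] { return dmReadyForTest(); });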

View File

@@ -49,6 +49,7 @@ TEST_F(MndTestFunc, 01_Show_Func) {
 }
 
 TEST_F(MndTestFunc, 02_Create_Func) {
+#ifndef WINDOWS
   {
     SCreateFuncReq createReq = {0};
     strcpy(createReq.name, "");
@@ -159,9 +160,11 @@ TEST_F(MndTestFunc, 02_Create_Func) {
 
   test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "ins_functions", "");
   EXPECT_EQ(test.GetShowRows(), 1);
+#endif
 }
 
 TEST_F(MndTestFunc, 03_Retrieve_Func) {
+#ifndef WINDOWS
   {
     SRetrieveFuncReq retrieveReq = {0};
     retrieveReq.numOfFuncs = 1;
@@ -376,9 +379,11 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
     ASSERT_NE(pRsp, nullptr);
     ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST);
   }
+#endif
 }
 
 TEST_F(MndTestFunc, 04_Drop_Func) {
+#ifndef WINDOWS
   {
     SDropFuncReq dropReq = {0};
     strcpy(dropReq.name, "");
@@ -441,9 +446,11 @@ TEST_F(MndTestFunc, 04_Drop_Func) {
 
   test.SendShowReq(TSDB_MGMT_TABLE_FUNC, "ins_functions", "");
   EXPECT_EQ(test.GetShowRows(), 1);
+#endif
 }
 
 TEST_F(MndTestFunc, 05_Actual_code) {
+#ifndef WINDOWS
   {
     SCreateFuncReq createReq = {0};
     strcpy(createReq.name, "udf1");
@@ -507,4 +514,5 @@ TEST_F(MndTestFunc, 05_Actual_code) {
     }
     tFreeSRetrieveFuncRsp(&retrieveRsp);
   }
+#endif
 }

View File

@@ -588,6 +588,13 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
     tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
   }
 
+  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
+    SValue *pValue = &pLastCol->rowKey.pks[i];
+    if (IS_VAR_DATA_TYPE(pValue->type)) {
+      taosMemoryFree(pValue->pData);
+    }
+  }
+
   if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) /* && pLastCol->colVal.value.nData > 0*/) {
     taosMemoryFree(pLastCol->colVal.value.pData);
   }
@@ -1072,6 +1079,8 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
 
         SLastCol *PToFree = pLastCol;
         if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
+          taosMemoryFreeClear(PToFree);
+          rocksdb_free(values_list[i]);
           continue;
         }

View File

@@ -512,7 +512,6 @@ static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
   }
 
   pInfo->isPrevRowSet = false;
-
   return TSDB_CODE_SUCCESS;
 }
@@ -826,8 +825,12 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
   SSDataBlock* pResBlock = pSliceInfo->pRes;
   SInterval*   pInterval = &pSliceInfo->interval;
 
-  while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
-         pSliceInfo->fillType != TSDB_FILL_LINEAR) {
+  if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
+      pSliceInfo->pPrevGroupKey == NULL) {
+    return;
+  }
+
+  while (pSliceInfo->current <= pSliceInfo->win.ekey) {
     genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
     pSliceInfo->current =
         taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
@@ -1069,6 +1072,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
   blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
 
+  // int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
+
   code = appendDownstream(pOperator, &downstream, 1);
 
   return pOperator;

View File

@@ -845,6 +845,7 @@ static int32_t selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) {
   COPY_SCALAR_FIELD(precision);
   COPY_SCALAR_FIELD(isEmptyResult);
   COPY_SCALAR_FIELD(timeLineResMode);
+  COPY_SCALAR_FIELD(timeLineFromOrderBy);
   COPY_SCALAR_FIELD(timeLineCurMode);
   COPY_SCALAR_FIELD(hasAggFuncs);
   COPY_SCALAR_FIELD(hasRepeatScanFuncs);
@@ -862,6 +863,8 @@ static int32_t setOperatorCopy(const SSetOperator* pSrc, SSetOperator* pDst) {
   COPY_CHAR_ARRAY_FIELD(stmtName);
   COPY_SCALAR_FIELD(precision);
   COPY_SCALAR_FIELD(timeLineResMode);
+  COPY_SCALAR_FIELD(timeLineFromOrderBy);
   return TSDB_CODE_SUCCESS;
 }

View File

@@ -959,7 +959,8 @@ static bool isTimeLineQuery(SNode* pStmt) {
     return (TIME_LINE_MULTI == ((SSelectStmt*)pStmt)->timeLineCurMode) ||
            (TIME_LINE_GLOBAL == ((SSelectStmt*)pStmt)->timeLineCurMode);
   } else if (QUERY_NODE_SET_OPERATOR == nodeType(pStmt)) {
-    return TIME_LINE_GLOBAL == ((SSetOperator*)pStmt)->timeLineResMode;
+    return (TIME_LINE_MULTI == ((SSetOperator*)pStmt)->timeLineResMode) ||
+           (TIME_LINE_GLOBAL == ((SSetOperator*)pStmt)->timeLineResMode);
   } else {
     return false;
   }
@@ -1000,18 +1001,64 @@ static bool isBlockTimeLineAlignedQuery(SNode* pStmt) {
   return false;
 }
 
+SNodeList* buildPartitionListFromOrderList(SNodeList* pOrderList, int32_t nodesNum) {
+  SNodeList* pPartitionList = NULL;
+  SNode*     pNode = NULL;
+  if (pOrderList->length <= nodesNum) {
+    return NULL;
+  }
+  pNode = nodesListGetNode(pOrderList, nodesNum);
+  SOrderByExprNode* pOrder = (SOrderByExprNode*)pNode;
+  if (!isPrimaryKeyImpl(pOrder->pExpr)) {
+    return NULL;
+  }
+  for (int32_t i = 0; i < nodesNum; ++i) {
+    pNode = nodesListGetNode(pOrderList, i);
+    pOrder = (SOrderByExprNode*)pNode;
+    nodesListMakeStrictAppend(&pPartitionList, nodesCloneNode(pOrder->pExpr));
+  }
+  return pPartitionList;
+}
+
 static bool isTimeLineAlignedQuery(SNode* pStmt) {
   SSelectStmt* pSelect = (SSelectStmt*)pStmt;
   if (!isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
     return false;
   }
-  if (QUERY_NODE_SELECT_STMT != nodeType(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
-    return false;
+  if (QUERY_NODE_SELECT_STMT == nodeType(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
+    SSelectStmt* pSub = (SSelectStmt*)((STempTableNode*)pSelect->pFromTable)->pSubquery;
+    if (pSelect->pPartitionByList) {
+      if (!pSub->timeLineFromOrderBy && nodesListMatch(pSelect->pPartitionByList, pSub->pPartitionByList)) {
+        return true;
+      }
+      if (pSub->timeLineFromOrderBy && pSub->pOrderByList->length > 1) {
+        SNodeList* pPartitionList = buildPartitionListFromOrderList(pSub->pOrderByList, pSelect->pPartitionByList->length);
+        bool       match = nodesListMatch(pSelect->pPartitionByList, pPartitionList);
+        nodesDestroyList(pPartitionList);
+        if (match) {
+          return true;
+        }
+      }
+    }
   }
-  SSelectStmt* pSub = (SSelectStmt*)((STempTableNode*)pSelect->pFromTable)->pSubquery;
-  if (pSelect->pPartitionByList && nodesListMatch(pSelect->pPartitionByList, pSub->pPartitionByList)) {
-    return true;
+  if (QUERY_NODE_SET_OPERATOR == nodeType(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
+    SSetOperator* pSub = (SSetOperator*)((STempTableNode*)pSelect->pFromTable)->pSubquery;
+    if (pSelect->pPartitionByList && pSub->timeLineFromOrderBy && pSub->pOrderByList->length > 1) {
+      SNodeList* pPartitionList = buildPartitionListFromOrderList(pSub->pOrderByList, pSelect->pPartitionByList->length);
+      bool       match = nodesListMatch(pSelect->pPartitionByList, pPartitionList);
+      nodesDestroyList(pPartitionList);
+      if (match) {
+        return true;
+      }
+    }
   }
   return false;
 }
@@ -6025,9 +6072,19 @@ static void resetResultTimeline(SSelectStmt* pSelect) {
   if ((QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && isPrimaryKeyImpl(pOrder)) ||
       (QUERY_NODE_TEMP_TABLE != nodeType(pSelect->pFromTable) && isPrimaryKeyImpl(pOrder))) {
     pSelect->timeLineResMode = TIME_LINE_GLOBAL;
-  } else {
-    pSelect->timeLineResMode = TIME_LINE_NONE;
+    return;
+  } else if (pSelect->pOrderByList->length > 1) {
+    for (int32_t i = 1; i < pSelect->pOrderByList->length; ++i) {
+      pOrder = ((SOrderByExprNode*)nodesListGetNode(pSelect->pOrderByList, i))->pExpr;
+      if (isPrimaryKeyImpl(pOrder)) {
+        pSelect->timeLineResMode = TIME_LINE_MULTI;
+        pSelect->timeLineFromOrderBy = true;
+        return;
+      }
+    }
   }
+
+  pSelect->timeLineResMode = TIME_LINE_NONE;
 }
 
 static int32_t replaceOrderByAliasForSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
@@ -6180,16 +6237,13 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
     }
     snprintf(pRightExpr->aliasName, sizeof(pRightExpr->aliasName), "%s", pLeftExpr->aliasName);
     SNode* pProj = createSetOperProject(pSetOperator->stmtName, pLeft);
-    if (QUERY_NODE_COLUMN == nodeType(pLeft) && QUERY_NODE_COLUMN == nodeType(pRight)) {
-      SColumnNode* pLCol = (SColumnNode*)pLeft;
-      SColumnNode* pRCol = (SColumnNode*)pRight;
+    bool isLeftPrimTs = isPrimaryKeyImpl(pLeft);
+    bool isRightPrimTs = isPrimaryKeyImpl(pRight);
+    if (isLeftPrimTs && isRightPrimTs) {
       SColumnNode* pFCol = (SColumnNode*)pProj;
-      if (pLCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID && pRCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
-        pFCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
-        if (pLCol->isPrimTs && pRCol->isPrimTs) {
-          pFCol->isPrimTs = true;
-        }
-      }
+      pFCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
+      pFCol->isPrimTs = true;
     }
     if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSetOperator->pProjectionList, pProj)) {
       return TSDB_CODE_OUT_OF_MEMORY;
@@ -6225,9 +6279,19 @@ static int32_t translateSetOperOrderBy(STranslateContext* pCxt, SSetOperator* pS
     SNode* pOrder = ((SOrderByExprNode*)nodesListGetNode(pSetOperator->pOrderByList, 0))->pExpr;
     if (isPrimaryKeyImpl(pOrder)) {
       pSetOperator->timeLineResMode = TIME_LINE_GLOBAL;
-    } else {
-      pSetOperator->timeLineResMode = TIME_LINE_NONE;
+      return code;
+    } else if (pSetOperator->pOrderByList->length > 1) {
+      for (int32_t i = 1; i < pSetOperator->pOrderByList->length; ++i) {
+        pOrder = ((SOrderByExprNode*)nodesListGetNode(pSetOperator->pOrderByList, i))->pExpr;
+        if (isPrimaryKeyImpl(pOrder)) {
+          pSetOperator->timeLineResMode = TIME_LINE_MULTI;
+          pSetOperator->timeLineFromOrderBy = true;
+          return code;
+        }
+      }
     }
+    pSetOperator->timeLineResMode = TIME_LINE_NONE;
   }
   return code;
 }
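The parser changes above come down to one alignment rule: a subquery whose ORDER BY ends in the primary timestamp (timeLineResMode = TIME_LINE_MULTI with timeLineFromOrderBy set) counts as timeline-aligned only when the outer query's PARTITION BY list equals the leading ORDER BY expressions and the expression right after that prefix is the primary timestamp. A simplified standalone C++ sketch of that comparison, modelling expressions as plain strings instead of the parser's SNode lists (the function name and the "_ts" default are illustrative only, not TDengine APIs):

#include <string>
#include <vector>

// True when partitionBy matches the leading orderBy expressions and the next
// ORDER BY expression is the primary timestamp column.
static bool partitionAlignedWithOrderBy(const std::vector<std::string>& partitionBy,
                                        const std::vector<std::string>& orderBy,
                                        const std::string& primaryTs = "_ts") {
  if (orderBy.size() <= partitionBy.size()) return false;      // need a trailing timestamp expression
  if (orderBy[partitionBy.size()] != primaryTs) return false;  // expression after the prefix must be the timestamp
  for (size_t i = 0; i < partitionBy.size(); ++i) {
    if (orderBy[i] != partitionBy[i]) return false;            // prefix must match PARTITION BY one by one
  }
  return true;
}

// Example: a subquery with "order by faev, _ts" aligns with an outer "partition by faev",
// so diff() still sees an ordered timeline inside each partition:
//   partitionAlignedWithOrderBy({"faev"}, {"faev", "_ts"})     -> true
//   partitionAlignedWithOrderBy({"faev"}, {"deviceid", "_ts"}) -> false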

View File

@@ -544,6 +544,7 @@ void streamMetaCloseImpl(void* arg) {
 
 // todo let's check the status for each task
 int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
+  int32_t vgId = pTask->pMeta->vgId;
   void*   buf = NULL;
   int32_t len;
   int32_t code;
@@ -566,11 +567,12 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
   tEncoderClear(&encoder);
 
   int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};
   code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
-  if (code < 0) {
-    stError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
+  if (code != TSDB_CODE_SUCCESS) {
+    stError("s-task:%s vgId:%d task meta save to disk failed, code:%s", pTask->id.idStr, vgId, tstrerror(terrno));
   } else {
-    stDebug("s-task:%s vgId:%d stream task write to meta file", pTask->id.idStr, pTask->pMeta->vgId);
+    stDebug("s-task:%s vgId:%d task meta save to disk", pTask->id.idStr, vgId);
   }
 
   taosMemoryFree(buf);

View File

@@ -0,0 +1,77 @@
+from frame.log import *
+from frame.cases import *
+from frame.sql import *
+from frame.caseBase import *
+from frame import *
+from frame.eos import *
+import random
+import string
+
+
+class TDTestCase(TBase):
+    """Add test case to verify the complicated query accuracy
+    """
+
+    def init(self, conn, logSql, replicaVar=1):
+        self.replicaVar = int(replicaVar)
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor())
+
+    def prepare_data(self):
+        # database for case TS-4806
+        tdSql.execute("create database db_ts4806;")
+        tdSql.execute("use db_ts4806;")
+        # super table
+        tdSql.execute("create table st (ts timestamp, adl float, bdl float, cdl float, ady float, bdy float, cdy float) \
+            tags(pt_radio float, ct_ratio float, rated_cap float, ta_id varchar(128), id varchar(128), area_code \
+            varchar(128), zdy_flag int, elec_cust_name bigint,bureau_code bigint, fl_name varchar(32), classify_id \
+            varchar(128));")
+        # child table
+        tdSql.execute("create table ct_1 using st tags(1.2, 1.3, 3.4, '271000276', '30000001', '10001', 1, 10001, 2000001, 'beijing', '13169');")
+        tdSql.execute("create table ct_2 using st tags(2.1, 1.2, 3.3, '271000277', '30000002', '10002', 1, 10002, 2000002, 'shanghai', '13141');")
+        tdSql.execute("create table ct_3 using st tags(3.1, 4.2, 5.3, '271000278', '30000003', '10003', 0, 10003, 2000003, 'guangzhou', '13151');")
+        # insert data for ts4806
+        start_ts = 1705783972000
+        data = [
+            (1.1, 2.2, 3.3, 1.1, 2.2, 3.3),
+            (1.2, 2.3, 3.4, 1.2, 2.3, 3.4),
+            (1.3, 2.4, 3.5, 1.3, 2.4, 3.5),
+            (1.4, 2.5, 3.6, 1.4, 2.5, 3.6),
+            (1.5, 2.6, 3.7, 1.5, 2.6, 3.7),
+            (1.6, 2.7, 3.8, 1.6, 2.7, 3.8),
+            (1.7, 2.8, 3.9, 1.7, 2.8, 3.9),
+            (1.8, 2.9, 4.0, 1.8, 2.9, 4.0),
+            (1.9, 4.2, 4.1, 1.9, 3.0, 4.1),
+            (1.2, 3.1, 4.2, 2.0, 3.1, 4.2)
+        ]
+        index = [1, 2, 5, 0, 7, 3, 8, 4, 6, 9]
+        for ct in ['ct_1', 'ct_2']:
+            for i in range(10):
+                sql = f"insert into {ct} values"
+                for j in range(1000):
+                    sql += f"({start_ts + i * 1000 * 1000 + j * 1000}, {','.join([str(item) for item in data[index[i]]])}),"
+                sql += ";"
+                tdSql.execute(sql)
+
+    def test_ts4806(self):
+        tdSql.execute("use db_ts4806;")
+        tdSql.query("select _wstart, cj.id, count(*) from st cj where cj.ts >= '2024-01-21 04:52:52.000' and cj.ts <= ' 2024-01-21 07:39:31.000' \
+            and cj.zdy_flag = 1 and cj.id in ('30000001', '30000002') partition by cj.id event_window start with \
+            (CASE WHEN cj.adl >= cj.bdl AND cj.adl >= cj.cdl THEN cj.adl WHEN cj.bdl >= cj.adl AND cj.bdl >= cj.cdl \
+            THEN cj.bdl ELSE cj.cdl END) * cj.ct_ratio * 0.4 * 1.732 / cj.rated_cap > 1 end with (CASE WHEN cj.adl >= \
+            cj.bdl AND cj.adl >= cj.cdl THEN cj.adl WHEN cj.bdl >= cj.adl AND cj.bdl >= cj.cdl THEN cj.bdl ELSE cj.cdl \
+            END) * cj.ct_ratio * 0.4 * 1.732 / cj.rated_cap <= 1 HAVING count(*) >= 4 order by _wstart, cj.id;")
+        tdSql.checkRows(5)
+        tdSql.checkData(4, 1, '30000002')
+        tdSql.checkData(4, 2, 1001)
+
+    def run(self):
+        self.prepare_data()
+        self.test_ts4806()
+
+    def stop(self):
+        tdSql.execute("drop database db_ts4806;")
+        tdSql.close()
+        tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())

View File

@@ -20,6 +20,7 @@
 ,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3
 ,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
+,,y,army,./pytest.sh python3 ./test.py -f community/query/accuracy/test_query_accuracy.py
 ,,y,army,./pytest.sh python3 ./test.py -f community/insert/insert_basic.py -N 3
 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
 ,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
@@ -994,6 +995,7 @@
 ,,n,system-test,python3 ./test.py -f eco-system/meta/database/keep_time_offset.py
 
 #tsim test
+,,y,script,./test.sh -f tsim/query/timeline.sim
 ,,y,script,./test.sh -f tsim/join/join.sim
 ,,y,script,./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
 ,,y,script,./test.sh -f tsim/parser/where.sim

View File

@@ -201,6 +201,9 @@ class TDCom:
         self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)"
         self.tag_count = len(self.tag_filter_des_select_elm.split(","))
         self.state_window_range = list()
+        self.custom_col_val = 0
+        self.part_val_list = [1, 2]
+
     # def init(self, conn, logSql):
     #     # tdSql.init(conn.cursor(), logSql)
@@ -1259,7 +1262,7 @@ class TDCom:
             default_ctbname_index_start_num += 1
             tdSql.execute(create_stable_sql)
 
-    def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None):
+    def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None, additional_ts=None, custom_col_index=None, col_value_type=None, force_pk_val=None):
         """_summary_
 
         Args:
@@ -1269,6 +1272,8 @@ class TDCom:
         """
         self.column_value_list = list()
        self.ts_value = self.genTs()[0]
+        if additional_ts is not None:
+            self.additional_ts = self.genTs(additional_ts=additional_ts)[2]
         if ts_value is not None:
             self.ts_value = ts_value
 
@@ -1292,7 +1297,22 @@ class TDCom:
             for i in range(int(len(self.column_value_list)/2)):
                 index_num = random.randint(0, len(self.column_value_list)-1)
                 self.column_value_list[index_num] = None
-        self.column_value_list = [self.ts_value] + self.column_value_list
+
+        if custom_col_index is not None:
+            if col_value_type == "Random":
+                pass
+            elif col_value_type == "Incremental":
+                self.column_value_list[custom_col_index] = self.custom_col_val
+                self.custom_col_val += 1
+            elif col_value_type == "Part_equal":
+                self.column_value_list[custom_col_index] = random.choice(self.part_val_list)
+
+        self.column_value_list = [self.ts_value] + [self.additional_ts] + self.column_value_list if additional_ts is not None else [self.ts_value] + self.column_value_list
+        if col_value_type == "Incremental" and custom_col_index==1:
+            self.column_value_list[custom_col_index] = self.custom_col_val if force_pk_val is None else force_pk_val
+        if col_value_type == "Part_equal" and custom_col_index==1:
+            self.column_value_list[custom_col_index] = random.randint(0, self.custom_col_val) if force_pk_val is None else force_pk_val
 
     def screate_table(self, dbname=None, tbname="tb", use_name="table", column_elm_list=None,
                       count=1, default_tbname_prefix="tb", default_tbname_index_start_num=1,
@@ -1333,7 +1353,7 @@ class TDCom:
             default_tbname_index_start_num += 1
             tdSql.execute(create_table_sql)
 
-    def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False):
+    def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False, custom_col_index=None, col_value_type="random"):
         """insert rows
 
         Args:
@@ -1353,7 +1373,7 @@ class TDCom:
         if tbname is not None:
             self.tbname = tbname
 
-            self.sgen_column_value_list(column_ele_list, need_null, ts_value)
+            self.sgen_column_value_list(column_ele_list, need_null, ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
             # column_value_str = ", ".join(str(v) for v in self.column_value_list)
             column_value_str = ""
             for column_value in self.column_value_list:
@@ -1370,7 +1390,7 @@ class TDCom:
         else:
             for num in range(count):
                 ts_value = self.genTs()[0]
-                self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s')
+                self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s', custom_col_index=custom_col_index, col_value_type=col_value_type)
                 column_value_str = ""
                 for column_value in self.column_value_list:
                     if column_value is None:
@@ -1777,7 +1797,7 @@ class TDCom:
         self.sdelete_rows(tbname=self.ctb_name, start_ts=self.time_cast(self.record_history_ts, "-"))
         self.sdelete_rows(tbname=self.tb_name, start_ts=self.time_cast(self.record_history_ts, "-"))
 
-    def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None):
+    def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None, custom_col_index=None, col_value_type="random"):
         """prepare stream data
 
         Args:
@@ -1840,8 +1860,8 @@ class TDCom:
         if fill_history_value == 1:
             for i in range(self.range_count):
                 ts_value = str(self.date_time)+f'-{self.default_interval*(i+1)}s'
-                self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
-                self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
+                self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
+                self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
                 if i == 1:
                     self.record_history_ts = ts_value
@@ -1862,6 +1882,18 @@ class TDCom:
                 time.sleep(1)
         return tbname
 
+    def get_group_id_from_stb(self, stbname):
+        tdSql.query(f'select distinct group_id from {stbname}')
+        cnt = 0
+        while len(tdSql.queryResult) == 0:
+            tdSql.query(f'select distinct group_id from {stbname}')
+            if cnt < self.default_interval:
+                cnt += 1
+                time.sleep(1)
+            else:
+                return False
+        return tdSql.queryResult[0][0]
+
     def update_json_file_replica(self, json_file_path, new_replica_value, output_file_path=None):
         """
         Read a JSON file, update the 'replica' value, and write the result back to a file.

View File

@@ -0,0 +1,51 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+sql connect
+
+sql create database test;
+sql use test;
+
+sql CREATE STABLE `demo` (`_ts` TIMESTAMP, `faev` DOUBLE) TAGS (`deviceid` VARCHAR(256));
+sql CREATE TABLE demo_1 USING demo (deviceid) TAGS ('1');
+sql CREATE TABLE demo_2 USING demo (deviceid) TAGS ('2');
+sql INSERT INTO demo_1 (_ts,faev) VALUES ('2023-11-30 00:00:00.000', 1.0);
+sql INSERT INTO demo_1 (_ts,faev) VALUES ('2023-12-04 01:00:00.001', 2.0);
+sql INSERT INTO demo_1 (_ts,faev) VALUES ('2023-12-04 02:00:00.002', 3.0);
+sql INSERT INTO demo_1 (_ts,faev) VALUES ('2023-12-05 03:00:00.003', 4.0);
+sql INSERT INTO demo_2 (_ts,faev) VALUES ('2023-11-30 00:00:00.000', 5.0);
+sql INSERT INTO demo_2 (_ts,faev) VALUES ('2023-12-28 01:00:00.001', 6.0);
+sql INSERT INTO demo_2 (_ts,faev) VALUES ('2023-12-28 02:00:00.002', 7.0);
+sql INSERT INTO demo_2 (_ts,faev) VALUES ('2023-12-29 03:00:00.003', 8.0);
+
+sql_error select diff(faev) from ((select ts, faev from demo union all select ts, faev from demo));
+sql_error select diff(faev) from (select _ts, faev from demo union all select _ts, faev from demo order by faev, _ts);
+sql_error select diff(faev) from (select _ts, faev from demo union all select _ts, faev from demo order by faev, _ts) partition by faev;
+sql select diff(faev) from (select _ts, faev from demo union all select _ts + 1s, faev from demo order by faev, _ts) partition by faev;
+sql_error select diff(faev) from (select _ts, faev, deviceid from demo union all select _ts + 1s, faev, deviceid from demo order by deviceid, _ts) partition by faev;
+sql select diff(faev) from (select _ts, faev, deviceid from demo union all select _ts + 1s, faev, deviceid from demo order by faev, _ts, deviceid) partition by faev;
+
+sql_error select diff(faev) from (select _ts, faev from demo);
+sql_error select diff(faev) from (select _ts, faev from demo order by faev, _ts);
+sql select diff(faev) from (select _ts, faev from demo order by faev, _ts) partition by faev;
+sql_error select diff(faev) from (select _ts, faev, deviceid from demo order by faev, _ts) partition by deviceid;
+sql_error select diff(faev) from (select _ts, faev, deviceid from demo order by deviceid, _ts) partition by faev;
+sql select diff(faev) from (select _ts, faev, deviceid from demo order by faev, _ts, deviceid) partition by faev;
+
+sql select deviceid, ts, diff(faev) as diff_faev FROM (SELECT deviceid, ts, faev FROM ((SELECT deviceid, ts, faev FROM (SELECT deviceid, _ts AS ts, faev, DIFF(ROUND(faev*1000)/1000) AS diff_faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid) WHERE diff_faev < 0)UNION ALL(SELECT deviceid, ts, faev FROM (SELECT deviceid, ts, faev, DIFF(ROUND(faev*1000)/1000) as diff_faev FROM (SELECT deviceid, _ts as ts , faev FROM demo WHERE deviceid in ('201000008','K201000258')AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' ORDER BY ts desc) PARTITION BY deviceid) WHERE diff_faev > 0)UNION ALL(SELECT deviceid, _wstart AS ts, LAST(faev) AS faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-11-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid INTERVAL(1n))) ORDER BY deviceid, ts) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM (SELECT deviceid, ts, faev FROM ((SELECT deviceid, ts, faev FROM (SELECT deviceid, _ts AS ts, faev, DIFF(ROUND(faev*1000)/1000) AS diff_faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid) WHERE diff_faev < 0)UNION ALL(SELECT deviceid, ts, faev FROM (SELECT deviceid, ts, faev, DIFF(ROUND(faev*1000)/1000) as diff_faev FROM (SELECT deviceid, _ts as ts , faev FROM demo WHERE deviceid in ('201000008','K201000258')AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' ORDER BY ts desc) PARTITION BY deviceid) WHERE diff_faev > 0)UNION ALL(SELECT deviceid, _wstart AS ts, LAST(faev) AS faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-11-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid INTERVAL(1n))) ORDER BY ts, deviceid) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM (SELECT deviceid, ts, faev FROM (SELECT deviceid, _wstart AS ts, LAST(faev) AS faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-11-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid INTERVAL(1n)) ORDER BY deviceid, ts) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM (SELECT deviceid, ts, faev FROM (SELECT deviceid, _wstart AS ts, LAST(faev) AS faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-11-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid INTERVAL(1n)) ORDER BY ts, deviceid) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM ((SELECT deviceid, ts, faev FROM (SELECT deviceid, _ts AS ts, faev, DIFF(ROUND(faev*1000)/1000) AS diff_faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid) WHERE diff_faev < 0)UNION ALL(SELECT deviceid, ts, faev FROM (SELECT deviceid, ts, faev, DIFF(ROUND(faev*1000)/1000) as diff_faev FROM (SELECT deviceid, _ts as ts , faev FROM demo WHERE deviceid in ('201000008','K201000258')AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' ORDER BY ts desc) PARTITION BY deviceid) WHERE diff_faev > 0)UNION ALL(SELECT deviceid, _wstart AS ts, LAST(faev) AS faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-11-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid INTERVAL(1n)) ORDER BY deviceid, ts) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM ((SELECT deviceid, ts, faev FROM (SELECT deviceid, _ts AS ts, faev, DIFF(ROUND(faev*1000)/1000) AS diff_faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid) WHERE diff_faev < 0)UNION ALL(SELECT deviceid, ts, faev FROM (SELECT deviceid, ts, faev, DIFF(ROUND(faev*1000)/1000) as diff_faev FROM (SELECT deviceid, _ts as ts , faev FROM demo WHERE deviceid in ('201000008','K201000258')AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' ORDER BY ts desc) PARTITION BY deviceid) WHERE diff_faev > 0)UNION ALL(SELECT deviceid, _wstart AS ts, LAST(faev) AS faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-11-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid INTERVAL(1n)) ORDER BY ts, deviceid) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM ((SELECT deviceid, ts, faev FROM (SELECT deviceid, _ts AS ts, faev, DIFF(ROUND(faev*1000)/1000) AS diff_faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid) WHERE diff_faev < 0)UNION ALL(SELECT deviceid, ts, faev FROM (SELECT deviceid, ts, faev, DIFF(ROUND(faev*1000)/1000) as diff_faev FROM (SELECT deviceid, _ts as ts , faev FROM demo WHERE deviceid in ('201000008','K201000258')AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' ORDER BY ts desc) PARTITION BY deviceid) WHERE diff_faev > 0) ORDER BY deviceid, ts) PARTITION by deviceid;
+sql select deviceid, ts, diff(faev) as diff_faev FROM ((SELECT deviceid, ts, faev FROM (SELECT deviceid, _ts AS ts, faev, DIFF(ROUND(faev*1000)/1000) AS diff_faev FROM demo WHERE deviceid in ('201000008','K201000258') AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' PARTITION BY deviceid) WHERE diff_faev < 0)UNION ALL(SELECT deviceid, ts, faev FROM (SELECT deviceid, ts, faev, DIFF(ROUND(faev*1000)/1000) as diff_faev FROM (SELECT deviceid, _ts as ts , faev FROM demo WHERE deviceid in ('201000008','K201000258')AND _ts >= '2023-12-01 00:00:00' AND _ts < '2024-01-01 00:00:00' ORDER BY ts desc) PARTITION BY deviceid) WHERE diff_faev > 0) ORDER BY ts, deviceid) PARTITION by deviceid;
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@@ -15,9 +15,12 @@ class TDTestCase:
     def at_once_interval(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, case_when=None):
         tdLog.info(f"*** testing stream at_once+interval: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}, fill: {fill_value}, delete: {delete}, case_when: {case_when} ***")
+        col_value_type = "Incremental" if partition=="c1" else "random"
+        custom_col_index = 1 if partition=="c1" else None
+        self.tdCom.custom_col_val = 0
         self.delete = delete
         self.tdCom.case_name = sys._getframe().f_code.co_name
-        self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value)
+        self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
         self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
         self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
         self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
@@ -76,15 +79,15 @@ class TDTestCase:
         for i in range(self.tdCom.range_count):
             ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
             ts_cast_delete_value = self.tdCom.time_cast(ts_value)
-            self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
+            self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
             if i%2 == 0:
-                self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
+                self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
             if self.delete and i%2 != 0:
                 self.tdCom.sdelete_rows(tbname=self.tdCom.ctb_name, start_ts=ts_cast_delete_value)
             self.tdCom.date_time += 1
-            self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
+            self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
             if i%2 == 0:
-                self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
+                self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
             if self.delete and i%2 != 0:
                 self.tdCom.sdelete_rows(tbname=self.tdCom.tb_name, start_ts=ts_cast_delete_value)
             self.tdCom.date_time += 1
@@ -102,6 +105,7 @@ class TDTestCase:
         if self.tdCom.subtable:
             for tname in [self.stb_name, self.ctb_name]:
+                group_id = self.tdCom.get_group_id_from_stb(f'{tname}_output')
                 tdSql.query(f'select * from {self.ctb_name}')
                 ptn_counter = 0
                 for c1_value in tdSql.queryResult:
@@ -116,11 +120,11 @@ class TDTestCase:
                         tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}')
                        tdSql.query(f'select count(*) from `{tbname}`')
                     elif partition == "tbname" and ptn_counter == 0:
-                        tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}')
+                        tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}_{tname}_output_{group_id}')
                         tdSql.query(f'select count(*) from `{tbname}`')
                         ptn_counter += 1
                     tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
+            group_id = self.tdCom.get_group_id_from_stb(f'{self.tb_name}_output')
             tdSql.query(f'select * from {self.tb_name}')
             ptn_counter = 0
             for c1_value in tdSql.queryResult:
@@ -135,7 +139,7 @@ class TDTestCase:
                     tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}')
                     tdSql.query(f'select count(*) from `{tbname}`')
                 elif partition == "tbname" and ptn_counter == 0:
-                    tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}')
+                    tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}_{self.tb_name}_output_{group_id}')
                     tdSql.query(f'select count(*) from `{tbname}`')
                     ptn_counter += 1

View File

@@ -15,9 +15,12 @@ class TDTestCase:
     def at_once_session(self, session, ignore_expired=None, ignore_update=None, partition="tbname", delete=False, fill_history_value=None, case_when=None, subtable=True):
         tdLog.info(f"*** testing stream at_once+interval: session: {session}, ignore_expired: {ignore_expired}, ignore_update: {ignore_update}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, case_when: {case_when}, subtable: {subtable} ***")
+        col_value_type = "Incremental" if partition=="c1" else "random"
+        custom_col_index = 1 if partition=="c1" else None
+        self.tdCom.custom_col_val = 0
         self.delete = delete
         self.tdCom.case_name = sys._getframe().f_code.co_name
-        self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value)
+        self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
         self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
         self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
         self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
@@ -79,11 +82,11 @@ class TDTestCase:
             if i == 0:
                 record_window_close_ts = window_close_ts
             for ts_value in [self.tdCom.date_time, window_close_ts]:
-                self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True)
-                self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True)
+                self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
+                self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
                 if self.tdCom.update and i%2 == 0:
-                    self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True)
-                    self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True)
+                    self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
+                    self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
                 if self.delete and i%2 != 0:
                     dt = f'cast({self.tdCom.date_time-1} as timestamp)'
                     self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=dt)
@@ -166,6 +169,7 @@ class TDTestCase:
             self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(self.tdCom.record_history_ts, "-"))
         if self.tdCom.subtable:
+            group_id = self.tdCom.get_group_id_from_stb(f'{self.ctb_name}_output')
             tdSql.query(f'select * from {self.ctb_name}')
             ptn_counter = 0
             for c1_value in tdSql.queryResult:
@@ -182,11 +186,11 @@ class TDTestCase:
                     tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}')
                     tdSql.query(f'select count(*) from `{tbname}`')
                 elif partition == "tbname" and ptn_counter == 0:
-                    tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}')
+                    tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}_{self.ctb_name}_output_{group_id}')
                    tdSql.query(f'select count(*) from `{tbname}`')
                     ptn_counter += 1
                 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) if subtable is not None else tdSql.checkEqual(tdSql.queryResult[0][0] >= 0, True)
+            group_id = self.tdCom.get_group_id_from_stb(f'{self.tb_name}_output')
             tdSql.query(f'select * from {self.tb_name}')
             ptn_counter = 0
             for c1_value in tdSql.queryResult:
@@ -203,7 +207,7 @@ class TDTestCase:
                     tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}')
                     tdSql.query(f'select count(*) from `{tbname}`')
                 elif partition == "tbname" and ptn_counter == 0:
-                    tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}')
+                    tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}_{self.tb_name}_output_{group_id}')
                     tdSql.query(f'select count(*) from `{tbname}`')
                     ptn_counter += 1

View File

@@ -140,8 +140,8 @@ ELSE ()
         BUILD_COMMAND
         COMMAND set CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client
         COMMAND set CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib
-        COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
-        COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
+        COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'"
+        COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'"
         INSTALL_COMMAND
         COMMAND cmake -E echo "Comparessing taosadapter.exe"
@@ -167,8 +167,8 @@ ELSE ()
         PATCH_COMMAND
         COMMAND git clean -f -d
         BUILD_COMMAND
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
+        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'"
+        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'"
         INSTALL_COMMAND
         COMMAND cmake -E echo "Copy taosadapter"
         COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
@@ -192,19 +192,19 @@ ELSE ()
         PATCH_COMMAND
         COMMAND git clean -f -d
         BUILD_COMMAND
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
+        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'"
+        # COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}'"
         INSTALL_COMMAND
-        COMMAND cmake -E echo "Comparessing taosadapter.exe"
-        COMMAND upx taosadapter || :
+        # COMMAND cmake -E echo "Comparessing taosadapter.exe"
+        # COMMAND upx taosadapter || :
         COMMAND cmake -E echo "Copy taosadapter"
         COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
         COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
         COMMAND cmake -E echo "Copy taosadapter.toml"
         COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
         COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
-        COMMAND cmake -E echo "Copy taosadapter-debug"
-        COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
+        # COMMAND cmake -E echo "Copy taosadapter-debug"
+        # COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
         )
     ENDIF ()
 ENDIF ()

View File

@@ -78,7 +78,7 @@ int buildStable(TAOS* pConn) {
   }
   taos_free_result(pRes);
 
-  pRes = taos_query(pConn, "insert into ntba values(now,'hello')");
+  pRes = taos_query(pConn, "insert into ntba values(now + 1s,'hello')");
   if (taos_errno(pRes) != 0) {
     printf("failed to insert table ntba, reason:%s\n", taos_errstr(pRes));
     return -1;