Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
Hongze Cheng 2024-02-28 13:23:02 +08:00
commit 1903a7a981
32 changed files with 340 additions and 55 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -27,6 +27,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);

View File

@ -87,8 +87,10 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -835,9 +835,11 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -14,6 +14,10 @@ IF (TD_STORAGE)
ENDIF ()
IF (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
ENDIF()
target_include_directories(
dnode
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"

View File

@ -26,6 +26,10 @@ target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor executor qworker stream parser audit monitorfw
)
IF (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
ENDIF()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(mnode grant)
ADD_DEFINITIONS(-D_GRANT)

View File

@ -86,6 +86,10 @@ typedef struct SOrphanTask {
int32_t nodeId;
} SOrphanTask;
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);

View File

@ -240,7 +240,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompac
SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
if (mndTransAppendPrepareLog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}

View File

@ -300,15 +300,16 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
}
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
int32_t size = (int32_t) taosArrayGetSize(pList);
for (int32_t i = 0; i < size; ++i) {
SVgroupVer* pVer = taosArrayGet(pList, i);
if (pVer->vgId == vgId) {
return pVer->ver;
}
}
mError("failed to find the vgId:%d for extract last version", vgId);
return -1;
mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
return 1;
}
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
@ -472,6 +473,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
int code =
doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
if (code != 0) {
mError("create stream task, code:%s", tstrerror(code));
// todo drop the added source tasks.
sdbRelease(pSdb, pVgroup);
return code;
}

View File

@ -877,7 +877,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int64_t ts = taosGetTimestampMs();
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
// mWarn("checkpoint interval less than the threshold, ignore it");
return -1;
return TSDB_CODE_SUCCESS;
}
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
@ -2179,5 +2179,16 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
mndReleaseStream(pMnode, pStream);
taosThreadMutexUnlock(&execInfo.lock);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
return 0;
}

View File

@ -16,10 +16,6 @@
#include "mndStream.h"
#include "mndTrans.h"
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg;
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;

View File

@ -1217,7 +1217,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
if (numOfActions == 0) return 0;
if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) {
return -1;
return code;
}
int32_t numOfExecuted = 0;

View File

@ -180,6 +180,10 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
default:
sndError("invalid snode msg:%d", pMsg->msgType);
ASSERT(0);

View File

@ -259,6 +259,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);

View File

@ -1175,7 +1175,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS;
}
} else {
ASSERT(status == TASK_STATUS__HALT);
// ASSERT(status == TASK_STATUS__HALT);
if (status != TASK_STATUS__HALT) {
tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
}
}
// check if the checkpoint msg already sent or not.
@ -1225,3 +1229,11 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
}

View File

@ -485,6 +485,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
return code;
}
typedef struct SMStreamCheckpointReadyRspMsg {
SMsgHead head;
}SMStreamCheckpointReadyRspMsg;
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -513,6 +517,18 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
streamProcessCheckpointReadyMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
{ // send checkpoint ready rsp
SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont;
pHead->vgId = htonl(req.downstreamNodeId);
tmsgSendRsp(&rsp);
pMsg->info.handle = NULL; // disable auto rsp
}
return code;
}
@ -938,9 +954,17 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
return taosArrayGetSize(pMeta->pTaskList);
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return doProcessDummyRspMsg(pMeta, pMsg);
}

View File

@ -484,22 +484,26 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%"PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%"PRId32, pVnode->config.vgId);
if(pVnode->monitor.insertCounter == NULL){
int32_t label_count = 7;
const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID,
VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP,
VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME,
VNODE_METRIC_TAG_NAME_RESULT};
taos_counter_t *counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql",
label_count, sample_labels);
vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter);
if(taos_collector_registry_register_metric(counter) == 1){
taos_counter_destroy(counter);
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter);
if(tsEnableMonitor && pVnode->monitor.insertCounter == NULL){
taos_counter_t *counter = NULL;
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
if(counter == NULL){
int32_t label_count = 7;
const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID,
VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP,
VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME,
VNODE_METRIC_TAG_NAME_RESULT};
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql",
label_count, sample_labels);
vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter);
if(taos_collector_registry_register_metric(counter) == 1){
taos_counter_destroy(counter);
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter);
}
}
pVnode->monitor.insertCounter = counter;
vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter);
vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter);
}
return pVnode;

View File

@ -16,6 +16,7 @@
#include "audit.h"
#include "cos.h"
#include "tencode.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tstrbuild.h"
#include "vnd.h"
@ -800,6 +801,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
@ -1704,7 +1709,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if(pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){
if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId,
pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId,
pOriginalMsg->info.conn.user, "Success"};

View File

@ -373,6 +373,7 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
}
#ifdef BUILD_NO_CALL
static int32_t getViewMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
#ifndef TD_ENTERPRISE
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
@ -396,6 +397,7 @@ static int32_t getViewMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCac
}
return code;
}
#endif
int32_t getTargetMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta, bool couldBeView) {
int32_t code = TSDB_CODE_SUCCESS;
@ -774,9 +776,11 @@ static bool isAggFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId));
}
#ifdef BUILD_NO_CALL
static bool isSelectFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsSelectFunc(((SFunctionNode*)pNode)->funcId));
}
#endif
static bool isWindowPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
@ -790,9 +794,11 @@ static bool isInterpPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
}
#ifdef BUILD_NO_CALL
static bool isTimelineFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId));
}
#endif
static bool isImplicitTsFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsImplicitTsFunc(((SFunctionNode*)pNode)->funcId));
@ -7750,9 +7756,11 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STabl
return code;
}
#ifdef BUILD_NO_CALL
static bool isEventWindowQuery(SSelectStmt* pSelect) {
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
}
#endif
static bool hasJsonTypeProjection(SSelectStmt* pSelect) {
SNode* pProj = NULL;

View File

@ -907,7 +907,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet};
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
info.msg.info.noResp = 1; // refactor later.
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
":0x%x (vgId:%d) idx:%d, vgId:%d",

View File

@ -934,9 +934,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
}
tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1};
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);

View File

@ -67,7 +67,6 @@ class TDTestCase(TBase):
dirs = glob.glob(dnodesRootDir)
for dir in dirs:
if os.path.isdir(dir):
tdLog.debug("delete dir: %s " % (dnodesRootDir))
self.remove_directory(os.path.join(dir, "wal"))
sc.dnodeStart(1)
@ -88,7 +87,7 @@ class TDTestCase(TBase):
if bFinish:
break
self.timestamp_step = 1
self.timestamp_step = 1000
self.insert_rows = 6000
self.checkInsertCorrect()
self.checkAggCorrect()

View File

@ -0,0 +1,152 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import random
import taos
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
class TDTestCase(TBase):
updatecfgDict = {
}
def insertData(self):
tdLog.info(f"create table and insert data.")
self.stb = "stb"
self.db = "db"
self.childtable_count = 10
self.insert_rows = 10000
self.autoGen = AutoGen(startTs = 1600000000000*1000*1000, batch=500, fillOne=True)
self.autoGen.create_db(self.db, 2, 3, "precision 'ns'")
self.autoGen.create_stable(stbname = self.stb, tag_cnt = 5, column_cnt = 20, binary_len = 10, nchar_len = 5)
self.autoGen.create_child(self.stb, "child", self.childtable_count)
self.autoGen.insert_data(self.insert_rows, True)
tdLog.info("create view.")
tdSql.execute(f"use {self.db}")
sqls = [
"create view viewc0c1 as select c0,c1 from stb ",
"create view viewc0c1c2 as select c0,c1,c2 from stb ",
"create view viewc0c3 as select c0,c3 from stb where c3=1",
"create view viewc0c4c5 as select c4,c5 from stb ",
"create view viewc0c6 as select c0,c1,c6 from stb ",
"create view viewc0c7 as select c0,c1 from stb ",
"create view viewc0c7c8 as select c0,c7,c8 from stb where c8>0",
"create view viewc0c3c1 as select c0,c3,c1 from stb ",
"create view viewc2c4 as select c2,c4 from stb ",
"create view viewc2c5 as select c2,c5 from stb ",
]
tdSql.executes(sqls)
def checkView(self):
tdLog.info(f"check view like.")
# like
sql = f"show views like 'view%'"
tdSql.query(sql)
tdSql.checkRows(10)
sql = f"show views like 'vie_c0c1c2'"
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0,0,"viewc0c1c2")
sql = f"show views like '%c2c_'"
tdSql.query(sql)
tdSql.checkRows(2)
tdSql.checkData(0,0, "viewc2c4")
tdSql.checkData(1,0, "viewc2c5")
sql = f"show views like '%' "
tdSql.query(sql)
tdSql.checkRows(10)
# zero
sql = "show views like '_' "
tdSql.query(sql)
tdSql.checkRows(0)
sql = "show views like 'a%' "
tdSql.query(sql)
tdSql.checkRows(0)
def doQuery(self):
tdLog.info(f"do query.")
# __group_key
sql = f"select count(*) from {self.stb} "
tdSql.query(sql)
# column index 1 value same with 2
allRows = self.insert_rows * self.childtable_count
tdSql.checkFirstValue(sql, allRows)
def checkShow(self):
# not support
sql = "show accounts;"
tdSql.error(sql)
# check result
sql = "SHOW CLUSTER;"
tdSql.query(sql)
tdSql.checkRows(1)
sql = "SHOW COMPACTS;"
tdSql.query(sql)
tdSql.checkRows(0)
sql = "SHOW COMPACT 1;"
tdSql.query(sql)
tdSql.checkRows(0)
sql = "SHOW CLUSTER MACHINES;"
tdSql.query(sql)
tdSql.checkRows(1)
# run to check crash
sqls = [
"show scores;",
"SHOW CLUSTER VARIABLES",
"SHOW BNODES;",
]
tdSql.executes(sqls)
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")
# insert data
self.insertData()
# check view
self.checkView()
# do action
self.doQuery()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"connection_pool_size": 8,
"num_of_records_per_req": 2000,
"num_of_records_per_req": 4000,
"prepared_rand": 1000,
"thread_count": 2,
"create_table_thread_count": 1,
@ -18,26 +18,27 @@
"drop": "yes",
"vgroups": 2,
"replica": 1,
"duration":"1d",
"keep": "3d,6d,30d"
"duration":"15d",
"flush_each_batch":"yes",
"keep": "60d,100d,200d"
},
"super_tables": [
{
"name": "stb",
"child_table_exists": "no",
"childtable_count": 4,
"insert_rows": 1000000,
"childtable_count": 2,
"insert_rows": 2000000,
"childtable_prefix": "d",
"insert_mode": "taosc",
"timestamp_step": 1000,
"start_timestamp":"now-13d",
"start_timestamp":"now-90d",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc" },
{ "type": "double", "name": "dc"},
{ "type": "tinyint", "name": "ti", "values":["1"]},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si" },
{ "type": "int", "name": "ic" },
{ "type": "int", "name": "ic" ,"max": 1,"min": 1},
{ "type": "bigint", "name": "bi" },
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi"},

View File

@ -58,8 +58,8 @@ class TDTestCase(TBase):
tdSql.execute(f"use {self.db}")
# come from s3_basic.json
self.childtable_count = 4
self.insert_rows = 1000000
self.childtable_count = 2
self.insert_rows = 2000000
self.timestamp_step = 1000
def createStream(self, sname):

View File

@ -14,15 +14,18 @@ import time
# Auto Gen class
#
class AutoGen:
def __init__(self, fillOne=False):
self.ts = 1600000000000
self.batch_size = 100
def __init__(self, startTs = 1600000000000, step = 1000, batch = 100, fillOne=False):
self.startTs = startTs
self.ts = startTs
self.step = step
self.batch_size = batch
self.fillOne = fillOne
seed = time.time() % 10000
random.seed(seed)
self.fillOne = fillOne
# set start ts
def set_start_ts(self, ts):
self.startTs = ts
self.ts = ts
# set batch size
@ -111,9 +114,9 @@ class AutoGen:
return ''.join(random.choice(letters) for i in range(count))
# create db
def create_db(self, dbname, vgroups = 2, replica = 1):
def create_db(self, dbname, vgroups = 2, replica = 1, others=""):
self.dbname = dbname
tdSql.execute(f'create database {dbname} vgroups {vgroups} replica {replica}')
tdSql.execute(f'create database {dbname} vgroups {vgroups} replica {replica} {others}')
# create table or stable
def create_stable(self, stbname, tag_cnt, column_cnt, binary_len, nchar_len):
@ -167,12 +170,12 @@ class AutoGen:
def insert_data(self, cnt, bContinue=False):
if not bContinue:
self.ts = 1600000000000
self.ts = self.startTs
currTs = 1600000000000
currTs = self.startTs
for i in range(self.child_cnt):
name = f"{self.child_name}{i}"
currTs = self.insert_data_child(name, cnt, self.batch_size, 1)
currTs = self.insert_data_child(name, cnt, self.batch_size, self.step)
self.ts = currTs
tdLog.info(f" insert data ok, child table={self.child_cnt} insert rows={cnt}")

View File

@ -15,10 +15,11 @@
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
,,n,army,python3 ./test.py -f community/query/show.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1
#

View File

@ -105,7 +105,8 @@ SWords shellCommands[] = {
{"create or replace aggregate function <anyword> as <anyword> outputtype <data_types> bufsize <anyword> language <udf_language>", 0, 0, NULL},
{"create user <anyword> pass <anyword> sysinfo 0;", 0, 0, NULL},
{"create user <anyword> pass <anyword> sysinfo 1;", 0, 0, NULL},
#ifdef TD_ENTERPRISE
#ifdef TD_ENTERPRISE
{"create view <anyword> as select", 0, 0, NULL},
{"compact database <db_name>", 0, 0, NULL},
#endif
{"describe <all_table>", 0, 0, NULL},
@ -162,13 +163,20 @@ SWords shellCommands[] = {
{"show create database <db_name> \\G;", 0, 0, NULL},
{"show create stable <stb_name> \\G;", 0, 0, NULL},
{"show create table <tb_name> \\G;", 0, 0, NULL},
#ifdef TD_ENTERPRISE
{"show create view <all_table> \\G;", 0, 0, NULL},
#endif
{"show connections;", 0, 0, NULL},
{"show compact", 0, 0, NULL},
{"show compacts;", 0, 0, NULL},
{"show cluster;", 0, 0, NULL},
{"show cluster alive;", 0, 0, NULL},
{"show cluster machines;", 0, 0, NULL},
{"show databases;", 0, 0, NULL},
{"show dnodes;", 0, 0, NULL},
{"show dnode <dnode_id> variables;", 0, 0, NULL},
{"show functions;", 0, 0, NULL},
{"show licences;", 0, 0, NULL},
{"show mnodes;", 0, 0, NULL},
{"show queries;", 0, 0, NULL},
// 80
@ -185,6 +193,7 @@ SWords shellCommands[] = {
{"show table distributed <all_table>", 0, 0, NULL},
{"show tags from <tb_name>", 0, 0, NULL},
{"show tags from <db_name>", 0, 0, NULL},
{"show table tags from <all_table>", 0, 0, NULL},
{"show topics;", 0, 0, NULL},
{"show transactions;", 0, 0, NULL},
{"show users;", 0, 0, NULL},
@ -194,6 +203,8 @@ SWords shellCommands[] = {
{"show vgroups;", 0, 0, NULL},
{"show consumers;", 0, 0, NULL},
{"show grants;", 0, 0, NULL},
{"show grants full;", 0, 0, NULL},
{"show grants logs;", 0, 0, NULL},
#ifdef TD_ENTERPRISE
{"split vgroup <vgroup_id>", 0, 0, NULL},
#endif
@ -302,6 +313,20 @@ char* key_systable[] = {
char* udf_language[] = {"\'Python\'", "\'C\'"};
// global keys can tips on anywhere
char* global_keys[] = {
"tbname",
"now",
"_wstart",
"_wend",
"_wduration",
"_qstart",
"_qend",
"_qduration",
"_qtag",
"_isfilled"
};
//
// ------- global variant define ---------
//
@ -341,8 +366,9 @@ bool waitAutoFill = false;
#define WT_VAR_KEYSELECT 20
#define WT_VAR_SYSTABLE 21
#define WT_VAR_LANGUAGE 22
#define WT_VAR_GLOBALKEYS 23
#define WT_VAR_CNT 23
#define WT_VAR_CNT 24
#define WT_TEXT 0xFF
@ -494,10 +520,12 @@ void showHelp() {
show connections;\n\
show cluster;\n\
show cluster alive;\n\
show cluster machines;\n\
show databases;\n\
show dnodes;\n\
show dnode <dnode_id> variables;\n\
show functions;\n\
show licences;\n\
show mnodes;\n\
show queries;\n\
show query <query_id> ;\n\
@ -513,6 +541,7 @@ void showHelp() {
show table distributed <all_table>;\n\
show tags from <tb_name>\n\
show tags from <db_name>\n\
show table tags from <all_table>\n\
show topics;\n\
show transactions;\n\
show users;\n\
@ -522,6 +551,8 @@ void showHelp() {
show vgroups;\n\
show consumers;\n\
show grants;\n\
show grants full;\n\
show grants logs;\n\
----- T ----- \n\
trim database <db_name>;\n\
----- U ----- \n\
@ -534,8 +565,12 @@ void showHelp() {
balance vgroup ;\n\
balance vgroup leader on <vgroup_id> \n\
compact database <db_name>; \n\
crate view <view_name> as select ...\n\
redistribute vgroup <vgroup_id> dnode <dnode_id> ;\n\
split vgroup <vgroup_id>;");
split vgroup <vgroup_id>;\n\
show compacts;\n\
show compact \n\
show create view <all_table>;");
#endif
printf("\n\n");
@ -699,6 +734,7 @@ bool shellAutoInit() {
GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*));
GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*));
GenerateVarType(WT_VAR_LANGUAGE, udf_language, sizeof(udf_language) / sizeof(char*));
GenerateVarType(WT_VAR_GLOBALKEYS, global_keys, sizeof(global_keys) / sizeof(char*));
return true;
}
@ -1800,6 +1836,13 @@ bool matchEnd(TAOS* con, SShellCmd* cmd) {
goto _return;
}
// global keys
if (fillWithType(con, cmd, last, WT_VAR_GLOBALKEYS)) {
ret = true;
goto _return;
}
_return:
taosMemoryFree(ps);
return ret;