Merge branch '3.0' into feat/TS-4243-3.0

This commit is contained in:
Minglei Jin 2024-04-10 11:18:19 +08:00
commit dcd9901d5f
20 changed files with 562 additions and 78 deletions

View File

@ -496,6 +496,11 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
if (rspNum) {

View File

@ -822,7 +822,7 @@ _OVER:
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"role:%s, redirect numOfEps:%d inUse:%d, type:%s",
pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
syncStr(state.restored), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
syncStr(state.state), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
if (epSet.numOfEps <= 0) return -1;

View File

@ -923,13 +923,14 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
STaskId hId = pStreamTask->hTaskInfo.id;
SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId);
if (pTask == NULL) {
// todo handle error
tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId);
return TSDB_CODE_SUCCESS;
}
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
streamMetaReleaseTask(pMeta, pTask);
return 0;
return TSDB_CODE_SUCCESS;
}
// this function should be executed by only one thread, so we set an sentinel to protect this function

View File

@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
totalRows += pBlock->info.rows;
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);

View File

@ -2168,6 +2168,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
pTableListInfo->oneTableForEachGroup = true;
}
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);

View File

@ -889,14 +889,15 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) {
STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo;
if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) {
pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
STableKeyInfo* pStart =
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
if (NULL == pStart) return NULL;
return getBlockForEmptyTable(pOperator, pStart);
}
} else { // group by tag + no sort
} else { // group by tag + no sort
int32_t numOfTables = tableListGetSize(pTableListInfo);
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
// get empty group, mark processed & rm from hash

View File

@ -3598,15 +3598,15 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
prepareBuf(pCtx);
SWinKey key;
SWinKey key = {0};
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t skey = *(int64_t*)colDataGetData(pColInfo, rowIndex);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey;
SColumnInfoData* pColInfo = pCtx->input.pPTS;
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
}
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);

View File

@ -1576,11 +1576,13 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
case TSDB_DATA_TYPE_NCHAR: {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t len = 0;
char* pUcs4 = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE);
int64_t realLen = pToken->n << 2;
if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
char* pUcs4 = taosMemoryMalloc(realLen);
if (NULL == pUcs4) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, realLen, &len)) {
taosMemoryFree(pUcs4);
if (errno == E2BIG) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);

View File

@ -2116,7 +2116,6 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
}
if (tsKeepColumnName && 1 == LIST_LENGTH(pFunc->pParameterList) && !pFunc->node.asAlias && !pFunc->node.asParam) {
strcpy(pFunc->node.userAlias, ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->userAlias);
strcpy(pFunc->node.aliasName, pFunc->node.userAlias);
}
return TSDB_CODE_SUCCESS;
}
@ -2708,6 +2707,29 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
static bool isTbnameFuction(SNode* pNode) {
return QUERY_NODE_FUNCTION == nodeType(pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType;
}
static bool hasTbnameFunction(SNodeList* pPartitionByList) {
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartitionByList) {
if (isTbnameFuction(pPartKey)) {
return true;
}
}
return false;
}
static bool fromSubtable(SNode* table) {
if (NULL == table) return false;
if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta &&
((SRealTableNode*)table)->pMeta->tableType == TSDB_CHILD_TABLE) {
return true;
}
return false;
}
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext;
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
@ -2719,15 +2741,25 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
}
SNode* pGroupNode = NULL;
FOREACH(pGroupNode, getGroupByList(pCxt)) {
if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) {
SNode* pActualNode = getGroupByNode(pGroupNode);
if (nodesEqualNode(pActualNode, *pNode)) {
return DEAL_RES_IGNORE_CHILD;
}
if (isTbnameFuction(pActualNode) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
SNode* pPartKey = NULL;
bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList);
FOREACH(pPartKey, pSelect->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
if ((partionByTbname) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
if (NULL != pSelect->pWindow && QUERY_NODE_STATE_WINDOW == nodeType(pSelect->pWindow)) {
if (nodesEqualNode(((SStateWindowNode*)pSelect->pWindow)->pExpr, *pNode)) {
@ -2791,11 +2823,19 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
return DEAL_RES_IGNORE_CHILD;
}
SNode* pPartKey = NULL;
bool partionByTbname = false;
if (fromSubtable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) ||
hasTbnameFunction(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList)) {
partionByTbname = true;
}
FOREACH(pPartKey, ((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
}
if (partionByTbname && QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
pCxt->existCol = true;
}
@ -3966,24 +4006,13 @@ static int32_t checkStateExpr(STranslateContext* pCxt, SNode* pNode) {
return TSDB_CODE_SUCCESS;
}
static bool hasPartitionByTbname(SNodeList* pPartitionByList) {
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartitionByList) {
if (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) {
return true;
}
}
return false;
}
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pCxt->createStream) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"State window for stream on super table must patitioned by table name");
!hasTbnameFunction(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
if ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
@ -7679,12 +7708,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList);
!hasTbnameFunction(pSelect->pPartitionByList);
}
static bool crossTableWithUdaf(SSelectStmt* pSelect) {
return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList);
!hasTbnameFunction(pSelect->pPartitionByList);
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
@ -7989,7 +8018,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
&& !hasPartitionByTbname(pSelect->pPartitionByList)
&& !hasTbnameFunction(pSelect->pPartitionByList)
&& pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Event window for stream on super table must patitioned by table name");
@ -8024,7 +8053,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
if (pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_COUNT_WINDOW) {
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
&& !hasPartitionByTbname(pSelect->pPartitionByList) ) {
&& !hasTbnameFunction(pSelect->pPartitionByList) ) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Count window for stream on super table must patitioned by table name");
}

View File

@ -522,6 +522,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
} else if (pSelect->pPartitionByList) {
isCountByTag = !keysHasCol(pSelect->pPartitionByList);
}
if (pScan->tableType == TSDB_CHILD_TABLE) {
isCountByTag = true;
}
}
pScan->isCountByTag = isCountByTag;

View File

@ -2999,7 +2999,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
if (pFunc->hasPk) {
if (LIST_LENGTH(pFunc->pParameterList) != 2) {
planError("last func which has pk but its parameter list length is not 2");
planError("last func which has pk but its parameter list length is not %d", 2);
nodesClearList(cxt.pLastCols);
taosArrayDestroy(isDuplicateCol);
return TSDB_CODE_PLAN_INTERNAL_ERROR;

View File

@ -276,16 +276,18 @@ int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
int32_t ret = -1;
int32_t errcode = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
void* pHead = NULL;
int32_t contLen = 0;
SVArbSetAssignedLeaderReq req = {0};
if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
errcode = terrno;
goto _OVER;
}
int32_t errcode = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
if (ths->arbTerm > req.arbTerm) {
sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
ths->arbTerm, req.arbTerm);
@ -294,50 +296,58 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);
if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) == 0) {
if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
raftStoreNextTerm(ths);
if (terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to set next term since:%s", ths->vgId, terrstr());
goto _OVER;
}
syncNodeBecomeAssignedLeader(ths);
if (syncNodeAppendNoop(ths) < 0) {
sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, terrstr());
}
}
errcode = TSDB_CODE_SUCCESS;
} else {
if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
req.memberToken);
goto _OVER;
}
if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
terrno = TSDB_CODE_SUCCESS;
raftStoreNextTerm(ths);
if (terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to set next term since:%s", ths->vgId, terrstr());
errcode = terrno;
goto _OVER;
}
syncNodeBecomeAssignedLeader(ths);
if (syncNodeAppendNoop(ths) < 0) {
sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, terrstr());
}
}
SVArbSetAssignedLeaderRsp rsp = {0};
rsp.arbToken = req.arbToken;
rsp.memberToken = req.memberToken;
rsp.vgId = ths->vgId;
int32_t contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
if (contLen <= 0) {
sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
terrno = TSDB_CODE_OUT_OF_MEMORY;
errcode = terrno;
goto _OVER;
}
void* pHead = rpcMallocCont(contLen);
pHead = rpcMallocCont(contLen);
if (!pHead) {
sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
terrno = TSDB_CODE_OUT_OF_MEMORY;
errcode = terrno;
goto _OVER;
}
if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
terrno = TSDB_CODE_OUT_OF_MEMORY;
errcode = terrno;
rpcFreeCont(pHead);
goto _OVER;
}
errcode = TSDB_CODE_SUCCESS;
ret = 0;
_OVER:;
SRpcMsg rspMsg = {
.code = errcode,
.pCont = pHead,
@ -347,9 +357,6 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
tmsgSendRsp(&rspMsg);
ret = 0;
_OVER:
tFreeSVArbSetAssignedLeaderReq(&req);
return ret;
}

View File

@ -66,10 +66,10 @@ class TDTestCase:
tdSql.query('select * from (select tbname, avg(f) from st partition by tbname) a partition by a.tbname order by a.tbname');
tdSql.checkRows(2)
tdSql.checkCols(2)
tdSql.checkData(0, 0, 'ct1');
tdSql.checkData(0, 1, 6.0);
tdSql.checkData(1, 0, 'ct2');
tdSql.checkData(1, 1, 12.0);
tdSql.checkData(0, 0, 'ct1')
tdSql.checkData(0, 1, 6.0)
tdSql.checkData(1, 0, 'ct2')
tdSql.checkData(1, 1, 12.0)
tdSql.error('select tbname from (select * from st)')
tdSql.error('select st.tbname from (select st.tbname from st)')

View File

@ -870,7 +870,7 @@ sql_error select stddev(c2), tbname from select_tags_mt0;
sql_error select twa(c2), tbname from select_tags_mt0;
sql_error select interp(c2), tbname from select_tags_mt0 where ts=100001;
sql_error select t1,t2,tbname from select_tags_mt0 group by tbname;
sql select count(tbname) from select_tags_mt0 interval(1d);
sql select count(tbname) from select_tags_mt0 group by t1;
sql select count(tbname),SUM(T1) from select_tags_mt0 interval(1d);
@ -888,16 +888,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
print ==================================>TD-4231
sql select t1,tbname from select_tags_mt0 where c1<0
sql select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');
sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
#valid sql: select first(c1), t2 from select_tags_mt0 group by tbname;
sql select first(ts), tbname from select_tags_mt0 group by tbname;
sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
sql select count(*),tbname from select_tags_mt0 group by tbname
#sql select first(ts), tbname from select_tags_mt0 group by tbname;
#sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
#sql select count(*),tbname from select_tags_mt0 group by tbname
print ==================================> tag supported in group
sql select t1,t2,tbname from select_tags_mt0 group by tbname;
sql select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -3,15 +3,24 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test;
sql create database test KEEP 36500;
sql use test;
sql create table st(ts timestamp, f int) tags(t int);
sql insert into ct1 using st tags(1) values(now, 0)(now+1s, 1)(now+2s, 10)(now+3s, 11)
sql insert into ct2 using st tags(2) values(now+2s, 2)(now+3s, 3)
sql insert into ct3 using st tags(3) values(now+4s, 4)(now+5s, 5)
sql insert into ct4 using st tags(4) values(now+6s, 6)(now+7s, 7)
sql select count(*), spread(ts) from st where tbname='ct1'
$ms = 1712135244502
$ms1 = $ms + 1000
$ms2 = $ms + 2000
$ms3 = $ms + 3000
$ms4 = $ms + 4000
$ms5 = $ms + 5000
$ms6 = $ms + 6000
$ms7 = $ms + 7000
sql insert into ct1 using st tags(1) values($ms , 0)($ms1 , 1)($ms2 , 10)($ms3 , 11)
sql insert into ct2 using st tags(2) values($ms2 , 2)($ms3 , 3)
sql insert into ct3 using st tags(3) values($ms4 , 4)($ms5 , 5)
sql insert into ct4 using st tags(4) values($ms6 , 6)($ms7 , 7)
sql select count(*), spread(ts) from st where tbname='ct1'
print $data00, $data01
if $data00 != @4@ then
return -1

View File

@ -15,6 +15,8 @@ sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,2,3,1.1);
sql insert into t1 values(1648791215000,3,2,3,1.1);
@ -214,4 +216,232 @@ if $data[29][1] != 2 then
goto loop11
endi
print step2=============
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once ignore expired 0 ignore update 0 into streamt4 as select _wstart, first(a), b, c, ta, tb from st interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t2 values(1648791215000,3,4,5,1.1);
sql insert into t2 values(1648791217000,4,5,6,1.1);
$loop_count = 0
loop12:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt4 order by 1;
sql select * from streamt4 order by 1;
if $rows != 4 then
print ======rows=$rows
goto loop12
endi
if $data02 != 2 then
print ======data02=$data02
goto loop12
endi
if $data03 != 3 then
print ======data03=$data03
goto loop12
endi
if $data04 != 1 then
print ======data04=$data04
goto loop12
endi
if $data05 != 1 then
print ======data05=$data05
goto loop12
endi
if $data22 != 4 then
print ======data22=$data22
goto loop12
endi
if $data23 != 5 then
print ======data23=$data23
goto loop12
endi
if $data24 != 2 then
print ======data24=$data24
goto loop12
endi
if $data25 != 2 then
print ======data25=$data25
goto loop12
endi
print step3=============
sql create database test5 vgroups 4;
sql use test5;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams5 trigger at_once ignore expired 0 ignore update 0 into streamt5 as select _wstart, b, c, ta, tb, max(b) from t1 interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t1 values(1648791215000,3,4,5,1.1);
sql insert into t1 values(1648791217000,4,5,6,1.1);
$loop_count = 0
loop13:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt5 order by 1;
sql select * from streamt5 order by 1;
if $rows != 4 then
print ======rows=$rows
goto loop13
endi
if $data01 != 2 then
print ======data02=$data02
goto loop13
endi
if $data02 != 3 then
print ======data03=$data03
goto loop13
endi
if $data03 != 1 then
print ======data04=$data04
goto loop13
endi
if $data04 != 1 then
print ======data05=$data05
goto loop13
endi
if $data21 != 4 then
print ======data22=$data22
goto loop13
endi
if $data22 != 5 then
print ======data23=$data23
goto loop13
endi
if $data23 != 1 then
print ======data24=$data24
goto loop13
endi
if $data24 != 1 then
print ======data25=$data25
goto loop13
endi
print step4=============
sql create database test6 vgroups 4;
sql use test6;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t2 values(1648791215000,3,4,5,1.1);
sql insert into t2 values(1648791217000,4,5,6,1.1);
$loop_count = 0
loop14:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt6 order by 1;
sql select * from streamt6 order by 1;
if $rows != 4 then
print ======rows=$rows
goto loop14
endi
if $data01 != 2 then
print ======data02=$data02
goto loop14
endi
if $data02 != 3 then
print ======data03=$data03
goto loop14
endi
if $data04 != 1 then
print ======data04=$data04
goto loop14
endi
if $data05 != 1 then
print ======data05=$data05
goto loop14
endi
if $data21 != 4 then
print ======data22=$data22
goto loop14
endi
if $data22 != 5 then
print ======data23=$data23
goto loop14
endi
if $data24 != 2 then
print ======data24=$data24
goto loop14
endi
if $data25 != 2 then
print ======data25=$data25
goto loop14
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -103,6 +103,10 @@ class TDTestCase:
tdSql.checkRows(row)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(row)
tdSql.query(f'select t0, {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(row)
tdSql.query(f'select cast(t0 as binary(12)), {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(row)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by t0')

View File

@ -470,7 +470,9 @@ class TDTestCase:
tdSql.checkRows(40)
# bug need fix
tdSql.query("select tbname , csum(c1), csum(c12) from db.stb1 partition by tbname")
tdSql.query("select tbname , st1, csum(c1), csum(c12) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select tbname , cast(st1 as binary(24)), csum(c1), csum(c12) from db.stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select tbname , csum(st1) from db.stb1 partition by tbname")
tdSql.checkRows(70)

View File

@ -91,15 +91,71 @@ class TDTestCase:
tdSql.query(f"select t2, t3, c1, count(*) from {self.dbname}.{self.stable} {keyword} by t2, t3, c1 ")
tdSql.checkRows(nonempty_tb_num * self.row_nums)
def test_groupby_sub_table(self):
for i in range(self.tb_nums):
tbname = f"{self.dbname}.sub_{self.stable}_{i}"
ts = self.ts + i*10000
tdSql.query(f"select t1, t2, t3,count(*) from {tbname}")
tdSql.checkRows(1)
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select cast(t2 as binary(12)),count(*) from {tbname}")
tdSql.checkRows(1)
tdSql.checkData(0, 0, i)
tdSql.query(f"select t2 + 1, count(*) from {tbname}")
tdSql.checkRows(1)
tdSql.checkData(0, 0, i + 1)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} group by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} group by tbname, c1, t4")
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} partition by tbname")
tdSql.checkRows(1)
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} partition by c1, tbname")
tdSql.checkData(0, 1, i)
tdSql.checkData(0, 2, i*10)
tdSql.query(f"select t1, t2, t3, count(*) from {self.dbname}.{self.stable} partition by c1, tbname order by tbname desc")
tdSql.checkRows(50)
tdSql.checkData(0, 1, 4)
tdSql.checkData(0, 2, 40)
def test_multi_group_key(self, check_num, nonempty_tb_num):
# multi tag/tbname
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} group by t2, t3, tbname")
tdSql.checkRows(check_num)
tdSql.query(f"select cast(t2 as binary(12)), count(*) from {self.dbname}.{self.stable} group by t2, t3, tbname")
tdSql.checkRows(check_num)
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} partition by t2, t3, tbname")
tdSql.checkRows(check_num)
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} group by tbname order by tbname asc")
tdSql.checkRows(check_num)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 1, 20)
tdSql.checkData(3, 1, 30)
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} partition by tbname order by tbname asc")
tdSql.checkRows(check_num)
tdSql.checkData(0, 0, 0)
tdSql.checkData(2, 1, 20)
tdSql.checkData(3, 1, 30)
# multi tag + col
tdSql.query(f"select t1, t2, c1, count(*) from {self.dbname}.{self.stable} partition by t1, t2, c1 ")
tdSql.checkRows(nonempty_tb_num * self.row_nums)
@ -222,12 +278,14 @@ class TDTestCase:
self.test_groupby('group', self.tb_nums, nonempty_tb_num)
self.test_groupby('partition', self.tb_nums, nonempty_tb_num)
self.test_groupby_sub_table()
self.test_innerSelect(self.tb_nums)
self.test_multi_group_key(self.tb_nums, nonempty_tb_num)
self.test_multi_agg(self.tb_nums, nonempty_tb_num)
self.test_window(nonempty_tb_num)
self.test_event_window(nonempty_tb_num)
## test old version before changed
# self.test_groupby('group', 0, 0)
# self.insert_db(5, self.row_nums)

View File

@ -0,0 +1,130 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import string
import sys
import taos
from util.common import *
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 10
self.tbnum = 20
self.ts = 1537146000000
self.binary_str = 'taosdata'
self.nchar_str = '涛思数据'
def first_check_base(self):
dbname = "db"
tdSql.prepare(dbname)
column_dict = {
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': 'binary(20)',
'col13': 'nchar(20)'
}
tdSql.execute(f"alter local \'keepColumnName\' \'1\'")
tdSql.execute(f'''create table {dbname}.stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 tinyint unsigned, col6 smallint unsigned,
col7 int unsigned, col8 bigint unsigned, col9 float, col10 double, col11 bool, col12 binary(20), col13 nchar(20)) tags(loc nchar(20))''')
tdSql.execute(f"create table {dbname}.stb_1 using {dbname}.stb tags('beijing')")
tdSql.execute(f"create table {dbname}.stb_2 using {dbname}.stb tags('beijing')")
column_list = ['col1','col2','col3','col4','col5','col6','col7','col8','col9','col10','col11','col12','col13']
for i in column_list:
for j in ['stb_1']:
tdSql.query(f"select first({i}) from {dbname}.{j}")
tdSql.checkRows(0)
for n in range(self.rowNum):
i = n
tdSql.execute(f"insert into {dbname}.stb_1 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for n in range(self.rowNum):
i = n + 10
tdSql.execute(f"insert into {dbname}.stb_1 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for n in range(self.rowNum):
i = n + 100
tdSql.execute(f"insert into {dbname}.stb_2 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for k, v in column_dict.items():
if v == 'tinyint' or v == 'smallint' or v == 'int' or v == 'bigint' or v == 'tinyint unsigned' or v == 'smallint unsigned'\
or v == 'int unsigned' or v == 'bigint unsigned':
tdSql.query(f"select last({k})-first({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109)
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 111)
tdSql.query(f"select max({k})-first({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109)
tdSql.query(f"select max({k})-min({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109)
tdSql.query(f"select last({k})-first({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19)
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 21)
tdSql.query(f"select max({k})-first({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19)
tdSql.query(f"select max({k})-min({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19)
# float,double
elif v == 'float' or v == 'double':
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb")
tdSql.checkData(0, 0, 109.2)
tdSql.query(f"select first({k})+last({k}) from {dbname}.stb_1")
tdSql.checkData(0, 0, 19.2)
# bool
elif v == 'bool':
continue
# binary
elif 'binary' in v:
continue
# nchar
elif 'nchar' in v:
continue
#tdSql.execute(f'drop database {dbname}')
def run(self):
self.first_check_base()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())