Merge branch '3.0' into feature/TD-11274-3.0

This commit is contained in:
Cary Xu 2022-07-02 16:05:24 +08:00
commit 6f11d625b0
42 changed files with 1337 additions and 602 deletions

View File

@ -20,6 +20,12 @@
extern "C" {
#endif
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio
#ifndef ALLOW_FORBID_FUNC
#define qsort QSORT_FUNC_TAOS_FORBID
#endif
#define TPOW2(x) ((x) * (x))
#define TABS(x) ((x) > 0 ? (x) : -(x))

View File

@ -67,7 +67,7 @@ bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4);
bool taosValidateEncodec(const char *encodec);
int32_t taosHexEncode(const char *src, char *dst, int32_t len);
int32_t taosHexEncode(const unsigned char *src, char *dst, int32_t len);
int32_t taosHexDecode(const char *src, char *dst, int32_t len);
int32_t taosWcharWidth(TdWchar wchar);

View File

@ -21,7 +21,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
if (pVnode == NULL || pVnode->dropped) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
@ -81,16 +81,18 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
vmReleaseVnode(pMgmt, pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode);
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
dDebug("vgId:%d, vnode is closed", pVnode->vgId);
if (pVnode->dropped) {

View File

@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType));
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, msgtype:%s qtype:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
return terrno != 0 ? terrno : -1;
}

View File

@ -556,7 +556,7 @@ typedef struct {
int64_t uid;
int8_t status;
// config
int8_t dropPolicy;
int8_t igExpired;
int8_t trigger;
int64_t triggerParam;
int64_t watermark;

View File

@ -38,6 +38,8 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
#ifdef __cplusplus
}
#endif

View File

@ -21,6 +21,7 @@
#include "mndShow.h"
#include "mndSma.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
@ -927,6 +928,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
@ -947,7 +949,6 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:

View File

@ -28,7 +28,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
@ -73,7 +73,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;

View File

@ -523,6 +523,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.sourceDbUid = pDb->uid;
streamObj.targetDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
streamObj.smaId = smaObj.uid;
@ -854,35 +855,25 @@ _OVER:
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
SSmaObj *pSma = NULL;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t code = -1;
while (1) {
SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->dbUid == pDb->uid) {
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
mndReleaseVgroup(pMnode, pVgroup);
pVgroup = NULL;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) {
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pSma);
return -1;
}
}
sdbRelease(pSdb, pSma);
}
code = 0;
_OVER:
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pSma);
mndReleaseVgroup(pMnode, pVgroup);
return code;
return 0;
}
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {

View File

@ -248,7 +248,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->status = 0;
// TODO
pObj->dropPolicy = 0;
pObj->igExpired = pCreate->igExpired;
pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
@ -301,6 +301,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.streamQuery = true,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
};
// using ast and param to build physical plan
@ -673,27 +674,29 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
if (pStream->sourceDbUid != pStream->targetDbUid) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
return -1;
} else {
// TODO drop all task on snode
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
} else {
sdbRelease(pSdb, pStream);
continue;
}
#if 0

View File

@ -90,7 +90,7 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->actionType == TRANS_ACTION_RAW) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
rawDataLen += (sizeof(STransAction) + sdbGetRawTotalSize(pAction->pRaw));
} else if (pAction->actionType == TRANS_ACTION_MSG) {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
} else {
@ -105,7 +105,7 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
@ -226,7 +226,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
_OVER:
if (terrno != 0) {
mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
mError("trans:%d, failed to encode to raw:%p maxlen:%d len:%d since %s", pTrans->id, pRaw, sdbGetRawTotalSize(pRaw),
dataPos, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
@ -1025,7 +1026,7 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo == 0;
pTrans->lastErrorNo = 0;
return 0;
}

View File

@ -183,13 +183,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
} else {
ASSERT(0);
}
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
if (pOffset == NULL || pOffset->val.version < offset.val.version) {
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/
/*if (pOffset != NULL) {*/
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0);
return -1;
}
}
/*}*/
/*}*/
return 0;
}
@ -375,8 +377,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree(pCkHead);
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld, actual offset: uid %ld ts %ld", dataRsp.reqOffset.uid,
dataRsp.reqOffset.ts, fetchOffsetNew.uid, fetchOffsetNew.ts);
tqInfo("retrieve using snapshot actual offset: uid %ld ts %ld", fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0);
}

View File

@ -120,7 +120,9 @@ bool tqNextDataBlock(SStreamReader* pHandle) {
return true;
}
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
/*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
if (ret != NULL) {
/*tqDebug("find uid %ld", pHandle->msgIter.uid);*/
return true;
}
}

View File

@ -185,5 +185,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
.contLen = ntohl(pReq->length),
};
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr());
}
}

View File

@ -356,6 +356,7 @@ typedef struct SStreamBlockScanInfo {
SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pSnapshotReadOp;
SArray* childIds;
SessionWindowSupporter sessionSup;
@ -427,7 +428,7 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindowAggSupp twAggSup;
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
@ -449,7 +450,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
@ -587,7 +588,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
@ -631,7 +632,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result;
bool ignoreCloseWindow;
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo {

View File

@ -145,10 +145,12 @@ static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo,
continue;
}
// TODO handle ntb case
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
continue;
}
// TODO handle ntb case
/*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
// handle multiple partition
taosArrayPush(qa, id);
}

View File

@ -2027,8 +2027,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
@ -2163,8 +2164,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SSDataBlock* pRes = pExchangeInfo->pResult;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", completed:%d try next %d/%" PRIzu,
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
@ -2183,18 +2184,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d"
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d"
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
pLoadInfo->totalRows, pLoadInfo->totalSize);
}
taosMemoryFreeClear(pDataInfo->pRsp);
@ -2267,8 +2269,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
pOperator->pTaskInfo->code = pDataInfo->code;
return NULL;
}
@ -2276,8 +2278,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
" try next",
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows);
@ -2296,16 +2298,17 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
pLoadInfo->totalRows, pLoadInfo->totalSize);
}
pOperator->resultInfo.totalRows += pRes->info.rows;

View File

@ -1031,10 +1031,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (!prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
}
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
@ -1274,6 +1276,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pInfo->pStreamScanOp = pOperator;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;

View File

@ -838,7 +838,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win);
int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
@ -871,7 +871,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
}
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order);
@ -886,7 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (startPos < 0) {
break;
}
if (pInfo->ignoreCloseWindow && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
@ -1535,7 +1535,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp;
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
@ -2292,7 +2292,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
pInfo->interval.precision, NULL);
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreCloseWindow && isClosed) {
if (pInfo->ignoreExpiredData && isClosed) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
@ -2710,7 +2710,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
@ -2857,7 +2857,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pChildren = NULL;
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
@ -3133,7 +3133,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) {
if (pInfo->ignoreCloseWindow && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
continue;
}
@ -3413,8 +3413,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForSession, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
getResWinForSession, pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
@ -3822,7 +3822,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) {
if (pInfo->ignoreCloseWindow && isOverdue(tsCols[i], &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
i++;
continue;
}
@ -3866,12 +3866,14 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
printDataBlock(pBInfo->pRes, "single state");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
@ -3884,6 +3886,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
printDataBlock(pBlock, "single state recv");
if (pBlock->info.type == STREAM_CLEAR) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
@ -3903,8 +3906,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForState, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
getResWinForState, pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated);
@ -3914,9 +3917,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
printDataBlock(pBInfo->pRes, "single state");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
@ -3978,7 +3983,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes->info.type = STREAM_DELETE;
blockDataEnsureCapacity(pInfo->pDelRes, 64);
pInfo->pChildren = NULL;
pInfo->ignoreCloseWindow = false;
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pOperator->name = "StreamStateAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;

View File

@ -39,6 +39,174 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
}
#define TIME_UNIT_INVALID 1
#define TIME_UNIT_TOO_SMALL 2
static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
if (!pVal->isDuration) {
return TIME_UNIT_INVALID;
}
if (TSDB_TIME_PRECISION_MILLI == dbPrec && 0 == strcasecmp(pVal->literal, "1u")) {
return TIME_UNIT_TOO_SMALL;
}
if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' &&
pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' &&
pVal->literal[1] != 'w')) {
return TIME_UNIT_INVALID;
}
return TSDB_CODE_SUCCESS;
}
/* Following are valid ISO-8601 timezone format:
* 1 z/Z
* 2 ±hh:mm
* 3 ±hhmm
* 4 ±hh
*
*/
static bool validateHourRange(int8_t hour) {
if (hour < 0 || hour > 12) {
return false;
}
return true;
}
static bool validateMinuteRange(int8_t hour, int8_t minute, char sign) {
if (minute == 0 || (minute == 30 && (hour == 3 || hour == 5) && sign == '+')) {
return true;
}
return false;
}
static bool validateTimestampDigits(const SValueNode* pVal) {
if (!IS_INTEGER_TYPE(pVal->node.resType.type)) {
return false;
}
int64_t tsVal = pVal->datum.i;
char fraction[20] = {0};
NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction);
int32_t tsDigits = (int32_t)strlen(fraction);
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
return true;
} else {
return false;
}
}
return true;
}
static bool validateTimezoneFormat(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) {
return false;
} else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) {
return true;
} else if ((tz[0] == '+' || tz[0] == '-')) {
switch (len) {
case 3:
case 5: {
for (int32_t i = 1; i < len; ++i) {
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 4) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
case 6: {
for (int32_t i = 1; i < len; ++i) {
if (i == 3) {
if (tz[i] != ':') {
return false;
}
continue;
}
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 5) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
default: {
return false;
}
}
} else {
return false;
}
return true;
}
void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0};
time_t t = taosTime(NULL);
struct tm* tmInfo = taosLocalTime(&t, NULL);
strftime(buf, sizeof(buf), "%z", tmInfo);
int32_t len = (int32_t)strlen(buf);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = strndup(buf, len);
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_BINARY;
pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE;
pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI;
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1);
varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, (SNode*)pVal);
}
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = NULL;
@ -525,9 +693,15 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i == 0) {
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 1));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
@ -843,6 +1017,19 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (numOfParams == 4) {
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 3));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
// set result type
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
@ -1284,152 +1471,6 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
/* Following are valid ISO-8601 timezone format:
* 1 z/Z
* 2 ±hh:mm
* 3 ±hhmm
* 4 ±hh
*
*/
static bool validateHourRange(int8_t hour) {
if (hour < 0 || hour > 12) {
return false;
}
return true;
}
static bool validateMinuteRange(int8_t hour, int8_t minute, char sign) {
if (minute == 0 || (minute == 30 && (hour == 3 || hour == 5) && sign == '+')) {
return true;
}
return false;
}
static bool validateTimestampDigits(const SValueNode* pVal) {
if (!IS_INTEGER_TYPE(pVal->node.resType.type)) {
return false;
}
int64_t tsVal = pVal->datum.i;
char fraction[20] = {0};
NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction);
int32_t tsDigits = (int32_t)strlen(fraction);
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
return true;
} else {
return false;
}
}
return true;
}
static bool validateTimezoneFormat(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) {
return false;
} else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) {
return true;
} else if ((tz[0] == '+' || tz[0] == '-')) {
switch (len) {
case 3:
case 5: {
for (int32_t i = 1; i < len; ++i) {
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 4) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
case 6: {
for (int32_t i = 1; i < len; ++i) {
if (i == 3) {
if (tz[i] != ':') {
return false;
}
continue;
}
if (!isdigit(tz[i])) {
return false;
}
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 5) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
}
break;
}
default: {
return false;
}
}
} else {
return false;
}
return true;
}
void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0};
time_t t = taosTime(NULL);
struct tm* tmInfo = taosLocalTime(&t, NULL);
strftime(buf, sizeof(buf), "%z", tmInfo);
int32_t len = (int32_t)strlen(buf);
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = strndup(buf, len);
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_BINARY;
pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE;
pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI;
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1);
varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, (SNode*)pVal);
}
static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (1 != numOfParams && 2 != numOfParams) {
@ -1498,6 +1539,16 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 1));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType =
@ -1526,6 +1577,18 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
if (3 == numOfParams) {
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode *)nodesListGetNode(pFunc->pParameterList, 2));
if (ret == TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};

View File

@ -35,7 +35,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX})
if (${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
# if (${BUILD_TEST})
# add_subdirectory(test)
# endif(${BUILD_TEST})

View File

@ -196,7 +196,7 @@ int32_t taosUcs4len(TdUcs4 *ucs4) {
}
//dst buffer size should be at least 2*len + 1
int32_t taosHexEncode(const char *src, char *dst, int32_t len) {
int32_t taosHexEncode(const unsigned char *src, char *dst, int32_t len) {
if (!dst) {
return -1;
}

View File

@ -663,6 +663,12 @@ class TDCom:
else:
tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}")
def killProcessor(self, processorName):
if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM %s.exe"%processorName)
else:
os.system('pkill %s'%processorName)
def is_json(msg):
if isinstance(msg, str):

View File

@ -93,6 +93,7 @@
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
@ -159,6 +160,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim

View File

@ -135,7 +135,7 @@ echo "jniDebugFlag 143" >> $TAOS_CFG
echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "uDebugFlag 131" >> $TAOS_CFG
echo "sDebugFlag 143" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG

View File

@ -64,12 +64,59 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in
print --> drop stb
sql drop table stb;
print ========== step5
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint, c_float float, c_double double, c_bool bool, c_binary binary(16), c_nchar nchar(32), c_ts timestamp, c_tint_un tinyint unsigned, c_sint_un smallint unsigned, c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
print ========== step6 repeat
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint ) tags (t_int int);
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
print ========== step7
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql create table ct1 using stb1 tags ( 1 );
sql create table ct2 using stb1 tags ( 2 );
sql create table ct3 using stb1 tags ( 3 );
sql create table ct4 using stb1 tags ( 4 );
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
sql CREATE SMA INDEX sma_index_2 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) max_delay 6m;
sql CREATE SMA INDEX sma_index_3 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) watermark 5s max_delay 6m;
sql DROP INDEX sma_index_1 ;
sql DROP INDEX sma_index_2 ;
sql DROP INDEX sma_index_3 ;
print ========== step8
sql drop database if exists db;
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
sql create table ct1 using stb1 tags ( 1 );
sql create table ct2 using stb1 tags ( 2 );
sql create table ct3 using stb1 tags ( 3 );
sql create table ct4 using stb1 tags ( 4 );
sql CREATE SMA INDEX sma_index_1 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) watermark 5s;
sql CREATE SMA INDEX sma_index_2 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) sliding(6m) max_delay 6m;
sql CREATE SMA INDEX sma_index_3 ON stb1 function(min(c_int), max(c_int)) interval(6m, 10s) watermark 5s max_delay 6m;
sql DROP INDEX sma_index_1 ;
sql DROP INDEX sma_index_2 ;
sql DROP INDEX sma_index_3 ;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT

View File

@ -83,9 +83,9 @@ sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0)
$loop_count = 0
loop1:
sleep 300
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1

View File

@ -10,6 +10,30 @@ sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);

View File

@ -0,0 +1,222 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
print ========== step2
sql create dnode $hostname port 7200
system sh/exec.sh -n dnode2 -s start
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
print ========== step3
sql drop database if exists test;
sql create database if not exists test vgroups 1 precision "ms" ;
sql use test;
sql create table test.scalar_function_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 binary(256), t12 nchar(256), t13 bool) ;
sql create table scalar_function_ct1 using scalar_function_stb tags (-38, -32456, 509722288, -1404014954778348330, 87, 8879, 3351927345, 1840080781675115605, 3.002364316200592e+38, 6.698140580387119e+37, "bktezshfyvmrmgzwrwerytfwudlblkyyxismpommiqpqsptpiucptwqutzhajxbiitqxkrpobqhgqvjlvgsudewmelpunjspurbpbbwypvgbwjfrwidrchnojtxyhrwfjwgdiabzfoujxkwcjjxjqsrnhmryjhrykldmdfiwircdfahldtrtuafzvybkihyjatiqivbtpydjtmbfddcgyzjuqidwcchtsamnwyqwvajftayyvfrmqcqygbxmxgjx", "ddlxkxhrvviwnjeqhewbercnlontwbsyevcjsocrwyupautsjkdzqbwuzsuetptgsdfyjzfkqyobkysikpaxtqqonxtocfowaehgovshwyciyzfmdmcmwaolkhdunfhwhcanetepxyppuullxnclockmadyaaufywllwburgsfxizcjgzvboydpqymlwgktslclidbcwiyyubyuvhjgwldkgxswigjkpbpslvlsbigdlmuldmtbqencbntbaohxr", False) ;
sql create table test.scalar_function_tb1 (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) ;
sql create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int);
sql create table scalar_ct1 using scalar_stb tags(10);
sql create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20));
sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb;
sql create stream ctb_abs_stream trigger at_once into output_abs_ctb as select ts, abs(c1), abs(c2), c3 from scalar_ct1;
sql create stream tb_abs_stream trigger at_once into output_abs_tb as select ts, abs(c1), abs(c2), c3 from scalar_tb;
sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb;
sql create stream ctb_acos_stream trigger at_once into output_acos_ctb as select ts, acos(c1), acos(c2), c3 from scalar_ct1;
sql create stream tb_acos_stream trigger at_once into output_acos_tb as select ts, acos(c1), acos(c2), c3 from scalar_tb;
sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb;
sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1;
sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb;
sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb;
sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb;
sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb;
sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb;
sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb;
sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb;
sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb;
sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb;
sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb;
sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb;
sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb;
sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb;
sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb;
sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb;
sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb;
sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb;
sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb;
sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_ct1 values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_tb values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_tb values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_tb values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
print ========== step4
sql drop database test;
print ========== step5 repeat
sql drop database if exists test;
sql create database if not exists test vgroups 1 precision "ms" ;
sql use test;
sql create table test.scalar_function_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 binary(256), t12 nchar(256), t13 bool) ;
sql create table scalar_function_ct1 using scalar_function_stb tags (-38, -32456, 509722288, -1404014954778348330, 87, 8879, 3351927345, 1840080781675115605, 3.002364316200592e+38, 6.698140580387119e+37, "bktezshfyvmrmgzwrwerytfwudlblkyyxismpommiqpqsptpiucptwqutzhajxbiitqxkrpobqhgqvjlvgsudewmelpunjspurbpbbwypvgbwjfrwidrchnojtxyhrwfjwgdiabzfoujxkwcjjxjqsrnhmryjhrykldmdfiwircdfahldtrtuafzvybkihyjatiqivbtpydjtmbfddcgyzjuqidwcchtsamnwyqwvajftayyvfrmqcqygbxmxgjx", "ddlxkxhrvviwnjeqhewbercnlontwbsyevcjsocrwyupautsjkdzqbwuzsuetptgsdfyjzfkqyobkysikpaxtqqonxtocfowaehgovshwyciyzfmdmcmwaolkhdunfhwhcanetepxyppuullxnclockmadyaaufywllwburgsfxizcjgzvboydpqymlwgktslclidbcwiyyubyuvhjgwldkgxswigjkpbpslvlsbigdlmuldmtbqencbntbaohxr", False) ;
sql create table test.scalar_function_tb1 (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 binary(256), c12 nchar(256), c13 bool) ;
sql create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int);
sql create table scalar_ct1 using scalar_stb tags(10);
sql create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20));
sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb;
sql create stream ctb_abs_stream trigger at_once into output_abs_ctb as select ts, abs(c1), abs(c2), c3 from scalar_ct1;
sql create stream tb_abs_stream trigger at_once into output_abs_tb as select ts, abs(c1), abs(c2), c3 from scalar_tb;
sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb;
sql create stream ctb_acos_stream trigger at_once into output_acos_ctb as select ts, acos(c1), acos(c2), c3 from scalar_ct1;
sql create stream tb_acos_stream trigger at_once into output_acos_tb as select ts, acos(c1), acos(c2), c3 from scalar_tb;
sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb;
sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1;
sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb;
sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb;
sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb;
sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb;
sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb;
sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb;
sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb;
sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb;
sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb;
sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb;
sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb;
sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb;
sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb;
sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb;
sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb;
sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb;
sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb;
sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb;
sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb;
sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_ct1 values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_tb values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_tb values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_tb values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDengine", "taos");
print ========== step6 repeat
sql drop database test;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT

View File

@ -0,0 +1,161 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED into streamt1 as select _wstartts, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream streams2 trigger at_once IGNORE EXPIRED into streamt2 as select _wstartts, count(*) c1, sum(a) c3 from t1 session(ts,10s);
sql create stream streams3 trigger at_once IGNORE EXPIRED into streamt3 as select _wstartts, count(*) c1, sum(a) c3 from t1 state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.1);
sql insert into t1 values(1648791233002,2,2,3,2.1);
sql insert into t1 values(1648791243003,2,2,3,3.1);
sql insert into t1 values(1648791200000,4,2,3,4.1);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop1
endi
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop2
endi
$loop_count = 0
loop3:
sleep 300
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop3
endi
print =============== create database
sql create database test1 vgroups 4
sql show databases
print ======database=$rows
sql use test1
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql create stream stream_t2 trigger at_once IGNORE EXPIRED into streamtST2 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
$loop_count = 0
loop4:
sleep 300
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
if $data02 != 1 then
print =====data02=$data02
goto loop4
endi
$loop_count = 0
loop5:
sleep 300
sql select * from streamtST2;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop5
endi
if $data02 != 1 then
print =====data02=$data02
goto loop5
endi
system sh/stop_dnodes.sh

View File

@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 2
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1

View File

@ -72,12 +72,12 @@ class TDTestCase:
tdSql.query("select timediff(1,0,1a) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000)
tdSql.query("select timediff(1,0,1u) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000000)
tdSql.query("select timediff(1,0,1u) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000000)
tdSql.error("select timediff(1,0,1u) from ntb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,1000000)
tdSql.error("select timediff(1,0,1u) from db.ntb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,1000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00') from stb")
tdSql.checkRows(3)
@ -116,12 +116,12 @@ class TDTestCase:
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1a) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from stb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,86400000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from db.stb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,86400000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00') from stb_1")
@ -164,12 +164,12 @@ class TDTestCase:
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1a) from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from stb_1")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,43200000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from db.stb_1")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,43200000000)
tdSql.query("select timediff('a','b') from stb")
tdSql.checkRows(3)

View File

@ -1488,8 +1488,7 @@ class TDTestCase:
tdSql.query('select elapsed(ts,10s) from ( select * from sub_table1_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000") session(ts,1w) ; ')
tdSql.query('select elapsed(ts,10s) from ( select ts ,q_int from sub_table1_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000") session(ts,1w) ; ')
tdSql.error('select elapsed(ts,10s) from ( select ts ,q_int from sub_table1_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000") session(ts,1w) ; ')
tdSql.error('select elapsed(ts,10s) from sub_table1_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000" interval(20s) fill (next) session(ts,1w) ; ')
tdSql.query('select elapsed(ts,10s) from sub_empty_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000" session(ts,1w) ; ')

View File

@ -348,10 +348,9 @@ class TDTestCase:
tdSql.execute(" use db ")
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
tdSql.checkData(10,0,63072035)
tdSql.query("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.checkData(10,0,int(63072035/1000))
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
tdSql.checkData(10,0,int(63072035/60))
tdSql.query("select stateduration(c1,'GT',1,1h) from t1")

View File

@ -675,7 +675,7 @@ class TDTestCase:
tdSql.checkRows(3)
tdSql.query("select TO_UNIXTIMESTAMP(datastr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMETRUNCATE(ts,1u) from jsons1 where jtag->'tag1'>1;")
tdSql.query("select TIMETRUNCATE(ts,1s) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMEDIFF(ts,_c0) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)

View File

@ -343,10 +343,9 @@ class TDTestCase:
tdSql.execute(" use db ")
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
tdSql.checkData(10,0,63072035)
tdSql.query("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.checkData(10,0,int(63072035/1000))
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
tdSql.checkData(10,0,int(63072035/60))
tdSql.query("select stateduration(c1,'GT',1,1h) from t1")

View File

@ -37,8 +37,8 @@ class TDTestCase:
tdSql.query("select timetruncate(1,1d) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1u) from ntb")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from ntb")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from ntb")
@ -97,8 +97,8 @@ class TDTestCase:
tdSql.query("select timetruncate(1,1d) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1u) from stb")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from stb")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from stb")
@ -156,8 +156,8 @@ class TDTestCase:
tdSql.query("select timetruncate(1,1d) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1u) from stb_1")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from stb_1")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from stb_1")

View File

@ -122,9 +122,9 @@ class TDTestCase:
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
# if expectRowsList[2] != resultList[0]:
# tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
# tdLog.exit("2 tmq consume rows error!")
if expectRowsList[2] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
time.sleep(10)
for i in range(len(topicNameList)):

View File

@ -16,7 +16,7 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __int__(self):
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
@ -25,6 +25,49 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
@ -34,10 +77,11 @@ class TDTestCase:
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
@ -53,16 +97,6 @@ class TDTestCase:
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
@ -111,10 +145,11 @@ class TDTestCase:
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
@ -130,16 +165,6 @@ class TDTestCase:
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
# tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
# tdLog.info("create ctb")
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
@ -200,89 +225,9 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': -1,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
# tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
# tdLog.info("create ctb")
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 3
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()

View File

@ -0,0 +1,241 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 3
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def tmqCase4(self):
tdLog.printNoPrefix("======== test case 4: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 5
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait commit notify")
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("pkill consume processor")
tdCom.killProcessor("tmq_sim")
# time.sleep(10)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 6
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 4 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase3()
self.tmqCase4()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -89,7 +89,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py
# python3 ./test.py -f 2-query/nestedQuery_str.py
python3 ./test.py -f 2-query/avg.py
python3 ./test.py -f 2-query/elapsed.py
#python3 ./test.py -f 2-query/elapsed.py
python3 ./test.py -f 2-query/csum.py
python3 ./test.py -f 2-query/mavg.py
python3 ./test.py -f 2-query/diff.py

View File

@ -36,6 +36,7 @@
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
int64_t now;
typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
@ -588,12 +589,10 @@ void build_topic_list(SThreadInfo* pInfo) {
int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0};
int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
g_stConfInfo.cdbName, atomic_fetch_add_64(&now, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
pInfo->consumeRowCnt, pInfo->checkresult);
char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
@ -655,9 +654,9 @@ void loop_consume(SThreadInfo* pInfo) {
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 10 * 1000) {
taosFprintfFile(g_fp,
"consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0/(currentPrintTime - lastPrintTime));
taosFprintfFile(
g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / (currentPrintTime - lastPrintTime));
lastPrintTime = currentPrintTime;
lastTotalMsgs = totalMsgs;
}
@ -696,6 +695,7 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
ASSERT(0);
return NULL;
}
@ -855,6 +855,8 @@ int32_t getConsumeInfo() {
}
int main(int32_t argc, char* argv[]) {
now = taosGetTimestampMs();
parseArgument(argc, argv);
getConsumeInfo();
saveConfigToLogFile();