diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c2006a2535..93c70e1109 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -68,7 +68,7 @@ typedef struct { typedef struct { char* qmsg; - qTaskInfo_t task[5]; + qTaskInfo_t task; } STqExecCol; typedef struct { @@ -82,7 +82,7 @@ typedef struct { typedef struct { int8_t subType; - STqReader* pExecReader[5]; + STqReader* pExecReader; union { STqExecCol execCol; STqExecTb execTb; @@ -139,8 +139,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId); +int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d785376925..5e87e35d68 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -146,7 +146,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 23514c8c76..89e330b78d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -262,7 +262,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; int64_t timeout = pReq->timeout; @@ -271,9 +271,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal fetchOffsetNew; - // todo - workerId = 0; - // 1.find handle STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ @@ -405,7 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp, workerId) < 0) { + if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) { /*ASSERT(0);*/ } // TODO batch optimization: @@ -518,27 +515,23 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->snapshotVer = ver; req.qmsg = NULL; - for (int32_t i = 0; i < 5; i++) { - SReadHandle handle = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = ver, - }; - pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, - &pHandle->execHandle.pSchemaWrapper); - ASSERT(pHandle->execHandle.execCol.task[i]); - void* scanner = NULL; - qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); - ASSERT(scanner); - pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); - ASSERT(pHandle->execHandle.pExecReader[i]); - } + SReadHandle handle = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTableReader = true, + .initTqReader = true, + .version = ver, + }; + pHandle->execHandle.execCol.task = + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); + ASSERT(pHandle->execHandle.execCol.task); + void* scanner = NULL; + qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); + ASSERT(scanner); + pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); + ASSERT(pHandle->execHandle.pExecReader); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - for (int32_t i = 0; i < 5; i++) { - pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); - } + pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { @@ -550,10 +543,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); } - for (int32_t i = 0; i < 5; i++) { - pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); - tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList); - } + pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); + tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); } taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 5172819d2a..d04b7d036f 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -37,8 +37,8 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, return 0; } -static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) { - SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); +static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) { + SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper); if (pSW == NULL) { return -1; } @@ -61,7 +61,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; - qTaskInfo_t task = pExec->execCol.task[0]; + qTaskInfo_t task = pExec->execCol.task; if (qStreamPrepareScan(task, pOffset) < 0) { if (pOffset->type == TMQ_OFFSET__LOG) { @@ -89,7 +89,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa if (pDataBlock != NULL) { if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { - int64_t uid = pExec->pExecReader[0]->msgIter.uid; + int64_t uid = pExec->pExecReader->msgIter.uid; if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { continue; } @@ -184,12 +184,12 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S } #endif -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { +int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) { ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { pRsp->withSchema = 1; - STqReader* pReader = pExec->pExecReader[workerId]; + STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlock(pReader)) { SSDataBlock block = {0}; @@ -197,18 +197,18 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + int64_t uid = pExec->pExecReader->msgIter.uid; if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { continue; } } tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); - tqAddBlockSchemaToRsp(pExec, workerId, pRsp); + tqAddBlockSchemaToRsp(pExec, pRsp); pRsp->blockNum++; } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { pRsp->withSchema = 1; - STqReader* pReader = pExec->pExecReader[workerId]; + STqReader* pReader = pExec->pExecReader; tqReaderSetDataMsg(pReader, pReq, 0); while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { SSDataBlock block = {0}; @@ -216,13 +216,13 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + int64_t uid = pExec->pExecReader->msgIter.uid; if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { continue; } } tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); - tqAddBlockSchemaToRsp(pExec, workerId, pRsp); + tqAddBlockSchemaToRsp(pExec, pRsp); pRsp->blockNum++; } } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 83e852c79e..835ffb02fd 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -80,28 +80,23 @@ int32_t tqMetaOpen(STQ* pTq) { tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - /*for (int32_t i = 0; i < 5; i++) {*/ - /*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ - /*}*/ if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - for (int32_t i = 0; i < 5; i++) { - SReadHandle reader = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = handle.snapshotVer, - }; + SReadHandle reader = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle.snapshotVer, + }; - handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, - &handle.execHandle.pSchemaWrapper); - ASSERT(handle.execHandle.execCol.task[i]); - void* scanner = NULL; - qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); - ASSERT(scanner); - handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); - ASSERT(handle.execHandle.pExecReader[i]); - } + handle.execHandle.execCol.task = + qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); + ASSERT(handle.execHandle.execCol.task); + void* scanner = NULL; + qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); + ASSERT(scanner); + handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); + ASSERT(handle.execHandle.pExecReader); } else { handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 236fcca516..17842615c4 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -394,10 +394,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - for (int32_t i = 0; i < 5; i++) { - int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd); - ASSERT(code == 0); - } + int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd); + ASSERT(code == 0); } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b0eb7f4a14..464a3a3ee1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -127,6 +127,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo int32_t rows = pDataBlock->info.rows; + tqDebug("tq sink, convert block %d, rows: %d", i, rows); + int32_t dataLen = 0; void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); @@ -178,11 +180,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pRes = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; - tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size); + tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size); ASSERT(pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pTask->tbSink.stbFullName, pVnode->config.vgId); + + tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); + /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ // build write msg SRpcMsg msg = { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e6d116dfef..9929258df0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -316,7 +316,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_VND_TABLE_CFG: return vnodeGetTableCfg(pVnode, pMsg); case TDMT_VND_CONSUME: - return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); + return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9dd1a745d3..3fa6344009 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -229,8 +229,8 @@ typedef struct { int8_t stop; } SAsyncPool; -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); -void transDestroyAsyncPool(SAsyncPool* pool); +SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); +void transAsyncPoolDestroy(SAsyncPool* pool); int transAsyncSend(SAsyncPool* pool, queue* mq); bool transAsyncPoolIsEmpty(SAsyncPool* pool); @@ -322,7 +322,7 @@ typedef struct STransReq { } STransReq; void transReqQueueInit(queue* q); -void* transReqQueuePushReq(queue* q); +void* transReqQueuePush(queue* q); void* transReqQueueRemove(void* arg); void transReqQueueClear(queue* q); @@ -393,9 +393,9 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); -void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); +void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); +SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f94a7f3c37..9de8c273d9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,7 +26,7 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; - queue conn; + queue q; uint64_t expireTime; STransCtx ctx; @@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) { while (p != NULL) { while (!QUEUE_IS_EMPTY(&p->conn)) { queue* h = QUEUE_HEAD(&p->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, q); if (c->expireTime < currentTime) { QUEUE_REMOVE(h); transUnrefCliHandle(c); @@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) { while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } connList = taosHashIterate((SHashObj*)pool, connList); @@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { return NULL; } queue* h = QUEUE_HEAD(&plist->conn); - SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); - assert(h == &conn->conn); + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); + assert(h == &conn->q); return conn; } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); - QUEUE_INIT(&conn->conn); - QUEUE_PUSH(&plist->conn, &conn->conn); + QUEUE_INIT(&conn->q); + QUEUE_PUSH(&plist->conn, &conn->q); assert(!QUEUE_IS_EMPTY(&plist->conn)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); - QUEUE_INIT(&conn->conn); + QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; conn->broken = 0; @@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) { CONN_SET_PERSIST_BY_APP(pConn); } - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; _RETURN: @@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb); uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; @@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); - transDestroyAsyncPool(pThrd->asyncPool); + transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsg); taosMemoryFree(pThrd->loop); @@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) { cliHandleReq(pMsg, pThrd); } +static void doCloseIdleConn(void* param) { + STaskArg* arg = param; + SCliConn* conn = arg->param1; + SCliThrd* pThrd = arg->param2; +} + static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; @@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { } } -bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { +bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; } @@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { */ STransConnCtx* pCtx = pMsg->ctx; int32_t code = pResp->code; - bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; + + bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; if (retry) { pMsg->sent = 0; pCtx->retryCnt += 1; @@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (pCtx->retryCnt < pCtx->retryLimit) { transUnrefCliHandle(pConn); EPSET_FORWARD_INUSE(&pCtx->epSet); + transFreeMsg(pResp->pCont); cliSchedMsgToNextNode(pMsg, pThrd); return -1; } @@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STraceId* trace = &pResp->info.traceId; - bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet); + bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); if (hasEpSet) { char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); @@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { - tsem_destroy(sem); - taosMemoryFree(sem); + int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (ret != 0) { destroyCmsg(cliMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return -1; + goto _RETURN; } tsem_wait(sem); + +_RETURN: tsem_destroy(sem); taosMemoryFree(sem); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return 0; + return ret; } /* * diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c3cba3118c..34849df2b2 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) { return ret; } -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { +SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); @@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) return pool; } -void transDestroyAsyncPool(SAsyncPool* pool) { +void transAsyncPoolDestroy(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); // uv_close((uv_handle_t*)async, NULL); @@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) { taosMemoryFree(pool->asyncs); taosMemoryFree(pool); } +bool transAsyncPoolIsEmpty(SAsyncPool* pool) { + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; + } + return true; +} int transAsyncSend(SAsyncPool* pool, queue* q) { if (atomic_load_8(&pool->stop) == 1) { return -1; @@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { } return uv_async_send(async); } -bool transAsyncPoolIsEmpty(SAsyncPool* pool) { - for (int i = 0; i < pool->nAsync; i++) { - uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; - if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; - } - return true; -} void transCtxInit(STransCtx* ctx) { // init transCtx @@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) { // init req queue QUEUE_INIT(q); } -void* transReqQueuePushReq(queue* q) { +void* transReqQueuePush(queue* q) { uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); wreq->data = req; @@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { heapDestroy(queue->heap); taosMemoryFree(queue); } +void transDQCancel(SDelayQueue* queue, SDelayTask* task) { + uv_timer_stop(queue->timer); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + if (heapSize(queue->heap) <= 0) return; + heapRemove(queue->heap, &task->node); + + if (heapSize(queue->heap) != 0) { + HeapNode* minNode = heapMin(queue->heap); + if (minNode != NULL) return; + + uint64_t now = taosGetTimestampMs(); + SDelayTask* task = container_of(minNode, SDelayTask, node); + uint64_t timeout = now > task->execTime ? now - task->execTime : 0; + + uv_timer_start(queue->timer, transDQTimeout, timeout, 0); + } +} + +SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); task->func = func; @@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); - return 0; + return task; } void transPrintEpSet(SEpSet* pEpSet) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 7b9402f954..3fb947bdba 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { uvPrepareSendData(smsg, &wb); transRefSrvHandle(pConn); - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSvrMsg* smsg) { @@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; @@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); - transDestroyAsyncPool(pThrd->asyncPool); + transAsyncPoolDestroy(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 0aea6e3e14..8990c24305 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -809,6 +809,8 @@ class StateEmpty(AnyState): ] def verifyTasksToState(self, tasks, newState): + if Config.getConfig().ignore_errors: # if we are asked to ignore certain errors, let's not verify CreateDB success. + return if (self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks @@ -2491,7 +2493,7 @@ class MainExec: action='store', default=None, type=str, - help='Ignore error codes, comma separated, 0x supported (default: None)') + help='Ignore error codes, comma separated, 0x supported, also suppresses certain transition state checks. (default: None)') parser.add_argument( '-i', '--num-replicas', diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 7133e8365d..7a5b70d6ca 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -28,6 +28,7 @@ from util.common import * from util.constant import * from dataclasses import dataclass,field from typing import List +from datetime import datetime @dataclass class DataSet: diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1ef6d55b27..a13a757bbe 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -89,7 +89,7 @@ ./test.sh -f tsim/parser/alter.sim # jira ./test.sh -f tsim/parser/alter1.sim ./test.sh -f tsim/parser/auto_create_tb_drop_tb.sim -# jira ./test.sh -f tsim/parser/auto_create_tb.sim +./test.sh -f tsim/parser/auto_create_tb.sim ./test.sh -f tsim/parser/between_and.sim ./test.sh -f tsim/parser/binary_escapeCharacter.sim # jira ./test.sh -f tsim/parser/col_arithmetic_operation.sim @@ -121,25 +121,21 @@ ./test.sh -f tsim/parser/insert_multiTbl.sim ./test.sh -f tsim/parser/insert_tb.sim # jira ./test.sh -f tsim/parser/interp.sim -# ./test.sh -f tsim/parser/join.sim -# ./test.sh -f tsim/parser/join_manyblocks.sim +./test.sh -f tsim/parser/join_manyblocks.sim ## ./test.sh -f tsim/parser/join_multitables.sim # ./test.sh -f tsim/parser/join_multivnode.sim +# jira ./test.sh -f tsim/parser/join.sim ./test.sh -f tsim/parser/last_cache.sim -## ./test.sh -f tsim/parser/last_groupby.sim +./test.sh -f tsim/parser/last_groupby.sim # jira ./test.sh -f tsim/parser/lastrow.sim -## ./test.sh -f tsim/parser/like.sim -# ./test.sh -f tsim/parser/limit.sim -# ./test.sh -f tsim/parser/limit1.sim -# ./test.sh -f tsim/parser/limit1_tblocks100.sim -## ./test.sh -f tsim/parser/limit2.sim -## ./test.sh -f tsim/parser/limit2_tblocks100.sim -## ./test.sh -f tsim/parser/limit_stb.sim -## ./test.sh -f tsim/parser/limit_tb.sim -## ./test.sh -f tsim/parser/line_insert.sim +./test.sh -f tsim/parser/like.sim +# jira ./test.sh -f tsim/parser/limit.sim +# jira ./test.sh -f tsim/parser/limit1.sim +# jira ./test.sh -f tsim/parser/limit2.sim +# jira ./test.sh -f tsim/parser/line_insert.sim ./test.sh -f tsim/parser/mixed_blocks.sim ./test.sh -f tsim/parser/nchar.sim -# ./test.sh -f tsim/parser/nestquery.sim +# jira ./test.sh -f tsim/parser/nestquery.sim # jira ./test.sh -f tsim/parser/null_char.sim ./test.sh -f tsim/parser/precision_ns.sim ./test.sh -f tsim/parser/projection_limit_offset.sim @@ -165,7 +161,8 @@ # jira ./test.sh -f tsim/parser/udf_dll_stable.sim # jira ./test.sh -f tsim/parser/udf_dll.sim # jira ./test.sh -f tsim/parser/udf.sim -# ./test.sh -f tsim/parser/union.sim +./test.sh -f tsim/parser/union.sim +# jira ./test.sh -f tsim/parser/union_sysinfo.sim # jira ./test.sh -f tsim/parser/where.sim # ---- query diff --git a/tests/script/test.sh b/tests/script/test.sh index 1cfe8dd6f5..0ffe8cf8f1 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -84,6 +84,7 @@ echo "SIM_DIR : $SIM_DIR" echo "CODE_DIR : $CODE_DIR" echo "CFG_DIR : $CFG_DIR" +rm -rf $SIM_DIR/* rm -rf $LOG_DIR rm -rf $CFG_DIR diff --git a/tests/script/tsim/parser/auto_create_tb.sim b/tests/script/tsim/parser/auto_create_tb.sim index 485f4f480c..3a64b79239 100644 --- a/tests/script/tsim/parser/auto_create_tb.sim +++ b/tests/script/tsim/parser/auto_create_tb.sim @@ -282,7 +282,7 @@ if $rows != 2 then return -1 endi -sql insert into tick_000001 ('ts', 'last_prc', 'volume', 'amount', 'oi', 'bid_prc1', 'ask_prc1') using tick tags (000001, Stocks) VALUES (1546391700000, 0.000000, 0, 0.000000, 0, 0.000000, 10.320000); +sql insert into tick_000001 (ts, last_prc, volume, amount, oi, bid_prc1, ask_prc1) using tick tags ('000001', 'Stocks') VALUES (1546391700000, 0.000000, 0, 0.000000, 0, 0.000000, 10.320000); sql select tbname from tick if $rows != 1 then return -1 diff --git a/tests/script/tsim/parser/join.sim b/tests/script/tsim/parser/join.sim index 55842d5c16..fa03ad8214 100644 --- a/tests/script/tsim/parser/join.sim +++ b/tests/script/tsim/parser/join.sim @@ -233,8 +233,15 @@ endi print 1 #select + where condition + interval query -sql select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by join_tb0.ts desc; +print select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart asc; +sql select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart asc; +$val = 100 +if $rows != $val then + return -1 +endi +print select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart desc; +sql select count(join_tb1.*) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts >= 100000 and join_tb0.c7 = true interval(10a) order by _wstart desc; $val = 100 if $rows != $val then return -1 diff --git a/tests/script/tsim/parser/join_manyblocks.sim b/tests/script/tsim/parser/join_manyblocks.sim index eb5e34b079..154316a03f 100644 --- a/tests/script/tsim/parser/join_manyblocks.sim +++ b/tests/script/tsim/parser/join_manyblocks.sim @@ -73,8 +73,6 @@ while $i < $tbNum $tstart = 100000 endw -sleep 100 - print ===============join_manyblocks.sim print ==============> td-3313 sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; diff --git a/tests/script/tsim/parser/last_groupby.sim b/tests/script/tsim/parser/last_groupby.sim index 8f9574412d..68d7f10fe2 100644 --- a/tests/script/tsim/parser/last_groupby.sim +++ b/tests/script/tsim/parser/last_groupby.sim @@ -4,14 +4,11 @@ system sh/exec.sh -n dnode1 -s start sql connect print ======================== dnode1 start - $db = testdb - sql create database $db sql use $db sql create stable st2 (ts timestamp, f1 int, f2 float, f3 double, f4 bigint, f5 smallint, f6 tinyint, f7 bool, f8 binary(10), f9 nchar(10)) tags (id1 int, id2 float, id3 nchar(10), id4 double, id5 smallint, id6 bigint, id7 binary(10)) - sql create table tb1 using st2 tags (1,1.0,"1",1.0,1,1,"1"); sql insert into tb1 values (now-200s,1,1.0,1.0,1,1,1,true,"1","1") @@ -23,16 +20,13 @@ sql insert into tb1 values (now+300s,4,4.0,4.0,4,4,4,true,"4","4") sql insert into tb1 values (now+400s,4,4.0,4.0,4,4,4,true,"4","4") sql insert into tb1 values (now+500s,4,4.0,4.0,4,4,4,true,"4","4") -sql select f1,last(*) from st2 group by f1; - +sql select f1, last(*) from st2 group by f1 order by f1; if $rows != 4 then return -1 endi - if $data00 != 1 then return -1 endi - if $data02 != 1 then print $data02 return -1 @@ -59,15 +53,13 @@ if $data09 != 1 then return -1 endi -sql select f1,last(f1,st2.*) from st2 group by f1; +sql select f1, last(f1,st2.*) from st2 group by f1 order by f1; if $rows != 4 then return -1 endi - if $data00 != 1 then return -1 endi - if $data01 != 1 then return -1 endi diff --git a/tests/script/tsim/parser/like.sim b/tests/script/tsim/parser/like.sim index e7c191ed92..96672aee3c 100644 --- a/tests/script/tsim/parser/like.sim +++ b/tests/script/tsim/parser/like.sim @@ -5,7 +5,6 @@ sql connect print ======================== dnode1 start - $db = testdb sql drop database if exists $db sql create database $db cachemodel 'last_value' @@ -32,7 +31,6 @@ if $rows != 2 then return -1 endi - sql select b from $table1 where b like 'table\_name' if $rows != 1 then return -1 diff --git a/tests/script/tsim/parser/limit1.sim b/tests/script/tsim/parser/limit1.sim index 1f72999784..b6d0629c8f 100644 --- a/tests/script/tsim/parser/limit1.sim +++ b/tests/script/tsim/parser/limit1.sim @@ -18,7 +18,7 @@ $stb = $stbPrefix . $i sql drop database $db -x step1 step1: -sql create database $db +sql create database $db cache 16 print ====== create tables sql use $db sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) @@ -57,11 +57,10 @@ run tsim/parser/limit1_stb.sim print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT -sleep 500 system sh/exec.sh -n dnode1 -s start print ================== server restart completed run tsim/parser/limit1_tb.sim run tsim/parser/limit1_stb.sim -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/parser/limit1_stb.sim b/tests/script/tsim/parser/limit1_stb.sim index 513e2fac02..879fd7882f 100644 --- a/tests/script/tsim/parser/limit1_stb.sim +++ b/tests/script/tsim/parser/limit1_stb.sim @@ -1,4 +1,3 @@ -sleep 100 sql connect $dbPrefix = lm1_db diff --git a/tests/script/tsim/parser/limit1_tb.sim b/tests/script/tsim/parser/limit1_tb.sim index 300af7ac7b..5a7c1bc201 100644 --- a/tests/script/tsim/parser/limit1_tb.sim +++ b/tests/script/tsim/parser/limit1_tb.sim @@ -1,4 +1,3 @@ -sleep 100 sql connect $dbPrefix = lm1_db diff --git a/tests/script/tsim/parser/limit1_tblocks100.sim b/tests/script/tsim/parser/limit1_tblocks100.sim deleted file mode 100644 index f541ea7158..0000000000 --- a/tests/script/tsim/parser/limit1_tblocks100.sim +++ /dev/null @@ -1,67 +0,0 @@ -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -i 1 -system sh/exec.sh -n dnode1 -s start -sql connect - -$dbPrefix = lm1_db -$tbPrefix = lm1_tb -$stbPrefix = lm1_stb -$tbNum = 10 -$rowNum = 10000 -$totalNum = $tbNum * $rowNum -$ts0 = 1537146000000 -$delta = 600000 -print ========== limit1.sim -$i = 0 -$db = $dbPrefix . $i -$stb = $stbPrefix . $i - -sql drop database $db -x step1 -step1: -sql create database $db cache 16 -print ====== create tables -sql use $db -sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) - -$i = 0 -$ts = $ts0 -$halfNum = $tbNum / 2 -while $i < $halfNum - $tbId = $i + $halfNum - $tb = $tbPrefix . $i - $tb1 = $tbPrefix . $tbId - sql create table $tb using $stb tags( $i ) - sql create table $tb1 using $stb tags( $tbId ) - - $x = 0 - while $x < $rowNum - $xs = $x * $delta - $ts = $ts0 + $xs - $c = $x / 10 - $c = $c * 10 - $c = $x - $c - $binary = 'binary . $c - $binary = $binary . ' - $nchar = 'nchar . $c - $nchar = $nchar . ' - sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar ) $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar ) - $x = $x + 1 - endw - - $i = $i + 1 -endw -print ====== tables created - -run tsim/parser/limit1_tb.sim -run tsim/parser/limit1_stb.sim - -print ================== restart server to commit data into disk -system sh/exec.sh -n dnode1 -s stop -x SIGINT -sleep 500 -system sh/exec.sh -n dnode1 -s start -print ================== server restart completed - -run tsim/parser/limit1_tb.sim -run tsim/parser/limit1_stb.sim - -system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/parser/limit2.sim b/tests/script/tsim/parser/limit2.sim index af03c6bb7f..ca308fa6e7 100644 --- a/tests/script/tsim/parser/limit2.sim +++ b/tests/script/tsim/parser/limit2.sim @@ -1,10 +1,6 @@ system sh/stop_dnodes.sh - system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c walLevel -v 1 -system sh/cfg.sh -n dnode1 -c rowsInFileBlock -v 255 system sh/exec.sh -n dnode1 -s start -sleep 100 sql connect $dbPrefix = lm2_db @@ -69,10 +65,8 @@ print ====== tables created print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT -sleep 500 +sleep 100 system sh/exec.sh -n dnode1 -s start print ================== server restart completed run tsim/parser/limit2_query.sim - -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/parser/limit2_query.sim b/tests/script/tsim/parser/limit2_query.sim index c35fd369ca..8a2da7988d 100644 --- a/tests/script/tsim/parser/limit2_query.sim +++ b/tests/script/tsim/parser/limit2_query.sim @@ -1,4 +1,3 @@ -sleep 100 sql connect $dbPrefix = lm2_db @@ -24,8 +23,11 @@ sql use $db ##### aggregation on stb with 6 tags + where + group by + limit offset $val1 = 1 $val2 = $tbNum - 1 -sql select count(*) from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0 +print select count(*) from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0 +sql select count(*), t1, t2, t3, t4, t5, t6 from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0 $val = $tbNum - 3 + +print $rows $val if $rows != $val then return -1 endi diff --git a/tests/script/tsim/parser/limit2_tblocks100.sim b/tests/script/tsim/parser/limit2_tblocks100.sim deleted file mode 100644 index 0d87a41838..0000000000 --- a/tests/script/tsim/parser/limit2_tblocks100.sim +++ /dev/null @@ -1,76 +0,0 @@ -system sh/stop_dnodes.sh - -system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c walLevel -v 1 -system sh/cfg.sh -n dnode1 -c rowsInFileBlock -v 255 -system sh/exec.sh -n dnode1 -s start -sleep 100 -sql connect - -$dbPrefix = lm2_db -$tbPrefix = lm2_tb -$stbPrefix = lm2_stb -$tbNum = 10 -$rowNum = 10000 -$totalNum = $tbNum * $rowNum -$ts0 = 1537146000000 -$delta = 600000 -$tsu = $rowNum * $delta -$tsu = $tsu - $delta -$tsu = $tsu + $ts0 - -print ========== limit2.sim -$i = 0 -$db = $dbPrefix . $i -$stb = $stbPrefix . $i - -sql drop database $db -x step1 -step1: -sql create database $db tblocks 100 -print ====== create tables -sql use $db -sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int, t2 nchar(20), t3 binary(20), t4 bigint, t5 smallint, t6 double) - -$i = 0 -$ts = $ts0 -$halfNum = $tbNum / 2 -while $i < $halfNum - $i1 = $i + $halfNum - $tb = $tbPrefix . $i - $tb1 = $tbPrefix . $i1 - $tgstr = 'tb . $i - $tgstr = $tgstr . ' - $tgstr1 = 'tb . $i1 - $tgstr1 = $tgstr1 . ' - sql create table $tb using $stb tags( $i , $tgstr , $tgstr , $i , $i , $i ) - sql create table $tb1 using $stb tags( $i1 , $tgstr1 , $tgstr1 , $i , $i , $i ) - - $x = 0 - while $x < $rowNum - $xs = $x * $delta - $ts = $ts0 + $xs - $c = $x / 10 - $c = $c * 10 - $c = $x - $c - $binary = 'binary . $c - $binary = $binary . ' - $nchar = 'nchar . $c - $nchar = $nchar . ' - sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar ) - sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar ) - $x = $x + 1 - endw - - $i = $i + 1 -endw -print ====== tables created - -#run tsim/parser/limit2_query.sim - -print ================== restart server to commit data into disk -system sh/exec.sh -n dnode1 -s stop -x SIGINT -sleep 100 -system sh/exec.sh -n dnode1 -s start -print ================== server restart completed - -run tsim/parser/limit2_query.sim diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index ec7c0e0f13..a3064d59e9 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -1,4 +1,3 @@ -sleep 100 sql connect $dbPrefix = lm_db diff --git a/tests/script/tsim/parser/limit_tb.sim b/tests/script/tsim/parser/limit_tb.sim index 4a93797d40..d0d14c5bfc 100644 --- a/tests/script/tsim/parser/limit_tb.sim +++ b/tests/script/tsim/parser/limit_tb.sim @@ -1,4 +1,3 @@ -sleep 100 sql connect $dbPrefix = lm_db diff --git a/tests/script/tsim/parser/line_insert.sim b/tests/script/tsim/parser/line_insert.sim index cbd960bed6..fc522ecaa7 100644 --- a/tests/script/tsim/parser/line_insert.sim +++ b/tests/script/tsim/parser/line_insert.sim @@ -43,7 +43,7 @@ endi #print =============== clear sql drop database $db sql show databases -if $rows != 0 then +if $rows != 2 then return -1 endi diff --git a/tests/script/tsim/parser/nestquery.sim b/tests/script/tsim/parser/nestquery.sim index c82718c1cb..82d39eff8e 100644 --- a/tests/script/tsim/parser/nestquery.sim +++ b/tests/script/tsim/parser/nestquery.sim @@ -53,8 +53,6 @@ while $i < $half $i = $i + 1 endw -sleep 100 - $i = 1 $tb = $tbPrefix . $i @@ -63,7 +61,6 @@ sql select count(*) from (select count(*) from nest_mt0) if $rows != 1 then return -1 endi - if $data00 != 1 then return -1 endi @@ -72,35 +69,31 @@ sql select count(*) from (select count(*) from nest_mt0 group by tbname) if $rows != 1 then return -1 endi - if $data00 != 10 then return -1 endi -sql select count(*) from (select count(*) from nest_mt0 interval(10h) group by tbname) +sql select count(*) from (select count(*) from nest_mt0 partition by tbname interval(10h) ) if $rows != 1 then return -1 endi - if $data00 != 170 then return -1 endi -sql select sum(a) from (select count(*) a from nest_mt0 interval(10h) group by tbname) +sql select sum(a) from (select count(*) a from nest_mt0 partition by tbname interval(10h)) if $rows != 1 then return -1 endi - if $data00 != 100000 then return -1 endi print =================> alias name test -sql select ts from (select count(*) a from nest_tb0 interval(1h)) +sql select ts from (select _wstart as ts, count(*) a from nest_tb0 interval(1h)) if $rows != 167 then return -1 endi - if $data00 != @20-09-15 00:00:00.000@ then return -1 endi @@ -109,7 +102,6 @@ sql select count(a) from (select count(*) a from nest_tb0 interval(1h)) if $rows != 1 then return -1 endi - if $data00 != 167 then return -1 endi @@ -125,19 +117,16 @@ if $rows != 0 then return -1 endi -sql select * from (select count(*) a, tbname f1 from nest_mt0 group by tbname) t where t.a>0 and f1 = 'nest_tb0'; +sql select * from (select count(*) a, tbname f1, tbname from nest_mt0 group by tbname) t where t.a>0 and f1 = 'nest_tb0'; if $rows != 1 then return -1 endi - if $data00 != 10000 then return -1 endi - if $data01 != @nest_tb0@ then return -1 endi - if $data02 != @nest_tb0@ then return -1 endi @@ -145,37 +134,30 @@ endi print ===================> nest query interval sql_error select ts, avg(c1) from (select ts, c1 from nest_tb0); -sql select avg(c1) from (select * from nest_tb0) interval(3d) +sql select _wstart, avg(c1) from (select * from nest_tb0) interval(3d) if $rows != 3 then return -1 endi - if $data00 != @20-09-14 00:00:00.000@ then return -1 endi - if $data01 != 49.222222222 then return -1 endi - if $data10 != @20-09-17 00:00:00.000@ then - print expect 20-09-17 00:00:00.000, actual: $data10 return -1 endi - -if $data11 != 49.685185185 then +if $data11 != 49.581325301 then return -1 endi - if $data20 != @20-09-20 00:00:00.000@ then return -1 endi - -if $data21 != 49.500000000 then +if $data21 != 49.703539823 then return -1 endi -sql_error select stddev(c1) from (select c1 from nest_tb0); +sql select stddev(c1) from (select c1 from nest_tb0); sql_error select percentile(c1, 20) from (select * from nest_tb0); sql_error select interp(c1) from (select * from nest_tb0); sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0); @@ -184,39 +166,31 @@ sql_error select irate(c1) from (select c1 from nest_tb0); sql_error select diff(c1), twa(c1) from (select * from nest_tb0); sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0); -sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d) +sql select _wstart, apercentile(c1, 50) from (select * from nest_tb0) interval(1d) if $rows != 7 then return -1 endi - if $data00 != @20-09-15 00:00:00.000@ then return -1 endi - if $data01 != 47.571428571 then return -1 endi - if $data10 != @20-09-16 00:00:00.000@ then return -1 endi - if $data11 != 49.666666667 then return -1 endi - if $data20 != @20-09-17 00:00:00.000@ then return -1 endi - if $data21 != 49.000000000 then return -1 endi - if $data30 != @20-09-18 00:00:00.000@ then return -1 endi - if $data31 != 48.333333333 then return -1 endi @@ -225,7 +199,6 @@ sql select twa(c1) from (select * from nest_tb0); if $rows != 1 then return -1 endi - if $data00 != 49.500000000 then return -1 endi @@ -234,7 +207,6 @@ sql select leastsquares(c1, 1, 1) from (select * from nest_tb0); if $rows != 1 then return -1 endi - if $data00 != @{slop:0.000100, intercept:49.000000}@ then return -1 endi @@ -248,19 +220,15 @@ sql select derivative(c1, 1s, 0) from (select * from nest_tb0); if $rows != 9999 then return -1 endi - if $data00 != @20-09-15 00:01:00.000@ then return -1 endi - if $data01 != 0.016666667 then return -1 endi - if $data10 != @20-09-15 00:02:00.000@ then return -1 endi - if $data11 != 0.016666667 then return -1 endi @@ -274,54 +242,42 @@ sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spre if $rows != 7 then return -1 endi - if $data00 != @20-09-15 00:00:00.000@ then return -1 endi - if $data01 != 48.666666667 then print expect 48.666666667, actual: $data01 return -1 endi - if $data02 != 70080.000000000 then return -1 endi - if $data03 != 99 then return -1 endi - if $data04 != 0 then return -1 endi - if $data05 != 1440 then return -1 endi - if $data06 != 0 then print $data06 return -1 endi - if $data07 != 1 then return -1 endi - if $data08 != 99.000000000 then print expect 99.000000000, actual: $data08 return -1 endi - if $data10 != @20-09-16 00:00:00.000@ then return -1 endi - if $data11 != 49.777777778 then return -1 endi - if $data12 != 71680.000000000 then return -1 endi @@ -332,39 +288,28 @@ sql select bottom(x, 20) from (select c1 x from nest_tb0) print ===================> group by + having - - print =========================> ascending order/descending order - - - print =========================> nest query join sql select a.ts,a.k,b.ts from (select count(*) k from nest_tb0 interval(30a)) a, (select count(*) f from nest_tb1 interval(30a)) b where a.ts = b.ts ; if $rows != 10000 then return -1 endi - if $data00 != @20-09-15 00:00:00.000@ then return -1 endi - if $data01 != 1 then return -1 endi - if $data02 != @20-09-15 00:00:00.000@ then return -1 endi - if $data10 != @20-09-15 00:01:00.000@ then return -1 endi - if $data11 != 1 then return -1 endi - if $data12 != @20-09-15 00:01:00.000@ then return -1 endi @@ -373,11 +318,9 @@ sql select sum(a.k), sum(b.f) from (select count(*) k from nest_tb0 interval(30a if $rows != 1 then return -1 endi - if $data00 != 10000 then return -1 endi - if $data01 != 10000 then return -1 endi @@ -386,19 +329,15 @@ sql select a.ts,a.k,b.ts,c.ts,c.ts,c.x from (select count(*) k from nest_tb0 int if $rows != 10000 then return -1 endi - if $data00 != @20-09-15 00:00:00.000@ then return -1 endi - if $data01 != 1 then return -1 endi - if $data02 != @20-09-15 00:00:00.000@ then return -1 endi - if $data03 != @20-09-15 00:00:00.000@ then return -1 endi @@ -407,11 +346,9 @@ sql select diff(val) from (select c1 val from nest_tb0); if $rows != 9999 then return -1 endi - if $data00 != @70-01-01 08:00:00.000@ then return -1 endi - if $data01 != 1 then return -1 endi @@ -425,19 +362,15 @@ sql select count(*),c1 from (select * from nest_tb0) where c1 < 2 group by c1; if $rows != 2 then return -1 endi - if $data00 != 100 then return -1 endi - if $data01 != 0 then return -1 endi - if $data10 != 100 then return -1 endi - if $data11 != 1 then return -1 endi @@ -447,11 +380,9 @@ sql select twa(c1) from nest_tb1 interval(19a); if $rows != 10000 then return -1 endi - if $data00 != @20-09-14 23:59:59.992@ then return -1 endi - if $data01 != 0.000083333 then return -1 endi @@ -461,19 +392,15 @@ sql select min(val),max(val),first(val),last(val),count(val),sum(val),avg(val) f if $rows != 1 then return -1 endi - if $data00 != 10000 then return -1 endi - if $data01 != 10000 then return -1 endi - if $data04 != 10 then return -1 endi - if $data05 != 100000 then return -1 endi @@ -487,19 +414,15 @@ sql select avg(k) from (select avg(k) k from t1 interval(1s)) interval(1m); if $rows != 2 then return -1 endi - if $data00 != @20-01-01 01:01:00.000000@ then return -1 endi - if $data01 != 1.000000000 then return -1 endi - if $data10 != @20-01-01 01:02:00.000000@ then return -1 endi - if $data11 != 2.000000000 then return -1 endi diff --git a/tests/script/tsim/parser/union.sim b/tests/script/tsim/parser/union.sim index 4d05d4ced7..95150616d1 100644 --- a/tests/script/tsim/parser/union.sim +++ b/tests/script/tsim/parser/union.sim @@ -102,11 +102,11 @@ $i = 1 $tb = $tbPrefix . $i ## column type not identical -sql_error select count(*) as a from union_mt0 union all select avg(c1) as a from union_mt0 -sql_error select count(*) as a from union_mt0 union all select spread(c1) as a from union_mt0; +sql select count(*) as a from union_mt0 union all select avg(c1) as a from union_mt0 +sql select count(*) as a from union_mt0 union all select spread(c1) as a from union_mt0; ## union not supported -sql_error (select count(*) from union_mt0) union (select count(*) from union_mt0); +sql (select count(*) from union_mt0) union (select count(*) from union_mt0); ## column type not identical sql_error select c1 from union_mt0 limit 10 union all select c2 from union_tb1 limit 20; @@ -123,145 +123,114 @@ sql (((select c1 from union_tb0))) if $rows != 10000 then return -1 endi - if $data00 != 0 then return -1 endi - if $data10 != 1 then return -1 endi -sql select 'ab' as options from union_tb1 limit 1 union all select 'dd' as options from union_tb0 limit 1; +sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options; if $rows != 2 then return -1 endi - if $data00 != @ab@ then return -1 endi - if $data10 != @dd@ then return -1 endi - -sql select 'ab' as options from union_tb1 limit 1 union all select '1234567' as options from union_tb0 limit 1; +sql (select 'ab12345' as options from union_tb1 limit 1) union all (select '1234567' as options from union_tb0 limit 1) order by options desc; if $rows != 2 then return -1 endi - -if $data00 != @ab@ then +if $data00 != @ab12345@ then return -1 endi - if $data10 != @1234567@ then return -1 endi - # mixed order -sql select ts, c1 from union_tb1 order by ts asc limit 10 union all select ts, c1 from union_tb0 order by ts desc limit 2 union all select ts, c1 from union_tb2 order by ts asc limit 10 +sql (select ts, c1 from union_tb1 order by ts asc limit 10) union all (select ts, c1 from union_tb0 order by ts desc limit 2) union all (select ts, c1 from union_tb2 order by ts asc limit 10) order by ts if $rows != 22 then return -1 endi - if $data00 != @20-01-05 13:51:24.000@ then return -1 endi - if $data01 != 0 then return -1 endi - -if $data10 != @20-01-05 13:52:24.000@ then +if $data10 != @20-01-05 13:51:24.000@ then return -1 endi - -if $data11 != 1 then +if $data11 != 0 then return -1 endi - -if $data90 != @20-01-05 14:00:24.000@ then +print $data90 $data91 +if $data90 != @20-01-05 13:55:24.000@ then return -1 endi - -if $data91 != 9 then +if $data91 != 4 then return -1 endi # different sort order # super table & normal table mixed up -sql select c3 from union_tb0 limit 2 union all select sum(c1) as c3 from union_mt0; +sql (select c3 from union_tb0 limit 2) union all (select sum(c1) as c3 from union_mt0) order by c3; if $rows != 3 then return -1 endi - if $data00 != 0 then return -1 endi - if $data10 != 1 then return -1 endi - if $data20 != 4950000 then return -1 endi # type compatible -sql select c3 from union_tb0 limit 2 union all select sum(c1) as c3 from union_tb1; +sql (select c3 from union_tb0 limit 2) union all (select sum(c1) as c3 from union_tb1) order by c3; if $rows != 3 then return -1 endi - if $data00 != 0 then return -1 endi - if $data10 != 1 then return -1 endi - if $data20 != 495000 then return -1 endi # two join subclause -sql select count(*) as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts union all select union_tb0.c3 as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts limit 10 +sql (select count(*) as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts) union all (select union_tb0.c3 as c from union_tb0, union_tb1 where union_tb0.ts=union_tb1.ts limit 10) order by c desc if $rows != 11 then return -1 endi - if $data00 != 10000 then return -1 endi - -if $data10 != 0 then +if $data10 != 9 then return -1 endi - -if $data20 != 1 then +if $data20 != 8 then return -1 endi - -if $data90 != 8 then +if $data90 != 1 then return -1 endi print ===========================================tags union # two super table tag union, limit is not active during retrieve tags query -sql select t1 from union_mt0 union all select t1 from union_mt0 -if $rows != 20 then - return -1 -endi - -if $data00 != 0 then - return -1 -endi - -if $data90 != 9 then +sql (select t1 from union_mt0) union all (select t1 from union_mt0) +if $rows != 200000 then return -1 endi @@ -271,39 +240,35 @@ endi #endi #========================================== two super table join subclause print ================two super table join subclause -sql select avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10 union all select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5; +sql (select _wstart as ts, avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10) union all (select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5); print the rows value is: $rows - if $rows != 15 then return -1 endi # first subclause are empty -sql select count(*) as c from union_tb0 where ts > now + 3650d union all select sum(c1) as c from union_tb1; +sql (select count(*) as c from union_tb0 where ts > now + 3650d) union all (select sum(c1) as c from union_tb1); if $rows != 1 then return -1 endi - if $data00 != 495000 then return -1 endi # all subclause are empty -sql select c1 from union_tb0 limit 0 union all select c1 from union_tb1 where ts>'2021-1-1 0:0:0' +sql (select c1 from union_tb0 limit 0) union all (select c1 from union_tb1 where ts>'2021-1-1 0:0:0') if $rows != 0 then return -1 endi # middle subclause empty -sql select c1 from union_tb0 limit 1 union all select c1 from union_tb1 where ts>'2030-1-1 0:0:0' union all select last(c1) as c1 from union_tb1; +sql (select c1 from union_tb0 limit 1) union all (select c1 from union_tb1 where ts>'2030-1-1 0:0:0' union all select last(c1) as c1 from union_tb1) order by c1; if $rows != 2 then return -1 endi - if $data00 != 0 then return -1 endi - if $data10 != 99 then return -1 endi @@ -319,141 +284,90 @@ sql (select ts, c1 from union_mt0 limit 1) union all (select ts, c1 from union_m if $rows != 2 then return -1 endi - if $data00 != @20-01-05 13:51:24.000@ then return -1 endi - if $data01 != 0 then return -1 endi - if $data10 != @20-01-05 13:51:24.000@ then return -1 endi - if $data11 != 0 then return -1 endi # two aggregated functions for super tables -sql select sum(c1) as a from union_mt0 interval(1s) limit 9 union all select ts, max(c3) as a from union_mt0 limit 2; +sql (select _wstart as ts, sum(c1) as a from union_mt0 interval(1s) limit 9) union all (select ts, max(c3) as a from union_mt0 limit 2) order by ts; if $rows != 10 then return -1 endi - if $data00 != @20-01-05 13:51:24.000@ then return -1 endi - if $data01 != 0 then return -1 endi - if $data10 != @20-01-05 13:52:24.000@ then return -1 endi - if $data11 != 10 then return -1 endi - if $data20 != @20-01-05 13:53:24.000@ then return -1 endi - if $data21 != 20 then return -1 endi - if $data90 != @20-01-05 15:30:24.000@ then return -1 endi - if $data91 != 99 then return -1 endi #================================================================================================= # two aggregated functions for normal tables -sql select sum(c1) as a from union_tb0 limit 1 union all select sum(c3) as a from union_tb1 limit 2; +sql (select sum(c1) as a from union_tb0 limit 1) union all (select sum(c3) as a from union_tb1 limit 2); if $rows != 2 then return -1 endi - if $data00 != 495000 then return -1 endi - if $data10 != 495000 then return -1 endi # two super table query + interval + limit -sql select ts, first(c3) as a from union_mt0 limit 1 union all select sum(c3) as a from union_mt0 interval(1h) limit 1; +sql (select ts, first(c3) as a from union_mt0 limit 1) union all (select _wstart as ts, sum(c3) as a from union_mt0 interval(1h) limit 1) order by ts desc; if $rows != 2 then return -1 endi - if $data00 != @20-01-05 13:51:24.000@ then return -1 endi - if $data01 != 0 then return -1 endi - if $data10 != @20-01-05 13:00:00.000@ then return -1 endi - if $data11 != 360 then return -1 endi -sql select server_status() union all select server_status() -if $rows != 2 then - return -1 -endi - -if $data00 != 1 then - return -1 -endi - -if $data10 != 1 then - return -1 -endi - -sql select client_version() union all select server_version() -if $rows != 2 then - return -1 -endi - -sql select database() union all select database() -if $rows != 2 then - return -1 -endi - -if $data00 != @union_db0@ then - return -1 -endi - -if $data10 != @union_db0@ then - return -1 -endi - -sql select 'aaa' as option from union_tb1 where c1 < 0 limit 1 union all select 'bbb' as option from union_tb0 limit 1 +sql (select 'aaa' as option from union_tb1 where c1 < 0 limit 1) union all (select 'bbb' as option from union_tb0 limit 1) if $rows != 1 then return -1 endi - if $data00 != @bbb@ then return -1 endi - -sql_error show tables union all show tables -sql_error show stables union all show stables -sql_error show databases union all show databases +sql_error (show tables) union all (show tables) +sql_error (show stables) union all (show stables) +sql_error (show databases) union all (show databases) system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/parser/union_sysinfo.sim b/tests/script/tsim/parser/union_sysinfo.sim new file mode 100644 index 0000000000..ea45dc68e1 --- /dev/null +++ b/tests/script/tsim/parser/union_sysinfo.sim @@ -0,0 +1,35 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql (select server_status()) union all (select server_status()) +if $rows != 2 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data10 != 1 then + return -1 +endi + +sql (select client_version()) union all (select server_version()) +if $rows != 2 then + return -1 +endi + +sql (select database()) union all (select database()) +if $rows != 2 then + return -1 +endi +if $data00 != @union_db0@ then + return -1 +endi +if $data10 != @union_db0@ then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/1-insert/delete_data.py b/tests/system-test/1-insert/delete_data.py index 4c1426d0b1..1eb270d997 100644 --- a/tests/system-test/1-insert/delete_data.py +++ b/tests/system-test/1-insert/delete_data.py @@ -214,6 +214,24 @@ class TDTestCase: tdSql.checkRows((row_num-i)*tb_num) for j in range(tb_num): self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) + for i in range(row_num): + tdSql.execute(f'delete from {tbname} where ts between {self.ts} and {self.ts+i}') + tdSql.execute(f'flush database {dbname}') + tdSql.execute('reset query cache') + tdSql.query(f'select {col_name} from {tbname}') + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(row_num - i-1) + self.insert_base_data(col_type,tbname,row_num,base_data) + elif tb_type == 'stb': + tdSql.checkRows(tb_num*(row_num - i-1)) + for j in range(tb_num): + self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) + tdSql.execute(f'delete from {tbname} where ts between {self.ts+i+1} and {self.ts}') + tdSql.query(f'select {col_name} from {tbname}') + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(row_num) + elif tb_type == 'stb': + tdSql.checkRows(tb_num*row_num) def delete_error(self,tbname,column_name,column_type,base_data): for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']: if 'binary' in column_type.lower(): @@ -221,7 +239,8 @@ class TDTestCase: elif 'nchar' in column_type.lower(): tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''') else: - tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') + tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') + def delete_data_ntb(self): tdSql.execute(f'create database if not exists {self.dbname}') tdSql.execute(f'use {self.dbname}') diff --git a/tests/system-test/1-insert/update_data.py b/tests/system-test/1-insert/update_data.py index 27e1559d7e..a9c5f39179 100644 --- a/tests/system-test/1-insert/update_data.py +++ b/tests/system-test/1-insert/update_data.py @@ -81,39 +81,63 @@ class TDTestCase: if col_type.lower() == 'double': for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.DOUBLE_MIN,1.1*constant.DOUBLE_MAX]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'float': for error_value in [tdCom.getLongName(self.str_length),True,False,1.1*constant.FLOAT_MIN,1.1*constant.FLOAT_MAX]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif 'binary' in col_type.lower() or 'nchar' in col_type.lower(): for error_value in [tdCom.getLongName(str_length)]: tdSql.error(f'insert into {tbname} values({self.ts},"{error_value}")') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'bool': for error_value in [tdCom.getLongName(self.str_length)]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'tinyint': for error_value in [constant.TINYINT_MIN-1,constant.TINYINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'smallint': for error_value in [constant.SMALLINT_MIN-1,constant.SMALLINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'int': for error_value in [constant.INT_MIN-1,constant.INT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'bigint': for error_value in [constant.BIGINT_MIN-1,constant.BIGINT_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'tinyint unsigned': for error_value in [constant.TINYINT_UN_MIN-1,constant.TINYINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: - tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'smallint unsigned': for error_value in [constant.SMALLINT_UN_MIN-1,constant.SMALLINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'int unsigned': for error_value in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') elif col_type.lower() == 'bigint unsigned': for error_value in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1,random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX),tdCom.getLongName(self.str_length),True,False]: - tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + tdSql.error(f'insert into {tbname} values({self.ts},{error_value})') + if tb_type == 'ctb': + tdSql.error(f'insert into {stbname} values({self.ts},{error_value})') tdSql.execute(f'drop table {tbname}') if tb_type == 'ctb': tdSql.execute(f'drop table {stbname}') diff --git a/tests/system-test/1-insert/update_data_muti_rows.py b/tests/system-test/1-insert/update_data_muti_rows.py new file mode 100644 index 0000000000..e7da35426a --- /dev/null +++ b/tests/system-test/1-insert/update_data_muti_rows.py @@ -0,0 +1,179 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import random +import string + +from numpy import logspace +from util import constant +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.dbname = 'db_test' + self.ntbname = 'ntb' + self.stbname = 'stb' + self.rowNum = 10 + self.tbnum = 5 + self.ts = 1537146000000 + self.str_length = 20 + self.column_dict = { + 'col1': 'tinyint', + 'col2': 'smallint', + 'col3': 'int', + 'col4': 'bigint', + 'col5': 'tinyint unsigned', + 'col6': 'smallint unsigned', + 'col7': 'int unsigned', + 'col8': 'bigint unsigned', + 'col9': 'float', + 'col10': 'double', + 'col11': 'bool', + 'col12': f'binary({self.str_length})', + 'col13': f'nchar({self.str_length})' + } + self.tinyint_val = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX) + self.smallint_val = random.randint(constant.SMALLINT_MIN,constant.SMALLINT_MAX) + self.int_val = random.randint(constant.INT_MIN,constant.INT_MAX) + self.bigint_val = random.randint(constant.BIGINT_MIN,constant.BIGINT_MAX) + self.untingint_val = random.randint(constant.TINYINT_UN_MIN,constant.TINYINT_UN_MAX) + self.unsmallint_val = random.randint(constant.SMALLINT_UN_MIN,constant.SMALLINT_UN_MAX) + self.unint_val = random.randint(constant.INT_UN_MIN,constant.INT_MAX) + self.unbigint_val = random.randint(constant.BIGINT_UN_MIN,constant.BIGINT_UN_MAX) + self.float_val = random.uniform(constant.FLOAT_MIN,constant.FLOAT_MAX) + self.double_val = random.uniform(constant.DOUBLE_MIN*(1E-300),constant.DOUBLE_MAX*(1E-300)) + self.bool_val = random.randint(0,2)%2 + self.binary_val = tdCom.getLongName(random.randint(0,self.str_length)) + self.nchar_val = tdCom.getLongName(random.randint(0,self.str_length)) + self.data = { + 'tinyint':self.tinyint_val, + 'smallint':self.smallint_val, + 'int':self.int_val, + 'bigint':self.bigint_val, + 'tinyint unsigned':self.untingint_val, + 'smallint unsigned':self.unsmallint_val, + 'int unsigned':self.unint_val, + 'bigint unsigned':self.unbigint_val, + 'bool':self.bool_val, + 'float':self.float_val, + 'double':self.double_val, + 'binary':self.binary_val, + 'nchar':self.nchar_val + } + def update_data(self,dbname,tbname,tb_num,rows,values,col_type): + sql = f'insert into ' + for j in range(tb_num): + sql += f'{dbname}.{tbname}_{j} values' + for i in range(rows): + if 'binary' in col_type.lower() or 'nchar' in col_type.lower(): + sql += f'({self.ts+i},"{values}")' + else: + sql += f'({self.ts+i},{values})' + sql += ' ' + tdSql.execute(sql) + + def insert_data(self,col_type,tbname,rows,data): + for i in range(rows): + if col_type.lower() == 'tinyint': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["tinyint"]})') + elif col_type.lower() == 'smallint': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["smallint"]})') + elif col_type.lower() == 'int': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["int"]})') + elif col_type.lower() == 'bigint': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bigint"]})') + elif col_type.lower() == 'tinyint unsigned': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["tinyint unsigned"]})') + elif col_type.lower() == 'smallint unsigned': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["smallint unsigned"]})') + elif col_type.lower() == 'int unsigned': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["int unsigned"]})') + elif col_type.lower() == 'bigint unsigned': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bigint unsigned"]})') + elif col_type.lower() == 'bool': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["bool"]})') + elif col_type.lower() == 'float': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["float"]})') + elif col_type.lower() == 'double': + tdSql.execute(f'insert into {tbname} values({self.ts+i},{data["double"]})') + elif 'binary' in col_type.lower(): + tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{data['binary']}")''') + elif 'nchar' in col_type.lower(): + tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{data['nchar']}")''') + + def data_check(self,dbname,tbname,tbnum,rownum,data,col_name,col_type): + if 'binary' in col_type.lower(): + self.update_data(dbname,f'{tbname}',tbnum,rownum,data['binary'],col_type) + elif 'nchar' in col_type.lower(): + self.update_data(dbname,f'{tbname}',tbnum,rownum,data['nchar'],col_type) + else: + self.update_data(dbname,f'{tbname}',tbnum,rownum,data[col_type],col_type) + tdSql.execute(f'flush database {dbname}') + tdSql.execute('reset query cache') + for i in range(self.tbnum): + tdSql.query(f'select {col_name} from {dbname}.{tbname}_{i}') + for j in range(rownum): + if col_type.lower() == 'float' or col_type.lower() == 'double': + if abs(tdSql.queryResult[j][0] - data[col_type]) / data[col_type] <= 0.0001: + tdSql.checkEqual(tdSql.queryResult[j][0],tdSql.queryResult[j][0]) + elif 'binary' in col_type.lower(): + tdSql.checkEqual(tdSql.queryResult[j][0],data['binary']) + elif 'nchar' in col_type.lower(): + tdSql.checkEqual(tdSql.queryResult[j][0],data['nchar']) + else: + tdSql.checkEqual(tdSql.queryResult[j][0],data[col_type]) + def update_data_ntb(self): + tdSql.execute(f'drop database if exists {self.dbname}') + tdSql.execute(f'create database {self.dbname}') + tdSql.execute(f'use {self.dbname}') + for col_name,col_type in self.column_dict.items(): + for i in range(self.tbnum): + tdSql.execute(f'create table {self.dbname}.{self.ntbname}_{i} (ts timestamp,{col_name} {col_type})') + for j in range(self.rowNum): + tdSql.execute(f'insert into {self.dbname}.{self.ntbname}_{i} values({self.ts+j},null)' ) + tdSql.execute(f'flush database {self.dbname}') + tdSql.execute('reset query cache') + self.data_check(self.dbname,self.ntbname,self.tbnum,self.rowNum,self.data,col_name,col_type) + for i in range(self.tbnum): + tdSql.execute(f'drop table {self.ntbname}_{i}') + def update_data_ctb(self): + tdSql.execute(f'drop database if exists {self.dbname}') + tdSql.execute(f'create database {self.dbname}') + tdSql.execute(f'use {self.dbname}') + for col_name,col_type in self.column_dict.items(): + tdSql.execute(f'create table {self.dbname}.{self.stbname} (ts timestamp,{col_name} {col_type}) tags(t0 int)') + for i in range(self.tbnum): + tdSql.execute(f'create table {self.dbname}.{self.stbname}_{i} using {self.dbname}.{self.stbname} tags(1)') + for j in range(self.rowNum): + tdSql.execute(f'insert into {self.dbname}.{self.stbname}_{i} values({self.ts+j},null)' ) + tdSql.execute(f'flush database {self.dbname}') + tdSql.execute('reset query cache') + self.data_check(self.dbname,self.stbname,self.tbnum,self.rowNum,self.data,col_name,col_type) + tdSql.execute(f'drop table {self.stbname}') + def run(self): + self.update_data_ntb() + self.update_data_ctb() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/2-query/count.py b/tests/system-test/2-query/count.py index c83ff43c51..e047225c1f 100644 --- a/tests/system-test/2-query/count.py +++ b/tests/system-test/2-query/count.py @@ -94,17 +94,15 @@ class TDTestCase: tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) for i in range(self.tbnum): tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') - #!TODO - # tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') - # tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum) + tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') + tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum) tdSql.query(f'select count(tbname) from {self.stbname}') tdSql.checkRows(0) tdSql.execute('flush database db') tdSql.query(f'select count(tbname) from {self.stbname}') tdSql.checkRows(0) - #!TODO - # tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') - # tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum) + tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})') + tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum) for i in range(self.tbnum): self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum) self.count_query_stb(self.column_dict,self.tag_dict,self.stbname,self.tbnum,self.rowNum) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 164a6b24ba..a56f79d20f 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -20,6 +20,8 @@ import threading import requests import time # import socketfrom +import json +import toml import taos from util.log import * @@ -207,7 +209,7 @@ class TMQCom: def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0): for _ in range(count): - create_ctable_sql = f'drop table {dbname}.{default_ctbname_prefix}{ctbStartIdx};' + create_ctable_sql = f'drop table if exists {dbname}.{default_ctbname_prefix}{ctbStartIdx};' ctbStartIdx += 1 tdLog.info("drop ctb sql: %s"%create_ctable_sql) tsql.execute(create_ctable_sql) @@ -503,6 +505,37 @@ class TMQCom: break return + def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs): + tb_params = "" + if len(kwargs) > 0: + for param, value in kwargs.items(): + tb_params += f'{param} "{value}" ' + column_type_str = tdCom.gen_column_type_str(colPrefix, column_elm_list) + + for _ in range(tblNum): + create_table_sql = f'create table {dbname}.{tbname_prefix}{tbname_index_start_num} ({column_type_str}) {tb_params};' + tbname_index_start_num += 1 + tsql.execute(create_table_sql) + + def insert_rows_into_ntbl(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=None, startTs=None, tblNum=1, rows=1): + if startTs is None: + startTs = tdCom.genTs()[0] + + for tblIdx in range(tblNum): + for rowIdx in range(rows): + column_value_list = tdCom.gen_column_value_list(column_ele_list, f'{startTs}+{rowIdx}s') + column_value_str = '' + idx = 0 + for column_value in column_value_list: + if isinstance(column_value, str) and idx != 0: + column_value_str += f'"{column_value}", ' + else: + column_value_str += f'{column_value}, ' + idx += 1 + column_value_str = column_value_str.rstrip()[:-1] + insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});' + tsql.execute(insert_sql) + def close(self): self.cursor.close() diff --git a/tests/system-test/7-tmq/tmqDropNtb.py b/tests/system-test/7-tmq/tmqDropNtb.py new file mode 100644 index 0000000000..9200200588 --- /dev/null +++ b/tests/system-test/7-tmq/tmqDropNtb.py @@ -0,0 +1,237 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.vgroups = 4 + self.ctbNum = 100 + self.rowsPerTbl = 10 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def waitSubscriptionExit(self, max_wait_count=20): + wait_cnt = 0 + while (wait_cnt < max_wait_count): + tdSql.query("show subscriptions") + if tdSql.getRows() == 0: + break + else: + time.sleep(1) + wait_cnt += 1 + + tdLog.info("wait subscriptions exit for %d s"%wait_cnt) + + # drop some ntbs + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ntb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'endTs': 0, + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdLog.info("start create database....") + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("start create normal tables....") + tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) + tdLog.info("start insert data into normal tables....") + tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) + + tdLog.info("create topics from database") + topicFromDb = 'topic_dbt' + tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) + + if self.snapshot == 0: + consumerId = 0 + elif self.snapshot == 1: + consumerId = 1 + + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) + topicList = topicFromDb + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tdLog.info("drop some ntables") + # drop 1/4 ctbls from half offset + paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) + paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) + tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + + if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): + tdLog.exit("tmq consume rows error with snapshot = 0!") + + tdLog.info("wait subscriptions exit ....") + self.waitSubscriptionExit() + + tdSql.query("drop topic %s"%topicFromDb) + tdLog.info("success dorp topic: %s"%topicFromDb) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + + + # drop some ntbs and create some new ntbs + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ntb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'endTs': 0, + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdLog.info("start create database....") + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("start create normal tables....") + tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) + tdLog.info("start insert data into normal tables....") + tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) + + tdLog.info("create topics from database") + topicFromDb = 'topic_dbt' + tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) + + if self.snapshot == 0: + consumerId = 2 + elif self.snapshot == 1: + consumerId = 3 + + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) + topicList = topicFromDb + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tdLog.info("drop some ntables") + # drop 1/4 ctbls from half offset + paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) + paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) + tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) + + tdLog.info("start create some new normal tables....") + paraDict["ctbPrefix"] = 'newCtb' + paraDict["ctbNum"] = self.ctbNum + tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) + tdLog.info("start insert data into these new normal tables....") + tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + + if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): + tdLog.exit("tmq consume rows error with snapshot = 0!") + + tdLog.info("wait subscriptions exit ....") + self.waitSubscriptionExit() + + tdSql.query("drop topic %s"%topicFromDb) + tdLog.info("success dorp topic: %s"%topicFromDb) + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdLog.printNoPrefix("=============================================") + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.snapshot = 0 + # self.tmqCase1() + self.tmqCase2() + + tdLog.printNoPrefix("====================================================================") + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.snapshot = 1 + # self.tmqCase1() + self.tmqCase2() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index d3dd93f9ca..be526c6ccd 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -32,6 +32,9 @@ python3 ./test.py -f 1-insert/block_wise.py python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/table_param_ttl.py +python3 ./test.py -f 1-insert/update_data_muti_rows.py + + python3 ./test.py -f 2-query/abs.py python3 ./test.py -f 2-query/abs.py -R python3 ./test.py -f 2-query/and_or_for_byte.py @@ -59,7 +62,9 @@ python3 ./test.py -f 2-query/char_length.py -R python3 ./test.py -f 2-query/check_tsdb.py python3 ./test.py -f 2-query/check_tsdb.py -R -# jira python3 ./test.py -f 1-insert/update_data.py + +python3 ./test.py -f 1-insert/update_data.py + python3 ./test.py -f 1-insert/delete_data.py python3 ./test.py -f 2-query/db.py diff --git a/tools/taosadapter b/tools/taosadapter index df8678f070..d8f19ede56 160000 --- a/tools/taosadapter +++ b/tools/taosadapter @@ -1 +1 @@ -Subproject commit df8678f070e3f707faf59baebec90065f6e1268b +Subproject commit d8f19ede56f1f489c5d2ac8f963cced01e68ecef