diff --git a/cmake/cmake.options b/cmake/cmake.options index 5d99b2214a..51d6f53048 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -51,6 +51,13 @@ IF(${TD_WINDOWS}) "If build unit tests using googletest" ON ) + + option( + TDENGINE_3 + "TDengine 3.x" + ON + ) + ELSEIF (TD_DARWIN_64) add_definitions(-DCOMPILER_SUPPORTS_CXX13) option( diff --git a/docs/en/14-reference/03-connector/java.mdx b/docs/en/14-reference/03-connector/java.mdx index 310e0a15c6..22f99bb9ae 100644 --- a/docs/en/14-reference/03-connector/java.mdx +++ b/docs/en/14-reference/03-connector/java.mdx @@ -91,7 +91,7 @@ Add following dependency in the `pom.xml` file of your Maven project: You can build Java connector from source code after cloning the TDengine project: ``` -git clone https://github.com/taosdata/taos-connector-jdbc.git +git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0 cd taos-connector-jdbc mvn clean install -Dmaven.test.skip=true ``` @@ -140,34 +140,34 @@ When you use a JDBC native connection to connect to a TDengine cluster, you can 1. Do not specify hostname and port in Java applications. - ```java - public Connection getConn() throws Exception{ - Class.forName("com.taosdata.jdbc.TSDBDriver"); - String jdbcUrl = "jdbc:TAOS://:/test?user=root&password=taosdata"; - Properties connProps = new Properties(); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - Connection conn = DriverManager.getConnection(jdbcUrl, connProps); - return conn; - } - ``` + ```java + public Connection getConn() throws Exception{ + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = "jdbc:TAOS://:/test?user=root&password=taosdata"; + Properties connProps = new Properties(); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); + Connection conn = DriverManager.getConnection(jdbcUrl, connProps); + return conn; + } + ``` 2. specify the firstEp and the secondEp in the configuration file taos.cfg - ```shell - # first fully qualified domain name (FQDN) for TDengine system - firstEp cluster_node1:6030 + ```shell + # first fully qualified domain name (FQDN) for TDengine system + firstEp cluster_node1:6030 - # second fully qualified domain name (FQDN) for TDengine system, for cluster only - secondEp cluster_node2:6030 + # second fully qualified domain name (FQDN) for TDengine system, for cluster only + secondEp cluster_node2:6030 - # default system charset - # charset UTF-8 + # default system charset + # charset UTF-8 - # system locale - # locale en_US.UTF-8 - ``` + # system locale + # locale en_US.UTF-8 + ``` In the above example, JDBC uses the client's configuration file to establish a connection to a hostname `cluster_node1`, port 6030, and a database named `test`. When the firstEp node in the cluster fails, JDBC attempts to connect to the cluster using secondEp. diff --git a/docs/zh/14-reference/03-connector/java.mdx b/docs/zh/14-reference/03-connector/java.mdx index 838fa2eff8..88a3674671 100644 --- a/docs/zh/14-reference/03-connector/java.mdx +++ b/docs/zh/14-reference/03-connector/java.mdx @@ -93,7 +93,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖: 可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector ```shell -git clone https://github.com/taosdata/taos-connector-jdbc.git +git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0 cd taos-connector-jdbc mvn clean install -Dmaven.test.skip=true ``` diff --git a/include/os/osSignal.h b/include/os/osSignal.h index e22c43684c..12f4f2ed0f 100644 --- a/include/os/osSignal.h +++ b/include/os/osSignal.h @@ -44,7 +44,11 @@ extern "C" { #define SIGBREAK 1234 #endif +#ifdef WINDOWS +typedef BOOL (*FSignalHandler)(DWORD fdwCtrlType); +#else typedef void (*FSignalHandler)(int32_t signum, void *sigInfo, void *context); +#endif void taosSetSignal(int32_t signum, FSignalHandler sigfp); void taosIgnSignal(int32_t signum); void taosDflSignal(int32_t signum); diff --git a/include/os/osSystem.h b/include/os/osSystem.h index 581e688ccb..6770be6e46 100644 --- a/include/os/osSystem.h +++ b/include/os/osSystem.h @@ -29,9 +29,6 @@ extern "C" { #define tcgetattr TCGETATTR_FUNC_TAOS_FORBID #endif -#define TAOS_CONSOLE_PROMPT_HEADER "taos> " -#define TAOS_CONSOLE_PROMPT_CONTINUE " -> " - typedef struct TdCmd *TdCmdPtr; TdCmdPtr taosOpenCmd(const char* cmd); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 76ec5b2d22..6dc78d6400 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -183,7 +183,7 @@ bool tsStartUdfd = true; int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; -int32_t tsTtlPushInterval = 60; +int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { @@ -466,7 +466,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1; - if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; return 0; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 5255724b58..c4299181bd 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -438,12 +438,15 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi } int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { - metaWLock(pMeta); int ret = metaTtlSmaller(pMeta, ttl, tbUids); if (ret != 0) { - metaULock(pMeta); return ret; } + if (taosArrayGetSize(tbUids) == 0){ + return 0; + } + + metaWLock(pMeta); for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i); metaDropTableByUid(pMeta, *uid, NULL); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 9fc51cb59d..4e2750d9f0 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -51,6 +51,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success if (metaGetTableEntryByUid(&mr, uid) < 0) { + metaReaderClear(&mr); return -1; } char* tbName = strdup(mr.me.name); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9e85d0fef5..814c7c060c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -52,11 +52,12 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) -#define START_TS_COLUMN_INDEX 0 -#define END_TS_COLUMN_INDEX 1 -#define UID_COLUMN_INDEX 2 -#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX -#define DELETE_GROUPID_COLUMN_INDEX 2 +#define START_TS_COLUMN_INDEX 0 +#define END_TS_COLUMN_INDEX 1 +#define UID_COLUMN_INDEX 2 +#define GROUPID_COLUMN_INDEX 3 +#define CALCULATE_START_TS_COLUMN_INDEX 4 +#define CALCULATE_END_TS_COLUMN_INDEX 5 enum { // when this task starts to execute, this status will set @@ -347,7 +348,6 @@ typedef enum EStreamScanMode { STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_UPDATERES, - STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it STREAM_SCAN_FROM_DATAREADER_RETRIEVE, STREAM_SCAN_FROM_DATAREADER_RANGE, } EStreamScanMode; @@ -367,7 +367,7 @@ typedef struct SStreamAggSupporter { char* pKeyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row - SArray* pScanWindow; + SSDataBlock* pScanBlock; } SStreamAggSupporter; typedef struct SessionWindowSupporter { @@ -420,7 +420,7 @@ typedef struct SStreamScanInfo { int32_t deleteDataIndex; STimeWindow updateWin; STimeWindowAggSupp twAggSup; - + SSDataBlock* pUpdateDataRes; // status for tmq // SSchemaWrapper schema; STqOffset offset; @@ -713,7 +713,6 @@ typedef struct SStreamStateAggOperatorInfo { SSDataBlock* pDelRes; SHashObj* pSeDeleted; void* pDelIterator; - SArray* pScanWindow; SArray* pChildren; // cache for children's result; bool ignoreExpiredData; } SStreamStateAggOperatorInfo; @@ -955,6 +954,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, @@ -971,7 +971,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); -SSDataBlock* createPullDataBlock(); +SSDataBlock* createSpecialDataBlock(EStreamType type); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7cdfecd6b7..6057138964 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5139,8 +5139,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF } pSup->valueSize = size; - pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow)); - + pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); int32_t pageSize = 4096; while (pageSize < pSup->resultRowSize * 4) { pageSize <<= 1u; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 503826afd3..8dd44d0e0f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -25,7 +25,6 @@ #include "tdatablock.h" #include "tmsg.h" -#include "executorimpl.h" #include "query.h" #include "tcompare.h" #include "thash.h" @@ -812,6 +811,10 @@ static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; } +static bool isSlidingWindow(SStreamScanInfo* pInfo) { + return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding; +} + static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); if (groupId) { @@ -834,17 +837,10 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { } static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); + uint64_t* groupCol = (uint64_t*)pColInfo->pData; ASSERT(rowIndex < pBlock->info.rows); - switch (pBlock->info.type) { - case STREAM_DELETE_DATA: - case STREAM_RETRIEVE: { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); - uint64_t* groupCol = (uint64_t*)pColInfo->pData; - pInfo->groupId = groupCol[rowIndex]; - } break; - default: - break; - } + pInfo->groupId = groupCol[rowIndex]; } void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { @@ -864,7 +860,17 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* endData = (TSKEY*)pEndTsCol->pData; STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]}; + + SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + TSKEY* calStartData = (TSKEY*)pCalStartTsCol->pData; + SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + TSKEY* calEndData = (TSKEY*)pCalEndTsCol->pData; + setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex); + if (isSlidingWindow(pInfo)) { + pInfo->updateWin.skey = calStartData[*pRowIndex]; + pInfo->updateWin.ekey = calEndData[*pRowIndex]; + } (*pRowIndex)++; for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) { @@ -876,8 +882,8 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ win.skey = TMIN(win.skey, startData[*pRowIndex]); continue; } - ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || - (isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0))); + ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || + !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0))); break; } @@ -908,68 +914,6 @@ static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlo win.ekey = endWin.ekey; } } -static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { - STimeWindow win = { - .skey = INT64_MIN, - .ekey = INT64_MAX, - }; - bool needRead = false; - if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) { - SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex); - TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; - SResultRowInfo dumyInfo; - dumyInfo.cur.pageId = -1; - if (isSessionWindow(pInfo)) { - SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup; - int64_t gap = pInfo->sessionSup.gap; - int32_t winIndex = 0; - SResultWindowInfo* pCurWin = - getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex); - win = pCurWin->win; - setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); - (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); - } else { - setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); - pInfo->updateWin.skey = tsCols[*pRowIndex]; - win = getSlidingWindow(tsCols, &pInfo->interval, &pSDB->info, pRowIndex); - pInfo->updateWin.ekey = tsCols[*pRowIndex - 1]; - // win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC); - // (*pRowIndex) += - // getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, - // TSDB_ORDER_ASC); - } - needRead = true; - } else if (isStateWindow(pInfo)) { - SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow; - int32_t size = taosArrayGetSize(pWins); - if (pInfo->scanWinIndex < size) { - win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex); - pInfo->scanWinIndex++; - needRead = true; - } else { - pInfo->scanWinIndex = 0; - taosArrayClear(pWins); - } - } - if (!needRead) { - return false; - } - resetTableScanInfo(pInfo->pTableScanOp->info, &win); - return true; -} - -static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) { - for (int32_t j = 0; j < taosArrayGetSize(source->pDataBlock); j++) { - SColumnInfoData* pDestCol = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, j); - SColumnInfoData* pSourceCol = (SColumnInfoData*)taosArrayGet(source->pDataBlock, j); - if (colDataIsNull_s(pSourceCol, sourceRowId)) { - colDataAppendNULL(pDestCol, dest->info.rows); - } else { - colDataAppend(pDestCol, dest->info.rows, colDataGetData(pSourceCol, sourceRowId), false); - } - } - dest->info.rows++; -} static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { while (1) { @@ -982,29 +926,6 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 if (!pResult) { blockDataCleanup(pSDB); *pRowIndex = 0; - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; - return NULL; - } - - if (pResult->info.groupId == pInfo->groupId) { - return pResult; - } - } -} - -static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { - while (1) { - SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pTableScanOp); - if (pResult == NULL) { - if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) { - // scan next window data - pResult = doTableScan(pInfo->pTableScanOp); - } - } - if (!pResult) { pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTableScanInfo->dataReader); @@ -1017,77 +938,31 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_ return pResult; } } - - /* Todo(liuyao) for partition by column - SSDataBlock* pBlock = createOneDataBlock(pResult, true); - blockDataCleanup(pResult); - for (int32_t i = 0; i < pBlock->info.rows; i++) { - uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock->info.uid); - if (id == pInfo->groupId) { - copyOneRow(pResult, pBlock, i); - } - } - return pResult; - */ -} -static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, - SSDataBlock* pUpdateRes) { - if (pDelBlock->info.rows == 0) { - return; - } - blockDataCleanup(pUpdateRes); - blockDataEnsureCapacity(pUpdateRes, 64); - ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3); - SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); - TSKEY* startData = (TSKEY*)pStartTsCol->pData; - SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); - TSKEY* endData = (TSKEY*)pEndTsCol->pData; - SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); - uint64_t* uidCol = (uint64_t*)pGpCol->pData; - - SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); - for (int32_t i = pInfo->deleteDataIndex; - i < pDelBlock->info.rows && - i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1; - i++) { - uint64_t groupId = getGroupId(pOperator, uidCol[i]); - for (TSKEY startTs = startData[i]; startTs <= endData[i];) { - colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false); - colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false); - pUpdateRes->info.rows++; - startTs = taosTimeAdd(startTs, pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision); - } - pInfo->deleteDataIndex++; - } - - if (pInfo->deleteDataIndex > 0 && pInfo->deleteDataIndex == pDelBlock->info.rows) { - blockDataCleanup(pDelBlock); - pInfo->deleteDataIndex = 0; - } } -static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator, - SSDataBlock* pUpdateRes) { - if (pBlock->info.rows == 0) { - return; +static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { + if (pSrcBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; } - blockDataCleanup(pUpdateRes); - blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows); - ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); - SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + blockDataCleanup(pDestBlock); + int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3); + SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; - SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* endData = (TSKEY*)pEndTsCol->pData; - SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); - uint64_t* uidCol = (uint64_t*)pGpCol->pData; + SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* uidCol = (uint64_t*)pUidCol->pData; - SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX); - SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); int32_t dummy = 0; - for (int32_t i = 0; i < pBlock->info.rows; i++) { - uint64_t groupId = getGroupId(pOperator, uidCol[i]); + for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { + uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]); // gap must be 0. SResultWindowInfo* pStartWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy); @@ -1101,46 +976,75 @@ static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOper colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false); colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false); colDataAppend(pDestGpCol, i, (const char*)&groupId, false); - pUpdateRes->info.rows++; + pDestBlock->info.rows++; } + return TSDB_CODE_SUCCESS; } -static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { - blockDataCleanup(pUpdateBlock); - int32_t size = taosArrayGetSize(pInfo->tsArray); - if (pInfo->tsArrayIndex < size) { - SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - blockDataEnsureCapacity(pUpdateBlock, size); - int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); - pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid); - int32_t i = 0; - for (; i < size; i++) { - rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); - uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid); - if (pInfo->groupId != id) { - break; - } - copyOneRow(pUpdateBlock, pBlock, rowId); - } - pUpdateBlock->info.rows = i; - pInfo->tsArrayIndex += i; - pUpdateBlock->info.groupId = pInfo->groupId; - pUpdateBlock->info.type = STREAM_CLEAR; - blockDataUpdateTsWindow(pUpdateBlock, 0); +static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { + blockDataCleanup(pDestBlock); + int32_t rows = pSrcBlock->info.rows; + if (rows == 0) { + return TSDB_CODE_SUCCESS; + } + int32_t code = blockDataEnsureCapacity(pDestBlock, rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* uidCol = (uint64_t*)pUidCol->pData; + ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + TSKEY* tsCol = (TSKEY*)pTsCol->pData; + SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]); + for (int32_t i = 0; i < rows; ) { + colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(tsCol + i), false); + STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i); + colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(tsCol + i - 1), false); + + colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); + colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false); + colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false); + pDestBlock->info.rows++; } // all rows have same group id - ASSERT(pInfo->tsArrayIndex >= size); - if (size > 0 && pInfo->tsArrayIndex == size) { - taosArrayClear(pInfo->tsArray); - } + pDestBlock->info.groupId = groupId; + return TSDB_CODE_SUCCESS; +} - if (size == 0) { - generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock); +static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { + int32_t code = TSDB_CODE_SUCCESS; + if (isIntervalWindow(pInfo)) { + code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock); + } else { + code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock); } + pDestBlock->info.type = STREAM_CLEAR; + blockDataUpdateTsWindow(pDestBlock, 0); + return code; +} + +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid) { + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); + colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); + colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); + pBlock->info.rows++; } static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) { + if (out) { + blockDataCleanup(pInfo->pUpdateDataRes); + blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows); + } SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; @@ -1151,9 +1055,13 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) { - taosArrayPush(pInfo->tsArray, &rowId); + appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); } } + if (out) { + blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0); + pInfo->pUpdateDataRes->info.type = STREAM_CLEAR; + } } static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) { @@ -1320,26 +1228,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { case STREAM_RETRIEVE: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; - copyDataBlock(pInfo->pPullDataRes, pBlock); - pInfo->pullDataResIndex = 0; - prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex); + copyDataBlock(pInfo->pUpdateRes, pBlock); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); } break; case STREAM_DELETE_DATA: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; - if (isIntervalWindow(pInfo)) { - copyDataBlock(pInfo->pDeleteDataRes, pBlock); - generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes); - prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - } else { - generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - } - pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; - return pInfo->pUpdateRes; + generateScanRange(pInfo, pBlock, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + return pInfo->pDeleteDataRes; } break; default: break; @@ -1352,51 +1252,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; return pInfo->pRes; } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - if (isStateWindow(pInfo)) { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } else { - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - } + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); return pInfo->pUpdateRes; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) { - SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex); - if (pSDB != NULL) { - checkUpdateData(pInfo, true, pSDB, false); - pSDB->info.type = STREAM_PULL_DATA; - return pSDB; - } - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { - SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB) { - pSDB->info.type = STREAM_NORMAL; - checkUpdateData(pInfo, true, pSDB, false); - return pSDB; - } - setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); - if (pInfo->pUpdateRes->info.rows > 0) { - prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - return pInfo->pUpdateRes; - } - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE) { + } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE || pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { - pSDB->info.type = STREAM_NORMAL; + pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); return pSDB; } pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } else if (isStateWindow(pInfo)) { - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; - if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) { - blockDataCleanup(pInfo->pUpdateRes); - // return empty data blcok - return pInfo->pUpdateRes; - } - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } + + if (isStateWindow(pInfo) && pInfo->sessionSup.pStreamAggSup->pScanBlock->info.rows > 0) { + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + pInfo->updateResIndex = 0; + copyDataBlock(pInfo->pUpdateRes, pInfo->sessionSup.pStreamAggSup->pScanBlock); + blockDataCleanup(pInfo->sessionSup.pStreamAggSup->pScanBlock); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + return pInfo->pUpdateRes; } SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; @@ -1455,15 +1331,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } else if (pInfo->pUpdateInfo) { pInfo->tsArrayIndex = 0; checkUpdateData(pInfo, true, pInfo->pRes, true); - setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); - if (pInfo->pUpdateRes->info.rows > 0) { - if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) { + if (pInfo->pUpdateDataRes->info.rows > 0) { + if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) { pInfo->updateResIndex = 0; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) { + } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) { pInfo->scanMode = STREAM_SCAN_FROM_RES; - return pInfo->pUpdateRes; + return pInfo->pUpdateDataRes; } } } @@ -1629,17 +1504,18 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } pInfo->pRes = createResDataBlock(pDescNode); - pInfo->pUpdateRes = createResDataBlock(pDescNode); + pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; - pInfo->pPullDataRes = createPullDataBlock(); + pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pStreamScanOp = pOperator; pInfo->deleteDataIndex = 0; - pInfo->pDeleteDataRes = createPullDataBlock(); + pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; + pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 66ff739c5a..19abb88df7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1373,8 +1373,10 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) { - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - TSKEY* tsCols = (TSKEY*)pTsCol->pData; + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData; uint64_t* pGpDatas = NULL; if (pBlock->info.type == STREAM_RETRIEVE) { SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -1382,22 +1384,18 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* } int32_t step = 0; int32_t startPos = 0; - SResultRowInfo dumyInfo; - dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC); - while (1) { - step = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; - bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); - if (pUpWins && res) { - SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; - taosArrayPush(pUpWins, &winRes); - } - int32_t prevEndPos = step - 1 + startPos; - startPos = getNextQualifiedWindow(pInterval, &win, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); - if (startPos < 0) { - break; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); + while (win.ekey <= endTsCols[i]) { + uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; + bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); + if (pUpWins && res) { + SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; + taosArrayPush(pUpWins, &winRes); + } + getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); } } } @@ -1501,7 +1499,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo } blockDataEnsureCapacity(pBlock, size - *index); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, DELETE_GROUPID_COLUMN_INDEX); + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); for (int32_t i = *index; i < size; i++) { SWinRes* pWin = taosArrayGet(pWins, i); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); @@ -1798,10 +1796,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pInfo->delIndex = 0; - // pInfo->pDelRes = createPullDataBlock(); todo(liuyao) for delete - pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete - pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete - + pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); pOperator->name = "TimeIntervalAggOperator"; @@ -2603,14 +2598,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc chId = getChildIndex(pSDataBlock); index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); } - // if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) { - // qDebug("===stream===delete child id %d", chId); - // taosArrayRemove(chArray, index); - // if (taosArrayGetSize(chArray) == 0) { - // // pull data is over - // taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); - // } - // } if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) { ignore = false; } @@ -2702,16 +2689,18 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB } blockDataEnsureCapacity(pBlock, size - (*pIndex)); ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock)); + SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); for (; (*pIndex) < size; (*pIndex)++) { SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex)); - SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); - - SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); - - SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false); + colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); + colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); pBlock->info.rows++; } if ((*pIndex) == size) { @@ -2830,7 +2819,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { continue; } removeResults(pUpWins, pUpdated); - copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); + copyDataBlock(pInfo->pUpdateRes, pBlock); + // copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); pInfo->returnUpdate = true; taosArrayDestroy(pUpWins); break; @@ -2938,12 +2928,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } -SSDataBlock* createPullDataBlock() { +SSDataBlock* createSpecialDataBlock(EStreamType type) { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); pBlock->info.hasVarCol = false; pBlock->info.groupId = 0; pBlock->info.rows = 0; - pBlock->info.type = STREAM_RETRIEVE; + pBlock->info.type = type; pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t); pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData)); @@ -2957,6 +2947,14 @@ SSDataBlock* createPullDataBlock() { infoData.info.type = TSDB_DATA_TYPE_UBIGINT; infoData.info.bytes = sizeof(uint64_t); + // uid + taosArrayPush(pBlock->pDataBlock, &infoData); + // group id + taosArrayPush(pBlock->pDataBlock, &infoData); + + // calculate start ts + taosArrayPush(pBlock->pDataBlock, &infoData); + // calculate end ts taosArrayPush(pBlock->pDataBlock, &infoData); return pBlock; @@ -3024,8 +3022,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } } - pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->pUpdateRes->info.type = STREAM_CLEAR; + pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pInfo->returnUpdate = false; @@ -3047,11 +3044,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pullIndex = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); - pInfo->pPullDataRes = createPullDataBlock(); + pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; - // pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete - pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete - pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete + pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); @@ -3066,7 +3061,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - + if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { + initIntervalDownStream(downstream, pPhyNode->type); + } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3091,6 +3088,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { } taosHashCleanup(pSup->pResultRows); destroyDiskbasedBuf(pSup->pResultBuf); + blockDataDestroy(pSup->pScanBlock); } void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { @@ -3205,7 +3203,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - // pInfo->pDelRes = createPullDataBlock(); + // pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete pInfo->pChildren = NULL; @@ -3564,7 +3562,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* endDatas = (TSKEY*)pEndTsCol->pData; - SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* gpDatas = (uint64_t*)pGroupCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { int32_t winIndex = 0; @@ -4260,7 +4258,6 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual, pSeDeleted); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); - taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex); } @@ -4285,8 +4282,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } else { return; } - + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + blockDataEnsureCapacity(pAggSup->pScanBlock, pSDataBlock->info.rows); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) { if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) { @@ -4301,7 +4299,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i, &allEqual, pInfo->pSeDeleted); if (!allEqual) { - taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); + appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, + &pSDataBlock->info.groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex); continue; @@ -4465,7 +4464,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - // pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete + // pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete pInfo->pChildren = NULL; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 05f84df7f8..2ea9652a4a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -476,16 +476,16 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); - char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); + SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); int32_t type = pDestCtx->input.pData[0]->info.type; int32_t bytes = pDestCtx->input.pData[0]->info.bytes; SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); - char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); + SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) > *(TSKEY*)(pSBuf + bytes))) { - memcpy(pDBuf, pSBuf, bytes); - *(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes); + if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts > pSBuf->ts)) { + memcpy(pDBuf->buf, pSBuf->buf, bytes); + pDBuf->ts = pSBuf->ts; pDResInfo->numOfRes = 1; } return TSDB_CODE_SUCCESS; @@ -2994,16 +2994,16 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { // todo rewrite: int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); - char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); + SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); int32_t type = pDestCtx->input.pData[0]->info.type; int32_t bytes = pDestCtx->input.pData[0]->info.bytes; SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); - char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); + SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes))) { - memcpy(pDBuf, pSBuf, bytes); - *(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes); + if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts < pSBuf->ts)) { + memcpy(pDBuf->buf, pSBuf->buf, bytes); + pDBuf->ts = pSBuf->ts; pDResInfo->numOfRes = 1; } return TSDB_CODE_SUCCESS; diff --git a/source/os/src/osSystem.c b/source/os/src/osSystem.c index 9c66ac1df8..ad7fa57182 100644 --- a/source/os/src/osSystem.c +++ b/source/os/src/osSystem.c @@ -18,10 +18,6 @@ #include "os.h" #if defined(WINDOWS) -BOOL WINAPI CtrlHandler(DWORD fdwCtrlType) { - printf("\n" TAOS_CONSOLE_PROMPT_HEADER); - return TRUE; -} #elif defined(_TD_DARWIN_64) #else #include @@ -128,7 +124,6 @@ int taosSetConsoleEcho(bool on) { void taosSetTerminalMode() { #if defined(WINDOWS) - SetConsoleCtrlHandler(CtrlHandler, TRUE); #else struct termios newtio; @@ -179,7 +174,6 @@ int32_t taosGetOldTerminalMode() { void taosResetTerminalMode() { #if defined(WINDOWS) - SetConsoleCtrlHandler(CtrlHandler, FALSE); #else if (tcsetattr(0, TCSANOW, &oldtio) != 0) { fprintf(stderr, "Fail to reset the terminal properties!\n"); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 38613a77e6..e8f1f06ef1 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -58,7 +58,7 @@ static const int32_t TEST_NUMBER = 1; #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) -#define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951LL) +#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) #define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a))) #define ZIGZAG_ENCODE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode @@ -101,8 +101,8 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; char bit_to_selector[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13, - 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, - 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; + 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; // get the byte limit. int32_t word_length = 0; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 052bc62f59..561bf2d5ba 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -69,6 +69,7 @@ ./test.sh -f tsim/insert/basic.sim ./test.sh -f tsim/insert/basic0.sim ./test.sh -f tsim/insert/basic1.sim +./test.sh -f tsim/insert/basic2.sim ./test.sh -f tsim/insert/commit-merge0.sim ./test.sh -f tsim/insert/insert_drop.sim ./test.sh -f tsim/insert/insert_select.sim @@ -109,7 +110,7 @@ ./test.sh -f tsim/parser/fill.sim ./test.sh -f tsim/parser/first_last.sim ./test.sh -f tsim/parser/fourArithmetic-basic.sim -# TD-17659 TD-17658 ./test.sh -f tsim/parser/function.sim +# TD-17659 ./test.sh -f tsim/parser/function.sim ./test.sh -f tsim/parser/groupby-basic.sim # ./test.sh -f tsim/parser/groupby.sim # TD-17622 ./test.sh -f tsim/parser/having_child.sim @@ -117,7 +118,7 @@ ./test.sh -f tsim/parser/import_commit1.sim ./test.sh -f tsim/parser/import_commit2.sim ./test.sh -f tsim/parser/import_commit3.sim -# TD-17663 ./test.sh -f tsim/parser/import_file.sim +./test.sh -f tsim/parser/import_file.sim ./test.sh -f tsim/parser/import.sim ./test.sh -f tsim/parser/insert_multiTbl.sim ./test.sh -f tsim/parser/insert_tb.sim diff --git a/tests/script/jenkins/clusterCase.txt b/tests/script/jenkins/clusterCase.txt new file mode 100644 index 0000000000..b6b20e9517 --- /dev/null +++ b/tests/script/jenkins/clusterCase.txt @@ -0,0 +1,29 @@ + +#======================b1-start=============== + +# ---- mnode +./test.sh -f tsim/mnode/basic1.sim +./test.sh -f tsim/mnode/basic2.sim +./test.sh -f tsim/mnode/basic3.sim +./test.sh -f tsim/mnode/basic4.sim +./test.sh -f tsim/mnode/basic5.sim + +# --- vnode +# unsupport ./test.sh -f tsim/vnode/replica3_basic.sim +# unsupport ./test.sh -f tsim/vnode/replica3_repeat.sim +# unsupport ./test.sh -f tsim/vnode/replica3_vgroup.sim +# unsupport ./test.sh -f tsim/vnode/replica3_many.sim +# unsupport ./test.sh -f tsim/vnode/replica3_import.sim +# unsupport ./test.sh -f tsim/vnode/stable_balance_replica1.sim +# unsupport ./test.sh -f tsim/vnode/stable_dnode2_stop.sim +./test.sh -f tsim/vnode/stable_dnode2.sim +./test.sh -f tsim/vnode/stable_dnode3.sim +./test.sh -f tsim/vnode/stable_replica3_dnode6.sim +./test.sh -f tsim/vnode/stable_replica3_vnode3.sim + +# --- sync +# jira ./test.sh -f tsim/sync/3Replica1VgElect.sim +./test.sh -f tsim/sync/3Replica5VgElect.sim +./test.sh -f tsim/sync/oneReplica1VgElect.sim +./test.sh -f tsim/sync/oneReplica5VgElect.sim + diff --git a/tests/script/tsim/dnode/offline_reason.sim b/tests/script/tsim/dnode/offline_reason.sim index 3c6fff8b59..3608dd4a3a 100644 --- a/tests/script/tsim/dnode/offline_reason.sim +++ b/tests/script/tsim/dnode/offline_reason.sim @@ -40,7 +40,7 @@ if $data(2)[4] != ready then endi print ========== step3 -system sh/exec.sh -n dnode2 -s stop +system sh/exec.sh -n dnode2 -s stop $x = 0 step3: @@ -64,9 +64,10 @@ if $rows != 1 then endi print ========== step5 -system sh/exec.sh -n dnode2 -s start sql create dnode $hostname port 7200 +system sh/exec.sh -n dnode2 -s start +return $x = 0 step5: $x = $x + 1 diff --git a/tests/script/tsim/insert/basic2.sim b/tests/script/tsim/insert/basic2.sim new file mode 100644 index 0000000000..0bd64b1d02 --- /dev/null +++ b/tests/script/tsim/insert/basic2.sim @@ -0,0 +1,322 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== create database +sql create database d0 keep 365000d,365000d,365000d +sql use d0 + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int unsigned, c2 double, c3 binary(10), c4 nchar(10), c5 double) tags (city binary(20),district binary(20)); + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct1 using stb tags("BeiJing", "ChaoYang") +sql create table ct2 using stb tags("BeiJing", "HaiDian") + +sql show tables +if $rows != 2 then + return -1 +endi + +print =============== step3-1 insert records into ct1 +sql insert into ct1 values('2022-05-03 16:59:00.010', 10, 20, 'n','n',30); +sql insert into ct1 values('2022-05-03 16:59:00.011', 'N', 'n', 'N',"N",30); +sql insert into ct1 values('2022-05-03 16:59:00.012', 'Nu', 'nul', 'Nul','NUL',30); +sql insert into ct1 values('2022-05-03 16:59:00.013', NULL, 'null', 'Null',null,30); +sql insert into ct1 values('2022-05-03 16:59:00.014', NULL, 'NuLL', 'Null',NULL,30); + +sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',NUL,30); +sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',NU,30); +sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',Nu,30); +sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',N,30); +sql_error insert into ct1 values('2022-05-03 16:59:00.015', N, 20, 'Null',NULL,30); +sql_error insert into ct1 values('2022-05-03 16:59:00.015', Nu, 20, 'Null',NULL,30); +sql_error insert into ct1 values('2022-05-03 16:59:00.015', Nul, 20, 'Null',NULL,30); + +print =============== step3-1 query records of ct1 from memory +sql select * from ct1; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 + +if $rows != 5 then + print rows $rows != 5 + return -1 +endi + +if $data01 != 10 then + print data01 $data01 != 10 + return -1 +endi + +if $data02 != 20.000000000 then + print data02 $data02 != 20.000000000 + return -1 +endi + +if $data03 != n then + print data03 $data03 != n + return -1 +endi + +if $data04 != n then + print data04 $data04 != n + return -1 +endi + +if $data05 != 30.000000000 then + print data05 $data05 != 30.000000000 + return -1 +endi + +if $data11 != NULL then + print data11 $data11 != NULL + return -1 +endi + +if $data12 != NULL then + print data12 $data12 != NULL + return -1 +endi + +if $data13 != N then + print data13 $data13 != N + return -1 +endi + +if $data14 != N then + print data14 $data14 != N + return -1 +endi + +if $data15 != 30.000000000 then + print data15 $data15 != 30.000000000 + return -1 +endi + +if $data21 != NULL then + print data21 $data21 != NULL + return -1 +endi + +if $data22 != NULL then + print data22 $data22 != NULL + return -1 +endi + +if $data23 != Nul then + print data23 $data23 != Nul + return -1 +endi + +if $data24 != NUL then + print data24 $data24 != NUL + return -1 +endi + +if $data25 != 30.000000000 then + print data25 $data25 != 30.000000000 + return -1 +endi + +if $data31 != NULL then + print data31 $data31 != NULL + return -1 +endi + +if $data32 != NULL then + print data32 $data32 != NULL + return -1 +endi + +if $data33 != Null then + print data33 $data33 != Null + return -1 +endi + +if $data34 != NULL then + print data34 $data34 != NULL + return -1 +endi + +if $data35 != 30.000000000 then + print data35 $data35 != 30.000000000 + return -1 +endi + +if $data41 != NULL then + print data41 $data41 != NULL + return -1 +endi + +if $data42 != NULL then + print data42 $data42 != NULL + return -1 +endi + +if $data43 != Null then + print data43 $data43 != Null + return -1 +endi + +if $data44 != NULL then + print data44 $data44 != NULL + return -1 +endi + +if $data45 != 30.000000000 then + print data45 $data45 != 30.000000000 + return -1 +endi + +#==================== reboot to trigger commit data to file +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +print =============== step3-2 query records of ct1 from file +sql select * from ct1; +print $data00 $data01 $data02 $data03 $data04 $data05 +print $data10 $data11 $data12 $data13 $data14 $data15 +print $data20 $data21 $data22 $data23 $data24 $data25 +print $data30 $data31 $data32 $data33 $data34 $data35 +print $data40 $data41 $data42 $data43 $data44 $data45 + +if $rows != 5 then + print rows $rows != 5 + return -1 +endi + +if $data01 != 10 then + print data01 $data01 != 10 + return -1 +endi + +if $data02 != 20.000000000 then + print data02 $data02 != 20.000000000 + return -1 +endi + +if $data03 != n then + print data03 $data03 != n + return -1 +endi + +if $data04 != n then + print data04 $data04 != n + return -1 +endi + +if $data05 != 30.000000000 then + print data05 $data05 != 30.000000000 + return -1 +endi + +if $data11 != NULL then + print data11 $data11 != NULL + return -1 +endi + +if $data12 != NULL then + print data12 $data12 != NULL + return -1 +endi + +if $data13 != N then + print data13 $data13 != N + return -1 +endi + +if $data14 != N then + print data14 $data14 != N + return -1 +endi + +if $data15 != 30.000000000 then + print data15 $data15 != 30.000000000 + return -1 +endi + +if $data21 != NULL then + print data21 $data21 != NULL + return -1 +endi + +if $data22 != NULL then + print data22 $data22 != NULL + return -1 +endi + +if $data23 != Nul then + print data23 $data23 != Nul + return -1 +endi + +if $data24 != NUL then + print data24 $data24 != NUL + return -1 +endi + +if $data25 != 30.000000000 then + print data25 $data25 != 30.000000000 + return -1 +endi + +if $data31 != NULL then + print data31 $data31 != NULL + return -1 +endi + +if $data32 != NULL then + print data32 $data32 != NULL + return -1 +endi + +if $data33 != Null then + print data33 $data33 != Null + return -1 +endi + +if $data34 != NULL then + print data34 $data34 != NULL + return -1 +endi + +if $data35 != 30.000000000 then + print data35 $data35 != 30.000000000 + return -1 +endi + +if $data41 != NULL then + print data41 $data41 != NULL + return -1 +endi + +if $data42 != NULL then + print data42 $data42 != NULL + return -1 +endi + +if $data43 != Null then + print data43 $data43 != Null + return -1 +endi + +if $data44 != NULL then + print data44 $data44 != NULL + return -1 +endi + +if $data45 != 30.000000000 then + print data45 $data45 != 30.000000000 + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/insert/commit-merge0.sim b/tests/script/tsim/insert/commit-merge0.sim index 5fe7cc57b3..66486c4c31 100644 --- a/tests/script/tsim/insert/commit-merge0.sim +++ b/tests/script/tsim/insert/commit-merge0.sim @@ -242,3 +242,5 @@ else $reboot_cnt = $reboot_cnt + 1 goto reboot_and_check endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/insert/update0.sim b/tests/script/tsim/insert/update0.sim index c6843acb9d..c4bd29615b 100644 --- a/tests/script/tsim/insert/update0.sim +++ b/tests/script/tsim/insert/update0.sim @@ -226,4 +226,6 @@ endi if $data41 != NULL then print data41 $data41 != NULL return -1 -endi \ No newline at end of file +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/insert/update1_sort_merge.sim b/tests/script/tsim/insert/update1_sort_merge.sim index 79d72b43a0..13462520ea 100644 --- a/tests/script/tsim/insert/update1_sort_merge.sim +++ b/tests/script/tsim/insert/update1_sort_merge.sim @@ -816,4 +816,6 @@ endi if $data44 != n8 then print data44 $data44 != n8 return -1 -endi \ No newline at end of file +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/parser/function.sim b/tests/script/tsim/parser/function.sim index 7dd66bedb0..7927715988 100644 --- a/tests/script/tsim/parser/function.sim +++ b/tests/script/tsim/parser/function.sim @@ -278,7 +278,6 @@ sql create stable td6086st(ts timestamp, d double) tags(t nchar(50)); sql create table td6086ct1 using td6086st tags("ct1"); sql create table td6086ct2 using td6086st tags("ct2"); -return sql SELECT LAST(d),t FROM td6086st WHERE tbname in ('td6086ct1', 'td6086ct2') and ts>="2019-07-30 00:00:00" and ts<="2021-08-31 00:00:00" partition BY tbname interval(1800s) fill(prev); print ==================> td-2624 diff --git a/tests/script/tsim/parser/gendata.sh b/tests/script/tsim/parser/gendata.sh index b2074147ca..eb3676729e 100755 --- a/tests/script/tsim/parser/gendata.sh +++ b/tests/script/tsim/parser/gendata.sh @@ -3,6 +3,6 @@ Cur_Dir=$(pwd) echo $Cur_Dir -echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql -echo "'2020-1-2 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql -echo "'2020-1-3 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql +echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> /tmp/data.sql +echo "'2020-1-2 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> /tmp/data.sql +echo "'2020-1-3 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> /tmp/data.sql diff --git a/tests/script/tsim/parser/import_file.sim b/tests/script/tsim/parser/import_file.sim index 5c778a5875..0250af16b3 100644 --- a/tests/script/tsim/parser/import_file.sim +++ b/tests/script/tsim/parser/import_file.sim @@ -7,7 +7,7 @@ sql drop database if exists indb sql create database if not exists indb sql use indb -$inFileName = '~/data.csv' +$inFileName = '/tmp/data.csv' $numOfRows = 10000 system tsim/parser/gendata.sh @@ -16,8 +16,8 @@ sql create table stbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16 sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2)) print ====== create tables success, starting insert data -sql insert into tbx file '~/data.sql' -sql import into tbx file '~/data.sql' +sql insert into tbx file '/tmp/data.sql' +sql import into tbx file '/tmp/data.sql' sql select count(*) from tbx if $rows != 1 then @@ -31,8 +31,8 @@ endi sql drop table tbx; -sql insert into tbx using stbx tags(1,'abc') file '~/data.sql'; -sql insert into tbx using stbx tags(1,'abc') file '~/data.sql'; +sql insert into tbx using stbx tags(1,'abc') file '/tmp/data.sql'; +sql insert into tbx using stbx tags(1,'abc') file '/tmp/data.sql'; sql select count(*) from tbx if $rows != 1 then @@ -44,7 +44,7 @@ if $data00 != 3 then endi sql drop table tbx; -sql insert into tbx using stbx(b) tags('abcf') file '~/data.sql'; +sql insert into tbx using stbx(b) tags('abcf') file '/tmp/data.sql'; sql select ts,a,b from tbx; if $rows != 3 then @@ -64,6 +64,6 @@ if $data02 != @abcf@ then return -1 endi -system rm -f ~/data.sql +system rm -f /tmp/data.sql system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/parser/lastrow_query.sim b/tests/script/tsim/parser/lastrow_query.sim index cb523d5c8e..5ffba38d14 100644 --- a/tests/script/tsim/parser/lastrow_query.sim +++ b/tests/script/tsim/parser/lastrow_query.sim @@ -66,32 +66,32 @@ if $row != 21600 then endi #regression test case 3 -sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by t1 interval(1h) fill(NULL) limit 1 +sql select _wstart, t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by t1 interval(1h) fill(NULL) limit 1 if $row != 2 then return -1 endi -#if $data01 != 7 then -# return -1 -#endi -#if $data02 != 7 then -# return -1 -#endi -#if $data03 != 59 then -# print expect 59, actual: $data03 -# return -1 -#endi -#if $data04 != 7 then -# return -1 -#endi -#if $data11 != 8 then -# return -1 -#endi -#if $data12 != 8 then -# return -1 -#endi -#if $data13 != NULL then -# return -1 -#endi +if $data01 != NULL then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data03 != NULL then + return -1 +endi +if $data11 != 7 then + return -1 +endi +if $data12 != 7 then + return -1 +endi +if $data13 != 59 then + print expect 59, actual: $data03 + return -1 +endi +if $data14 != 7 then + return -1 +endi sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by t1 interval(1h) fill(NULL) limit 9 if $rows != 18 then diff --git a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim index 868207c80b..e3b38d415c 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim @@ -179,4 +179,6 @@ if $data05 != 30.000000000 then return -1 endi +system sh/exec.sh -n dnode1 -s stop -x SIGINT + diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 4364b56d44..8ebadbfb50 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -367,7 +367,7 @@ if $data32 != 8 then endi #$loop_all = 0 -#looptest: +#=looptest: sql drop database IF EXISTS test2; sql drop stream IF EXISTS streams21; @@ -511,6 +511,6 @@ endi $loop_all = $loop_all + 1 print ============loop_all=$loop_all -#goto looptest +#=goto looptest system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py index a82c7bfe1a..ca270932b1 100644 --- a/tests/system-test/2-query/tsbsQuery.py +++ b/tests/system-test/2-query/tsbsQuery.py @@ -136,9 +136,14 @@ class TDTestCase: tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") + tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") + + # 9. breakdown-frequency # NULL ---count(NULL)=0 expect count(NULL)= 100 tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ") + parRows=tdSql.queryRows + assert parRows != 0 , "query result is wrong" tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;") diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py index 8971a51ef3..bc45b10f9b 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py @@ -93,7 +93,7 @@ class TDTestCase: def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'db', - 'dbNumbers': 20, + 'dbNumbers': 8, 'dropFlag': 1, 'event': '', 'vgroups': 4, @@ -183,15 +183,22 @@ class TDTestCase: for tr in threads: tr.join() + tdLog.info("check dnode number:") clusterComCheck.checkDnodes(dnodeNumbers) - clusterComCheck.checkDbRows(allDbNumbers) - for i in range(restartNumbers): - clusterComCheck.checkDb(paraDict['dbNumbers'],restartNumbers,dbNameIndex = '%s%d'%(paraDict["dbName"],i)) + tdSql.query("show databases") + tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers-2)) + + # tdLog.info("check DB Rows:") + # clusterComCheck.checkDbRows(allDbNumbers) + # tdLog.info("check DB Status on by on") + # for i in range(restartNumbers): + # clusterComCheck.checkDb(paraDict['dbNumbers'],restartNumbers,dbNameIndex = '%s%d'%(paraDict["dbName"],i)) + def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='dnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py index 6db1a9fddd..36db0ffc31 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py @@ -98,7 +98,7 @@ class TDTestCase: 'vgroups': 4, 'replica': 1, 'stbName': 'stb', - 'stbNumbers': 100, + 'stbNumbers': 80, 'colPrefix': 'c', 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], @@ -188,7 +188,10 @@ class TDTestCase: tdSql.execute("use %s" %(paraDict["dbName"])) tdSql.query("show stables") - tdSql.checkRows(allStbNumbers) + tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers)) + # # tdLog.info("check Stable Rows:") + # tdSql.checkRows(allStbNumbers) + def run(self): diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py index d9871bb35f..1788f24c3f 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py @@ -68,7 +68,7 @@ class TDTestCase: def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'db', - 'dbNumbers': 10, + 'dbNumbers': 8, 'dropFlag': 1, 'event': '', 'vgroups': 2, diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py index f7d62857f9..3a10427664 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py @@ -98,7 +98,7 @@ class TDTestCase: 'vgroups': 4, 'replica': 1, 'stbName': 'stb', - 'stbNumbers': 100, + 'stbNumbers': 80, 'colPrefix': 'c', 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], @@ -142,7 +142,8 @@ class TDTestCase: threads=[] for i in range(restartNumbers): stableName= '%s%d'%(paraDict['stbName'],i) - threads.append(threading.Thread(target=clusterComCreate.create_stables, args=(tdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']))) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.create_stables, args=(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']))) for tr in threads: tr.start() @@ -190,6 +191,7 @@ class TDTestCase: tdSql.execute("use %s" %(paraDict["dbName"])) tdSql.query("show stables") tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers)) + # # tdLog.info("check Stable Rows:") # tdSql.checkRows(allStbNumbers) diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py index a9c8afd741..da32d1b4a8 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py @@ -159,19 +159,19 @@ class TDTestCase: for tr in threads: tr.join() clusterComCheck.checkDnodes(dnodeNumbers) - tdSql.query("show databases") - tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers)) + # tdSql.query("show databases") + # tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers)) - # # tdLog.info("check DB Rows:") - # clusterComCheck.checkDbRows(allDbNumbers) - # # tdLog.info("check DB Status on by on") + # tdLog.info("check DB Rows:") + clusterComCheck.checkDbRows(allDbNumbers) + # tdLog.info("check DB Status on by on") # for i in range(restartNumbers): # clusterComCheck.checkDb(paraDict['dbNumbers'],restartNumbers,dbNameIndex = '%s%d'%(paraDict["dbName"],i)) def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=15,stopRole='vnode') + self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='vnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py index 128dc10b37..5c9e3587c4 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py @@ -98,7 +98,7 @@ class TDTestCase: 'vgroups': 4, 'replica': 1, 'stbName': 'stb', - 'stbNumbers': 100, + 'stbNumbers': 80, 'colPrefix': 'c', 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], @@ -192,6 +192,8 @@ class TDTestCase: tdSql.execute("use %s" %(paraDict["dbName"])) tdSql.query("show stables") + tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers)) + # # tdLog.info("check Stable Rows:") tdSql.checkRows(allStbNumbers) diff --git a/tests/system-test/6-cluster/5dnode3mnodeStop.py b/tests/system-test/6-cluster/5dnode3mnodeStop.py index f932e5537e..46e7771079 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStop.py @@ -111,7 +111,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(5,3,1) + self.fiveDnodeThreeMnode(dnodenumbers=5,mnodeNums=3,restartNumber=1) def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 203d756353..12ae34ac2b 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -63,6 +63,7 @@ class ClusterComCheck: count=0 while count < 5: tdSql.query("show databases;") + count+=1 if tdSql.checkRows(dbNumbers+2): tdLog.success("we find %d databases and expect %d in clusters! " %(tdSql.queryRows,dbNumbers+2)) return True diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index a56f79d20f..81c2becbde 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -535,6 +535,18 @@ class TMQCom: column_value_str = column_value_str.rstrip()[:-1] insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});' tsql.execute(insert_sql) + + def waitSubscriptionExit(self, tsql, max_wait_count=20): + wait_cnt = 0 + while (wait_cnt < max_wait_count): + tsql.query("show subscriptions") + if tsql.getRows() == 0: + break + else: + time.sleep(2) + wait_cnt += 1 + + tdLog.info("wait subscriptions exit for %d s"%wait_cnt) def close(self): self.cursor.close() diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index 9699c4b32c..e69f5b1eeb 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -18,8 +18,8 @@ class TDTestCase: def __init__(self): self.snapshot = 0 self.vgroups = 2 - self.ctbNum = 100 - self.rowsPerTbl = 10000 + self.ctbNum = 1000 + self.rowsPerTbl = 1000 def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") @@ -38,8 +38,8 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 100, - 'rowsPerTbl': 10000, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, @@ -83,8 +83,8 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 100, - 'rowsPerTbl': 10000, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, @@ -117,13 +117,13 @@ class TDTestCase: tdSql.execute(sqlString) consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ - auto.commit.interval.ms:500,\ + auto.commit.interval.ms:3000,\ auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) @@ -147,10 +147,46 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQury = tdSql.getRows() - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQury)) - if totalConsumeRows != totalRowsFromQury: + tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) + if not (totalConsumeRows == totalRowsFromQury): tdLog.exit("tmq consume rows error!") + + + + # tdLog.info("****************************************************************************") + # tmqCom.initConsumerTable() + # consumerId = 1 + # expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + # topicList = topicFromStb1 + # ifcheckdata = 0 + # ifManualCommit = 0 + # keyList = 'group.id:cgrp2,\ + # enable.auto.commit:true,\ + # auto.commit.interval.ms:3000,\ + # auto.offset.reset:earliest' + # tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + # tdLog.info("start consume processor") + # tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + # expectRows = 1 + # resultList = tmqCom.selectConsumeResult(expectRows) + # totalConsumeRows = 0 + # for i in range(expectRows): + # totalConsumeRows += resultList[i] + + # tdSql.query(queryString) + # totalRowsFromQury = tdSql.getRows() + + # tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) + # if not (totalConsumeRows == totalRowsFromQury): + # tdLog.exit("tmq consume rows error!") + + + # tdLog.info("****************************************************************************") + + tmqCom.waitSubscriptionExit(tdSql) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") @@ -168,8 +204,8 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 100, - 'rowsPerTbl': 10000, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, 'batchNum': 3000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, @@ -201,7 +237,7 @@ class TDTestCase: tdSql.execute(sqlString) consumerId = 1 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + 100000 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 0 @@ -220,8 +256,10 @@ class TDTestCase: tdDnodes.start(1) time.sleep(3) - tdLog.info("create some new child table and insert data ") - tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + # tdLog.info("create some new child table and insert data ") + # paraDict["batchNum"] = 1000 + # paraDict["ctbPrefix"] = 'newCtb' + # tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("insert process end, and start to check consume result") expectRows = 1 @@ -242,9 +280,9 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 2 end ...... ") def run(self): - tdSql.prepare() + # tdSql.prepare() self.prepareTestEnv() - # self.tmqCase1() + self.tmqCase1() self.tmqCase2() def stop(self): diff --git a/tests/system-test/7-tmq/tmqDropNtb.py b/tests/system-test/7-tmq/tmqDropNtb.py index 9200200588..5d58c38690 100644 --- a/tests/system-test/7-tmq/tmqDropNtb.py +++ b/tests/system-test/7-tmq/tmqDropNtb.py @@ -25,18 +25,6 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) - def waitSubscriptionExit(self, max_wait_count=20): - wait_cnt = 0 - while (wait_cnt < max_wait_count): - tdSql.query("show subscriptions") - if tdSql.getRows() == 0: - break - else: - time.sleep(1) - wait_cnt += 1 - - tdLog.info("wait subscriptions exit for %d s"%wait_cnt) - # drop some ntbs def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -115,7 +103,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - self.waitSubscriptionExit() + tmqCom.waitSubscriptionExit(tdSql) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) @@ -208,7 +196,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - self.waitSubscriptionExit() + tmqCom.waitSubscriptionExit(tdSql) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) diff --git a/tests/system-test/7-tmq/tmqDropStbCtb.py b/tests/system-test/7-tmq/tmqDropStbCtb.py index d9e675ddc6..e6783a2815 100644 --- a/tests/system-test/7-tmq/tmqDropStbCtb.py +++ b/tests/system-test/7-tmq/tmqDropStbCtb.py @@ -24,19 +24,7 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) - - def waitSubscriptionExit(self, max_wait_count=20): - wait_cnt = 0 - while (wait_cnt < max_wait_count): - tdSql.query("show subscriptions") - if tdSql.getRows() == 0: - break - else: - time.sleep(2) - wait_cnt += 1 - - tdLog.info("wait subscriptions exit for %d s"%wait_cnt) - + def prepareTestEnv(self): tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") paraDict = {'dbName': 'dbt', @@ -169,7 +157,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - self.waitSubscriptionExit() + tmqCom.waitSubscriptionExit(tdSql) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) @@ -258,7 +246,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - self.waitSubscriptionExit() + tmqCom.waitSubscriptionExit(tdSql) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) diff --git a/tests/system-test/clusterCase.sh b/tests/system-test/clusterCase.sh new file mode 100755 index 0000000000..cfc44f7f95 --- /dev/null +++ b/tests/system-test/clusterCase.sh @@ -0,0 +1,22 @@ +#!/bin/bash +set -e +set -x + +python3 ./test.py -f 6-cluster/5dnode1mnode.py +#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 +#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 +# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py +# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 +# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 +# BUG Redict python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 +# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 5 -M 3 + python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 + + diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index ec70d9ddbf..da29849b7f 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -152,15 +152,17 @@ python3 ./test.py -f 2-query/max_partition.py python3 ./test.py -f 2-query/last_row.py python3 ./test.py -f 6-cluster/5dnode1mnode.py -#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 -#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 -# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 + +# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 + # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index cdbdc0de60..466aa52390 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -19,6 +19,9 @@ #include "shellInt.h" +#define TAOS_CONSOLE_PROMPT_HEADER "taos> " +#define TAOS_CONSOLE_PROMPT_CONTINUE " -> " + #define SHELL_HOST "The auth string to use when connecting to the server." #define SHELL_PORT "The TCP/IP port number to use for the connection." #define SHELL_USER "The user name to use when connecting to the server." diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index eefb0aa8b2..56bc1ed6cc 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -41,7 +41,7 @@ static void shellPrintError(TAOS_RES *tres, int64_t st); static bool shellIsCommentLine(char *line); static void shellSourceFile(const char *file); static void shellGetGrantInfo(); -static void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context); + static void shellCleanup(void *arg); static void *shellCancelHandler(void *arg); static void *shellThreadLoop(void *arg); @@ -919,11 +919,14 @@ void shellGetGrantInfo() { fprintf(stdout, "\r\n"); } -void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&shell.cancelSem); } - -void shellSigintHandler(int32_t signum, void *sigInfo, void *context) { - // do nothing +#ifdef WINDOWS +BOOL shellQueryInterruptHandler(DWORD fdwCtrlType) { + tsem_post(&shell.cancelSem); + return TRUE; } +#else +void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&shell.cancelSem); } +#endif void shellCleanup(void *arg) { taosResetTerminalMode(); } @@ -934,11 +937,10 @@ void *shellCancelHandler(void *arg) { taosMsleep(10); continue; } - - taosResetTerminalMode(); - printf("\r\nReceive SIGTERM or other signal, quit shell.\r\n"); - shellWriteHistory(); - shellExit(); + taos_kill_query(shell.conn); + #ifdef WINDOWS + printf("\n%s", shell.info.promptHeader); + #endif } return NULL; @@ -1022,7 +1024,7 @@ int32_t shellExecute() { taosSetSignal(SIGHUP, shellQueryInterruptHandler); taosSetSignal(SIGABRT, shellQueryInterruptHandler); - taosSetSignal(SIGINT, shellSigintHandler); + taosSetSignal(SIGINT, shellQueryInterruptHandler); shellGetGrantInfo();