diff --git a/docs/examples/python/connection_usage_native_reference.py b/docs/examples/python/connection_usage_native_reference.py index 8b754ec722..0a23c5f95b 100644 --- a/docs/examples/python/connection_usage_native_reference.py +++ b/docs/examples/python/connection_usage_native_reference.py @@ -42,4 +42,4 @@ print(data) # ANCHOR_END: query -conn.close() +conn.close() \ No newline at end of file diff --git a/docs/examples/python/kafka_example.py b/docs/examples/python/kafka_example.py index 43f9183f7e..5b81706ef7 100644 --- a/docs/examples/python/kafka_example.py +++ b/docs/examples/python/kafka_example.py @@ -106,8 +106,8 @@ class Consumer(object): for task in self.tasks: while not task.done(): pass - if self.pool is not None: - self.pool.shutdown() + if self.pool is not None: + self.pool.shutdown() # clean data if self.config.get('clean_after_testing'): diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 8f22745973..51b285cddd 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -86,10 +86,11 @@ void qCleanupKeywordsTable(); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf); int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset); -int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb); +int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, + bool rebuildCreateTb); void qDestroyStmtDataBlock(STableDataCxt* pBlock); STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock); -int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData); +int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData); int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx); int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery); @@ -104,18 +105,20 @@ void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t msgBufLen); -void qDestroyBoundColInfo(void* pInfo); +void qDestroyBoundColInfo(void* pInfo); -SQuery* smlInitHandle(); -int32_t smlBuildRow(STableDataCxt* pTableCxt); -int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *kv, int32_t index); +SQuery* smlInitHandle(); +int32_t smlBuildRow(STableDataCxt* pTableCxt); +int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* kv, int32_t index); STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta); -int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta, - char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); +int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, + STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, + char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); -int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields); +int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* fields, + int numFields); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 7f95ca3d72..dfdbaa6fdd 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -210,8 +210,8 @@ function install_bin() { [ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || : [ -x ${install_main_dir}/bin/${udfdName} ] && ${csudo}ln -s ${install_main_dir}/bin/${udfdName} ${bin_link_dir}/${udfdName} || : [ -x ${install_main_dir}/bin/${adapterName} ] && ${csudo}ln -s ${install_main_dir}/bin/${adapterName} ${bin_link_dir}/${adapterName} || : - [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || : - [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || : + [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || : + [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || : [ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || : [ -x ${install_main_dir}/bin/${xname} ] && ${csudo}ln -s ${install_main_dir}/bin/${xname} ${bin_link_dir}/${xname} || : [ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || : @@ -787,7 +787,7 @@ function updateProduct() { if echo $osinfo | grep -qwi "centos"; then rpm -q tdengine 2>&1 > /dev/null && rpm_erase tdengine ||: elif echo $osinfo | grep -qwi "ubuntu"; then - dpkg -l tdengine 2>&1 > /dev/null && deb_erase tdengine ||: + dpkg -l tdengine 2>&1 | grep ii > /dev/null && deb_erase tdengine ||: fi tar -zxf ${tarName} diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index f5b65371a7..3ed157efef 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -393,7 +393,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { if (pStmt->bInfo.inExecCache) { - if(ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)){ + if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) { tscError("stmtGetFromCache error"); return TSDB_CODE_TSC_STMT_CACHE_ERROR; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 01c99c6e9e..96f18e5fb6 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1885,9 +1885,6 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } tmq_list_destroy(lst); - - /*return rsp;*/ - return 0; } taosRemoveRef(tmqMgmt.rsetId, tmq->refId); return 0; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9b5b32cf69..a4b1be5a12 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1431,6 +1431,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; + pBlock->info.id = pDataBlock->info.id; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d441d3d187..7d11bc7082 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -167,6 +167,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index a0c1754e82..8049db9c78 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -79,8 +79,6 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) { char path[TSDB_FILENAME_LEN] = {0}; - vnodeProposeCommitOnNeed(pVnode->pImpl); - taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosThreadRwlockUnlock(&pMgmt->lock); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index eeb4249217..eb4fc3cdad 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -839,10 +839,14 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // not exist in current topic -#if 0 + + bool existing = false; +#if 1 for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) { char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - A(strcmp(topic, addedTopic) != 0); + if (strcmp(topic, addedTopic) == 0) { + existing = true; + } } #endif @@ -857,8 +861,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } // add to current topic - taosArrayPush(pOldConsumer->currentTopics, &addedTopic); - taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); + if (!existing) { + taosArrayPush(pOldConsumer->currentTopics, &addedTopic); + taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); + } // set status if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c243e83a15..52f7190dd6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -41,6 +41,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq); static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq); +static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -65,6 +66,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); @@ -2181,6 +2183,10 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, return 0; } +static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) { + return 0; +} + static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 860db20fa8..1d2f4da26b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -93,6 +93,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); ASSERT(pTask->exec.executor); + streamSetupTrigger(pTask); + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f9c2f0b3fa..fc0678e3e6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -218,6 +218,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); +static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -1192,9 +1193,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%"PRIu64" elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows, - unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); + unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -2268,17 +2269,17 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea if (code == TSDB_CODE_SUCCESS) { pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL); - tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 + tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 " %s", pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr); } else { - tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid, + tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); return code; } } } else { - tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); + tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); } STbData* di = NULL; @@ -2289,17 +2290,17 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea if (code == TSDB_CODE_SUCCESS) { pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL); - tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 + tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 " %s", pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr); } else { - tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid, + tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr); return code; } } } else { - tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); + tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); } initDelSkylineIterator(pBlockScanInfo, pReader, d, di); @@ -2609,6 +2610,14 @@ _end: void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; } +int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { + if (pDelSkyline == NULL) { + return 0; + } + + return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1; +} + int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData) { if (pBlockScanInfo->delSkyline != NULL) { @@ -2626,7 +2635,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pIdx != NULL) { code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData); } - if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -2655,11 +2663,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* } taosArrayDestroy(pDelData); - pBlockScanInfo->iter.index = - ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1; - pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index; - pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index; - pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index; + int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order); + + pBlockScanInfo->iter.index = index; + pBlockScanInfo->iiter.index = index; + pBlockScanInfo->fileDelIndex = index; + pBlockScanInfo->lastBlockDelIndex = index; + return code; _err: @@ -2758,7 +2768,7 @@ static int32_t uidComparFunc(const void* p1, const void* p2) { } } -static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) { +static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) { int32_t index = 0; int32_t total = taosHashGetSize(pStatus->pTableMap); @@ -2772,7 +2782,21 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc); } -static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) { +// reset the last del file index +static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) { + void* p = taosHashIterate(pStatus->pTableMap, NULL); + while (p != NULL) { + STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p; + + // reset the last del file index + pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order); + p = taosHashIterate(pStatus->pTableMap, p); + } +} + +static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + int32_t total = taosHashGetSize(pStatus->pTableMap); if (total == 0) { return TSDB_CODE_SUCCESS; @@ -2785,7 +2809,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt return TSDB_CODE_OUT_OF_MEMORY; } - extractOrderedTableUidList(pOrderCheckInfo, pStatus); + extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order); uint64_t uid = pOrderCheckInfo->tableUidList[0]; pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); } else { @@ -2802,7 +2826,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt } pOrderCheckInfo->tableUidList = p; - extractOrderedTableUidList(pOrderCheckInfo, pStatus); + extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order); uid = pOrderCheckInfo->tableUidList[0]; pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); @@ -2822,11 +2846,7 @@ static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); - if (pStatus->pTableIter == NULL) { - return false; - } - - return true; + return (pStatus->pTableIter != NULL); } static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { @@ -2834,7 +2854,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo; - int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus); + int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pReader); if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) { return code; } @@ -2899,6 +2919,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + ASSERT(pBlockInfo != NULL); + if (pBlockInfo != NULL) { pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); @@ -2919,7 +2941,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { initLastBlockReader(pLastBlockReader, pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); - if (pBlockInfo == NULL) { // build data block from last data file + /*if (pBlockInfo == NULL) { // build data block from last data file SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); @@ -2951,7 +2973,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } - } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { + } else*/ if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3130,6 +3152,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // this file does not have data files, let's start check the last block file if exists if (pBlockIter->numOfBlocks == 0) { + resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order); goto _begin; } } @@ -3166,6 +3189,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { tBlockDataReset(pBlockData); resetDataBlockIterator(pBlockIter, pReader->order); + resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order); goto _begin; } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3177,6 +3201,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // this file does not have blocks, let's start check the last block file if (pBlockIter->numOfBlocks == 0) { + resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order); goto _begin; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index c280e8c0e7..0e804bc65e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1207,7 +1207,7 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { TSDB_CHECK_CODE(code, lino, _exit); } - tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); + tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk); pWriter->pDIter->dIter.iDataBlk++; } else { code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); @@ -1645,8 +1645,8 @@ _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid, - pId->uid); + tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, + pWriter->tbid.suid, pWriter->tbid.uid); } return code; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7afc9ef830..d939f7ea77 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -717,9 +717,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG SDiskbasedBuf* pBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 037b33dc9f..e5089ab4a9 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -218,10 +218,7 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { if (status == PROJECT_RETRIEVE_CONTINUE) { continue; } else if (status == PROJECT_RETRIEVE_DONE) { - size_t rows = pBlock->info.rows; - pExchangeInfo->limitInfo.numOfOutputRows += rows; - - if (rows == 0) { + if (pBlock->info.rows == 0) { setOperatorCompleted(pOperator); return NULL; } else { @@ -707,6 +704,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { } int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.id.groupId; @@ -750,36 +749,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa // set current group id pLimitInfo->currentGroupId = pBlock->info.id.groupId; - if (pLimitInfo->remainOffset >= pBlock->info.rows) { - pLimitInfo->remainOffset -= pBlock->info.rows; - blockDataCleanup(pBlock); + bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo); + if (pBlock->info.rows == 0) { return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { - blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); - pLimitInfo->remainOffset = 0; - } - - // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { - int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keepRows); - if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { + } else { + if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); - } else { - // current group limitation is reached, and future blocks of this group need to be discarded. - if (pBlock->info.rows == 0) { - return PROJECT_RETRIEVE_CONTINUE; - } + return PROJECT_RETRIEVE_DONE; } - - return PROJECT_RETRIEVE_DONE; } // todo optimize performance // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // they may not belong to the same group the limit/offset value is not valid in this case. - if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 || - pLimitInfo->slimit.limit != -1) { + if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) { return PROJECT_RETRIEVE_DONE; } else { // not full enough, continue to accumulate the output data in the buffer. return PROJECT_RETRIEVE_CONTINUE; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f8cce56dd4..540b5e90f8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1761,6 +1761,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { pLimitInfo->slimit.offset != -1); } +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) { + return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1); +} + void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 6c2bcf086d..f84871ea92 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -90,7 +90,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock; + + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + pInfo->mergeDataBlocks = false; + } else { + if (!pProjPhyNode->ignoreGroupId) { + pInfo->mergeDataBlocks = false; + } else { + pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; + } + } int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -185,36 +194,15 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS SOperatorInfo* pOperator) { // set current group id pLimitInfo->currentGroupId = groupId; - - if (pLimitInfo->remainOffset >= pBlock->info.rows) { - pLimitInfo->remainOffset -= pBlock->info.rows; - blockDataCleanup(pBlock); + bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo); + if (pBlock->info.rows == 0) { return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { - blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); - pLimitInfo->remainOffset = 0; - } - - // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { - int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keepRows); - - // TODO: optimize it later when partition by + limit - // all retrieved requirement has been fulfilled, let's finish this - if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || - (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { + } else { + if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); - } else { - // Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data - // from next group. So let's continue this retrieve process - if (keepRows == 0) { - return PROJECT_RETRIEVE_CONTINUE; - } } } - pLimitInfo->numOfOutputRows += pBlock->info.rows; return PROJECT_RETRIEVE_DONE; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 25de627127..a7b9cce7aa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo } } -// todo handle the slimit info -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SLimit* pLimit = &pLimitInfo->limit; const char* id = GET_TASKID(pTaskInfo); - if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { + if (pLimitInfo->remainOffset > 0) { if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; blockDataEmpty(pBlock); @@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) { // limit the output rows int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keep); + + pLimitInfo->numOfOutputRows += pBlock->info.rows; qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); return true; } + pLimitInfo->numOfOutputRows += pBlock->info.rows; return false; } @@ -397,13 +398,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca } } - bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); + bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo); if (limitReached) { // set operator flag is done setOperatorCompleted(pOperator); } pCost->totalRows += pBlock->info.rows; - pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows; return TSDB_CODE_SUCCESS; } @@ -2714,9 +2714,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator); - pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows; - + applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 97b4fd9dc4..98ef6b8a36 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } + // multi-group case not handle here SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, @@ -236,28 +237,14 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { continue; } - // todo add the limit/offset info - if (pInfo->limitInfo.remainOffset > 0) { - if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { - pInfo->limitInfo.remainOffset -= pBlock->info.rows; - continue; - } - - blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset); - pInfo->limitInfo.remainOffset = 0; + // there are bugs? + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo); + if (limitReached) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); } - if (pInfo->limitInfo.limit.limit > 0 && - pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) { - int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows; - blockDataKeepFirstNRows(pBlock, remain); - } - - size_t numOfRows = blockDataGetNumOfRows(pBlock); - pInfo->limitInfo.numOfOutputRows += numOfRows; - pOperator->resultInfo.totalRows += numOfRows; - - if (numOfRows > 0) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + if (pBlock->info.rows > 0) { break; } } @@ -557,7 +544,6 @@ typedef struct SMultiwayMergeOperatorInfo { SSDataBlock* pIntermediateBlock; // to hold the intermediate result int64_t startTs; // sort start time bool groupSort; - bool hasGroupId; uint64_t groupId; STupleHandle* prefetchedTuple; } SMultiwayMergeOperatorInfo; @@ -604,7 +590,9 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) { +static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, + SSDataBlock* p, bool* newgroup) { + *newgroup = false; while (1) { STupleHandle* pTupleHandle = NULL; @@ -613,8 +601,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pTupleHandle = tsortNextTuple(pHandle); } else { pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); pInfo->prefetchedTuple = NULL; + uint64_t gid = tsortGetGroupId(pTupleHandle); + if (gid != pInfo->groupId) { + *newgroup = true; + pInfo->groupId = gid; + } } } else { pTupleHandle = tsortNextTuple(pHandle); @@ -627,12 +619,10 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* if (pInfo->groupSort) { uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { + if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = tupleGroupId; pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); } else { pInfo->prefetchedTuple = pTupleHandle; break; @@ -645,11 +635,6 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* break; } } - - if (pInfo->groupSort) { - pInfo->hasGroupId = false; - } - } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, @@ -673,14 +658,19 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } SSDataBlock* p = pInfo->pIntermediateBlock; + bool newgroup = false; while (1) { - doGetSortedBlockData(pInfo, pHandle, capacity, p); + doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup); if (p->info.rows == 0) { break; } - bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); + if (newgroup) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } + + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo); } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index f5ceeeafe7..7cb49c8f54 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -440,6 +440,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { break; } + if (pSliceInfo->scalarSup.pExprInfo != NULL) { + SExprSupp* pExprSup = &pSliceInfo->scalarSup; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + int32_t code = initKeeperInfo(pSliceInfo, pBlock); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -538,6 +543,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } + doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + // restore the value setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); if (pResBlock->info.rows == 0) { @@ -573,6 +580,11 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode } } + code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -629,6 +641,7 @@ void destroyTimeSliceOperatorInfo(void* param) { taosMemoryFree(pKey->end.val); } taosArrayDestroy(pInfo->pLinearInfo); + cleanupExprSupp(&pInfo->scalarSup); taosMemoryFree(pInfo->pFillColInfo); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 648580b913..38108dcff6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2445,7 +2445,19 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p pInfo->delKey = key; } int32_t prevEndPos = (forwardRows - 1) * step + startPos; - ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); + if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { + qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 + ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + blockDataUpdateTsWindow(pSDataBlock, 0); + + // timestamp of the data is incorrect + if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { + qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + } + } + if (IS_FINAL_OP(pInfo)) { startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); } else { diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 3c5d4d506a..630b61bd4b 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1693,6 +1693,9 @@ static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMeta if (TSDB_CODE_SUCCESS == code && !isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) { code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); } + if (TSDB_CODE_SUCCESS == code && isStb) { + code = storeTableMeta(pCxt, pStmt); + } if (TSDB_CODE_SUCCESS == code) { code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4b3f807a53..ae3f5226f5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1550,11 +1550,14 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p // select percentile() without from clause is also valid if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) || (TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && - TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) || - NULL != pSelect->pPartitionByList) { + TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, "%s is only supported in single table query", pFunc->functionName); } + if (NULL != pSelect->pPartitionByList) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "%s function is not supported in partition query", pFunc->functionName); + } return TSDB_CODE_SUCCESS; } @@ -2979,7 +2982,7 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* intervalRange = pInterval->datum.i; } - if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { + if ((timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 001ec66725..208f548457 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1055,7 +1055,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); - pProject->ignoreGroupId = (NULL == pSelect->pPartitionByList); + pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList); pProject->node.groupAction = (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 6208fc172a..361cf33d58 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -348,7 +348,8 @@ static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) { return false; } if (NULL != pAgg->pGroupKeys) { - return stbSplHasPartTbname(pAgg->pGroupKeys) && stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); + return stbSplHasPartTbname(pAgg->pGroupKeys) && + stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); } return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); } @@ -559,6 +560,8 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla if (NULL == pMerge->node.pLimit) { code = TSDB_CODE_OUT_OF_MEMORY; } + ((SLimitNode*)pSplitNode->pLimit)->limit += ((SLimitNode*)pSplitNode->pLimit)->offset; + ((SLimitNode*)pSplitNode->pLimit)->offset = 0; } if (TSDB_CODE_SUCCESS == code) { if (NULL == pSubplan) { @@ -1039,21 +1042,29 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } -static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - SLogicNode* pSplitNode = pInfo->pSplitNode; +static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) { + *pSplitNode = pInfo->pSplitNode; if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { - pSplitNode = pInfo->pSplitNode->pParent; + *pSplitNode = pInfo->pSplitNode->pParent; if (NULL != pInfo->pSplitNode->pLimit) { - pSplitNode->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit); - if (NULL == pSplitNode->pLimit) { + (*pSplitNode)->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit); + if (NULL == (*pSplitNode)->pLimit) { return TSDB_CODE_OUT_OF_MEMORY; } ((SLimitNode*)pInfo->pSplitNode->pLimit)->limit += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset; ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0; } } - int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE); + return TSDB_CODE_SUCCESS; +} + +static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pSplitNode = NULL; + int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode); + if (TSDB_CODE_SUCCESS == code) { + code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE); + } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); @@ -1063,12 +1074,11 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp } static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - SLogicNode* pSplitNode = pInfo->pSplitNode; - if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && - NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { - pSplitNode = pInfo->pSplitNode->pParent; + SLogicNode* pSplitNode = NULL; + int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true); } - int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true); if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 787ef7501d..9553cb97b5 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -194,6 +194,8 @@ typedef struct SQWorker { SMsgCb msgCb; SQWStat stat; int32_t *destroyed; + + int8_t nodeStopped; } SQWorker; typedef struct SQWorkerMgmt { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index fdd2775daa..7ee7c50c96 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -213,9 +213,15 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); + int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped); if (NULL == (*ctx)) { - QW_TASK_DLOG_E("task ctx not exist, may be dropped"); - QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + if (!nodeStopped) { + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + } else { + QW_TASK_DLOG_E("node stopped"); + QW_ERR_RET(TSDB_CODE_VND_STOPPED); + } } return TSDB_CODE_SUCCESS; @@ -226,9 +232,16 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped); + if (NULL == (*ctx)) { - QW_TASK_DLOG_E("task ctx not exist, may be dropped"); - QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + if (!nodeStopped) { + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + } else { + QW_TASK_DLOG_E("node stopped"); + QW_ERR_RET(TSDB_CODE_VND_STOPPED); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2f712e6eba..1e35529d27 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1187,6 +1187,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { uint64_t qId, tId, sId; int32_t eId; int64_t rId = 0; + + atomic_store_8(&mgmt->nodeStopped, 1); + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index e26c8a5c5b..56c3c3ebef 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -199,7 +199,7 @@ void streamStateClose(SStreamState* pState) { } int32_t streamStateBegin(SStreamState* pState) { - if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { tdbAbort(pState->pTdbState->db, pState->pTdbState->txn); return -1; @@ -215,7 +215,7 @@ int32_t streamStateCommit(SStreamState* pState) { return -1; } - if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } @@ -227,7 +227,7 @@ int32_t streamStateAbort(SStreamState* pState) { return -1; } - if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 5277e7818f..063b4f51f5 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -24,7 +24,7 @@ extern "C" { #define SYNC_SNAPSHOT_SEQ_INVALID -2 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 -#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1 +#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1 #define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 6cc517fda0..bb3bb0d6a4 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -830,7 +830,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy pMgr->endIndex = index + 1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64 + sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 880c76e4dd..e61bcc9ffc 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -112,7 +112,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; pMsg->startTime = pSender->startTime; - pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; + pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; // event log syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start"); @@ -379,7 +379,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p } pReceiver->start = true; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm; pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; @@ -527,7 +527,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { return snapStart; } -static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { +static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; int64_t timeNow = taosGetTimestampMs(); int32_t code = 0; @@ -565,7 +565,7 @@ _START_RECEIVER: } else { // waiting for clock match while (timeNow < pMsg->startTime) { - sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, + sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow, pMsg->startTime); taosMsleep(10); timeNow = taosGetTimestampMs(); @@ -765,7 +765,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // receiver on message // -// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT +// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT // if receiver already start // if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) // if sender.start-time = receiver.start-time, maybe duplicate msg @@ -809,9 +809,9 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { int32_t code = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->raftStore.currentTerm) { - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); - code = syncNodeOnSnapshotPre(pSyncNode, pMsg); + code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin"); code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); @@ -848,7 +848,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return code; } -static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { +static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { SSnapshot snapshot = {0}; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -945,8 +945,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { if (pMsg->startTime != pSender->startTime) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match"); - sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime, - pSender->startTime, pMsg->code); + sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime, + pSender->startTime, tstrerror(pMsg->code), pMsg->code); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; goto _ERROR; } @@ -961,15 +961,15 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { if (pMsg->code != 0) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); - sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code); + sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code); terrno = pMsg->code; goto _ERROR; } // prepare , send begin msg - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); - return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg); + return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); } if (pSender->pReader == NULL || pSender->finish) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b246d9a79d..6a50572cba 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -141,20 +141,15 @@ static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bu } static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { - int32_t len = 1; - + int32_t len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { SPeerState* pState = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[i])); if (pState == NULL) break; - - if (i < pSyncNode->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex, - pState->lastSendTime); - } else { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex, - pState->lastSendTime); - } + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "%s", i, pState->lastSendIndex, + pState->lastSendTime, (i < pSyncNode->replicaNum - 1) ? ", " : ""); } + len += snprintf(buf + len, bufLen - len, "%s", "}"); } void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { @@ -245,7 +240,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla char cfgStr[1024] = ""; syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); - char peerStr[1024] = "{"; + char peerStr[1024] = ""; syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); char eventLog[512]; // {0}; @@ -255,20 +250,21 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla va_end(argpointer); taosPrintLog(flags, level, dflag, - "vgId:%d, %s, sync:%s, {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 - " lcindex:%" PRId64 - " seq:%d ack:%d finish:%d replica-index:%d dnode:%d}" - ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 - ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", + "vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 + " last-term:%" PRIu64 " last-cfg:%" PRId64 + ", seq:%d ack:%d finish:%d, as:%d dnode:%d}" + ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 + ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64 + "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 + ", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum, - pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, + pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, + pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, @@ -291,7 +287,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df char cfgStr[1024] = ""; syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); - char peerStr[1024] = "{"; + char peerStr[1024] = ""; syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); char eventLog[512]; // {0}; @@ -300,22 +296,22 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer); va_end(argpointer); - taosPrintLog(flags, level, dflag, - "vgId:%d, %s, sync:%s," - " {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from dnode:%d s-param:%" PRId64 - " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 - "}" - ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 - ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, - pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, - pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, - pReceiver->snapshot.lastConfigIndex, pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, - logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pNode->raftCfg.isStandBy, pNode->raftCfg.snapshotStrategy, pNode->raftCfg.batchSize, pNode->replicaNum, - pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + taosPrintLog( + flags, level, dflag, + "vgId:%d, %s, sync:%s," + " snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64 + " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + "}" + ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 + ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 + ", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", + pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, + pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, + pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, + pNode->raftStore.currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, + snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, + pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, + syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { @@ -351,13 +347,13 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool int64_t execTime) { if (printX) { sNTrace(pSyncNode, - "send sync-heartbeat to dnode:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 - "}, x", + "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 + ", ts:%" PRId64 "}, x", DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp); } else { sNTrace(pSyncNode, - "send sync-heartbeat to dnode:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 - "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64, + "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 + ", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64, DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed, execTime); } @@ -368,14 +364,14 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64 pSyncNode->hbSlowNum++; sNInfo(pSyncNode, - "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 + "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff); } sNTrace(pSyncNode, - "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 - "}, %s, net elapsed:%" PRId64, + "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 + ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff); } @@ -400,67 +396,64 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { sNDebug(pSyncNode, - "send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 - ", lterm:%" PRId64 ", stime:%" PRId64, + "send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64 + ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64, DID(&pMsg->destId), s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime); } void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { sNDebug(pSyncNode, - "recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 - ", lterm:%" PRId64 ", stime:%" PRId64 ", len:%u", + "recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64 + ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64 ", data-len:%u", DID(&pMsg->srcId), s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->dataLen); } void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { sNDebug(pSyncNode, - "send sync-snapshot-rsp to dnode:%d, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 - ", lterm:%" PRId64 ", stime:%" PRId64, + "send sync-snapshot-rsp to dnode:%d, %s, acked:%d, term:%" PRId64 ", begin-index:%" PRId64 + ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64, DID(&pMsg->destId), s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime); } void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { sNDebug(pSyncNode, - "recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 - ", lterm:%" PRId64 ", stime:%" PRId64, + "recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin-index:%" PRId64 + ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64, DID(&pMsg->srcId), s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime); } void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { sNTrace(pSyncNode, - "recv sync-append-entries from dnode:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", - DID(&pMsg->srcId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen, s); + "recv sync-append-entries from dnode:%d {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64 + "}, commit-index:%" PRId64 ", datalen:%d}, %s", + DID(&pMsg->srcId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->dataLen, s); } void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { sNTrace(pSyncNode, - "send sync-append-entries to dnode:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + "send sync-append-entries to dnode:%d, {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64 + "}, index:%" PRId64 ", commit-index:%" PRId64 ", datalen:%d}, %s", DID(&pMsg->destId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex, pMsg->dataLen, s); } -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) { - if (voteGranted == -1) { - sNInfo(pSyncNode, - "recv sync-request-vote from dnode:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", - DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); - } else { - sNInfo(pSyncNode, - "recv sync-request-vote from dnode:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 - "}, granted:%d", - DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted); - } +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, + const char* errmsg) { + char statusMsg[64]; + snprintf(statusMsg, sizeof(statusMsg), "granted:%d", voteGranted); + sNInfo(pSyncNode, + "recv sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s", + DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, + (voteGranted != -1) ? statusMsg : errmsg); } void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { - sNInfo(pNode, "send sync-request-vote to dnode:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", + sNInfo(pNode, + "send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s", DID(&pMsg->destId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); } diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index a06aac6afe..e373850b3c 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -28,14 +28,14 @@ static void median(void *src, int64_t size, int64_t s, int64_t e, const void *pa void *buf) { int32_t mid = ((int32_t)(e - s) >> 1u) + (int32_t)s; - if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) == 1) { + if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) > 0) { doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf); } - if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, e), param) == 1) { + if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, e), param) > 0) { doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf); doswap(elePtrAt(src, size, mid), elePtrAt(src, size, e), size, buf); - } else if (comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) == 1) { + } else if (comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) > 0) { doswap(elePtrAt(src, size, s), elePtrAt(src, size, e), size, buf); } @@ -47,7 +47,7 @@ static void tInsertSort(void *src, int64_t size, int32_t s, int32_t e, const voi void *buf) { for (int32_t i = s + 1; i <= e; ++i) { for (int32_t j = i; j > s; --j) { - if (comparFn(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), param) == -1) { + if (comparFn(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), param) < 0) { doswap(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), size, buf); } else { break; diff --git a/tests/ci/Dockerfile b/tests/ci/Dockerfile new file mode 100644 index 0000000000..594bcc902d --- /dev/null +++ b/tests/ci/Dockerfile @@ -0,0 +1,48 @@ +FROM python:3.8 +RUN pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple +RUN pip3 install pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro +RUN apt-get update +RUN apt-get install -y psmisc sudo tree libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config build-essential valgrind \ + vim libjemalloc-dev openssh-server screen sshpass net-tools dirmngr gnupg apt-transport-https ca-certificates software-properties-common r-base iputils-ping +RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 +RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/' +RUN apt install -y r-base +ADD go1.17.6.linux-amd64.tar.gz /usr/local/ +ADD jdk-8u144-linux-x64.tar.gz /usr/local/ +ADD apache-maven-3.8.4-bin.tar.gz /usr/local/ +RUN apt-get install wget -y \ + && wget https://packages.microsoft.com/config/ubuntu/18.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb \ + && dpkg -i packages-microsoft-prod.deb \ + && rm packages-microsoft-prod.deb \ + && apt-get update && apt-get install -y dotnet-sdk-5.0 && apt-get install -y dotnet-sdk-6.0 +ADD node-v12.20.0-linux-x64.tar.gz /usr/local/ +RUN sh -c "rm -f /etc/localtime;ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime;echo \"Asia/Shanghai\" >/etc/timezone" +COPY id_rsa /root/.ssh/id_rsa +COPY .m2 /root/.m2 +COPY .nuget /root/.nuget +COPY .dotnet /root/.dotnet +COPY .cargo /root/.cargo +COPY go /root/go +ADD cmake-3.21.5-linux-x86_64.tar.gz /usr/local/ +RUN echo " export RUSTUP_DIST_SERVER=\"https://rsproxy.cn\" " >> /root/.bashrc +RUN echo " export RUSTUP_UPDATE_ROOT=\"https://rsproxy.cn/rustup\" " >> /root/.bashrc +RUN curl https://sh.rustup.rs -o /tmp/rustup-init.sh +RUN sh /tmp/rustup-init.sh -y +ENV PATH /usr/local/go/bin:/usr/local/node-v12.20.0-linux-x64/bin:/usr/local/apache-maven-3.8.4/bin:/usr/local/jdk1.8.0_144/bin:/usr/local/cmake-3.21.5-linux-x86_64/bin:/root/.cargo/bin:$PATH +ENV JAVA_HOME /usr/local/jdk1.8.0_144 +RUN go env -w GOPROXY=https://goproxy.cn +RUN echo "StrictHostKeyChecking no" >>/etc/ssh/ssh_config +RUN npm config -g set unsafe-perm +RUN npm config -g set registry https://registry.npm.taobao.org +COPY .npm /root/.npm +RUN R CMD javareconf JAVA_HOME=${JAVA_HOME} JAVA=${JAVA_HOME}/bin/java JAVAC=${JAVA_HOME}/bin/javac JAVAH=${JAVA_HOME}/bin/javah JAR=${JAVA_HOME}/bin/jar +RUN echo "install.packages(\"RJDBC\", repos=\"http://cran.us.r-project.org\")"|R --no-save +COPY .gitconfig /root/.gitconfig +RUN mkdir -p /run/sshd +COPY id_rsa.pub /root/.ssh/id_rsa.pub +COPY id_rsa.pub /root/.ssh/authorized_keys +RUN pip3 uninstall -y taostest +COPY repository/TDinternal /home/TDinternal +COPY repository/taos-connector-python /home/taos-connector-python +RUN sh -c "cd /home/taos-connector-python; pip3 install ." +COPY setup.sh /home/setup.sh \ No newline at end of file diff --git a/tests/ci/build_image.sh b/tests/ci/build_image.sh new file mode 100755 index 0000000000..1864df35db --- /dev/null +++ b/tests/ci/build_image.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +docker build --no-cache -t taos_test:v1.0 . + diff --git a/tests/ci/daily_build_image.sh b/tests/ci/daily_build_image.sh new file mode 100755 index 0000000000..01148a3aae --- /dev/null +++ b/tests/ci/daily_build_image.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +set -x + +script_dir=`dirname $0` +cd $script_dir +script_dir=`pwd` +cd $script_dir/repository/taos-connector-python +git pull + +cd $script_dir/repository/TDinternal +git clean -fxd +git pull + +cd $script_dir/repository/TDinternal/community +git clean -fxd +git checkout main +git pull origin main +git submodule update --init --recursive + +cd $script_dir +cp $script_dir/repository/TDinternal/community/tests/ci/build_image.sh . +cp $script_dir/repository/TDinternal/community/tests/ci/daily_build_image.sh . + +./build_image.sh || exit 1 +docker image prune -f +ips="\ +192.168.1.47 \ +192.168.1.48 \ +192.168.1.49 \ +192.168.1.52 \ +192.168.0.215 \ +192.168.0.217 \ +192.168.0.219 \ +" + +image=taos_image.tar + +docker save taos_test:v1.0 -o $image + +for ip in $ips; do + echo "scp $image root@$ip:/home/ &" + scp $image root@$ip:/home/ & +done +wait + +for ip in $ips; do + echo "ssh root@$ip docker load -i /home/$image &" + ssh root@$ip docker load -i /home/$image & +done +wait + +for ip in $ips; do + echo "ssh root@$ip rm -f /home/$image &" + ssh root@$ip rm -f /home/$image & +done +wait + +rm -rf taos_image.tar + diff --git a/tests/script/tsim/parser/interp.sim b/tests/script/tsim/parser/interp.sim index 1b7878178c..e6512a22d7 100644 --- a/tests/script/tsim/parser/interp.sim +++ b/tests/script/tsim/parser/interp.sim @@ -72,4 +72,33 @@ sql_error select interp(*) from nt5931 where ts=now sql_error select interp(*) from st5931 where ts=now sql_error select interp(*) from ct5931 where ts=now +sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int); +sql create table tba1 using sta tags(1); +sql insert into tba1 values ('2022-04-26 15:15:01', -3.0, "a"); +sql insert into tba1 values ('2022-04-26 15:15:05', 3.0, "b"); +sql select a from (select interp(f1) as a from tba1 where ts >= '2022-04-26 15:15:01' and ts <= '2022-04-26 15:15:05' range('2022-04-26 15:15:01','2022-04-26 15:15:05') every(1s) fill(linear)) where a > 0; +if $rows != 2 then + return -1 +endi +if $data00 != 1.500000000 then + return -1 +endi +if $data10 != 3.000000000 then + return -1 +endi + +sql select a from (select interp(f1+1) as a from tba1 where ts >= '2022-04-26 15:15:01' and ts <= '2022-04-26 15:15:05' range('2022-04-26 15:15:01','2022-04-26 15:15:05') every(1s) fill(linear)) where a > 0; +if $rows != 3 then + return -1 +endi +if $data00 != 1.000000000 then + return -1 +endi +if $data10 != 2.500000000 then + return -1 +endi +if $data20 != 4.000000000 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/parser/regressiontest.sim b/tests/script/tsim/parser/regressiontest.sim index 1b127155cb..3ce2b47b44 100644 --- a/tests/script/tsim/parser/regressiontest.sim +++ b/tests/script/tsim/parser/regressiontest.sim @@ -63,4 +63,38 @@ if $rows != 8198 then return -1 endi +print ===========================> TD-22077 && TD-21877 +sql drop database if exists $db -x step1 +sql create database $db vgroups 1; + +sql use $db +sql create stable st1 (ts timestamp, c int) tags(a int); +sql create table t1 using st1 tags(1); +sql create table t2 using st1 tags(2); + +$i = 0 +$ts = 1674977959000 +$rowNum = 200 + +$x = 0 +while $x < $rowNum +$xs = $x * $delta +$ts = $ts0 + $xs +sql insert into t1 values ( $ts , $x ) +sql insert into t2 values ( $ts + 1000a, $x ) +$x = $x + 1 +$ts = $ts + 1000 +endw + +sql flush database $db + +sql insert into t1 values('2018-09-17 09:00:26', 26); +sql insert into t2 values('2018-09-17 09:00:25', 25); + +sql insert into t2 values('2018-09-17 09:00:30', 30); +sql flush database reg_db0; + +sql delete from st1 where ts<='2018-9-17 09:00:26'; +sql select * from st1; + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 7bf10df637..c61c7667f8 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -834,4 +834,57 @@ endi print ====== test _wstart end +print insert into ts1 values(-1648791211000,1,2,3) + +sql create database test7 vgroups 1; +sql use test7; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create stream streams7 trigger at_once into streamt7 as select _wstart, count(*) from ts1 interval(10s) ; + +sql insert into ts1 values(1648791211000,1,2,3); +sql_error insert into ts1 values(-1648791211000,1,2,3); + +loop18: + +sleep 200 +sql select * from streamt7; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop18 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop18 +endi + +sql_error insert into ts1 values(-1648791211001,1,2,3) (1648791211001,1,2,3); + +sql select _wstart, count(*) from ts1 interval(10s) ; + +print $data00 $data01 +print $data10 $data11 + +loop19: + +sleep 200 +sql select * from streamt7; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop19 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/triggerInterval0.sim b/tests/script/tsim/stream/triggerInterval0.sim index 7353f026bb..b522dcf035 100644 --- a/tests/script/tsim/stream/triggerInterval0.sim +++ b/tests/script/tsim/stream/triggerInterval0.sim @@ -29,69 +29,119 @@ sql insert into t1 values(1648791223001,2,2,3,1.1); sql insert into t1 values(1648791223002,2,2,3,1.1); sql insert into t1 values(1648791223003,2,2,3,1.1); sql insert into t1 values(1648791223001,2,2,3,1.1); + +print step 0 + +$loop_count = 0 + +loop0: sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + sql select * from streamt; + if $rows != 1 then print ======$rows - return -1 + goto loop0 endi if $data01 != 1 then print ======$data01 - return -1 + goto loop0 endi sql insert into t1 values(1648791233001,2,2,3,1.1); + +print step 1 + +$loop_count = 0 + +loop1: sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + sql select * from streamt; if $rows != 2 then print ======$rows - return -1 + goto loop1 endi if $data01 != 1 then print ======$data01 - return -1 + goto loop1 endi if $data11 != 3 then print ======$data11 - return -1 + goto loop1 endi sql insert into t1 values(1648791223004,2,2,3,1.1); sql insert into t1 values(1648791223004,2,2,3,1.1); sql insert into t1 values(1648791223005,2,2,3,1.1); + +print step 2 + +$loop_count = 0 + +loop2: sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + sql select * from streamt; if $rows != 2 then print ======$rows - return -1 + goto loop2 endi + if $data01 != 1 then print ======$data01 - return -1 + goto loop2 endi if $data11 != 5 then print ======$data11 - return -1 + goto loop2 endi sql insert into t1 values(1648791233002,3,2,3,2.1); sql insert into t1 values(1648791213002,4,2,3,3.1) sql insert into t1 values(1648791213002,4,2,3,4.1); + +print step 3 + +$loop_count = 0 + +loop3: sleep 300 -sql select * from streamt; -if $rows != 2 then - print ======$rows - return -1 -endi -if $data01 != 2 then - print ======$data01 - return -1 -endi -if $data11 != 5 then - print ======$data11 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then return -1 endi +sql select * from streamt; +if $rows != 2 then + print ======$rows + goto loop3 +endi +if $data01 != 2 then + print ======$data01 + goto loop3 +endi +if $data11 != 5 then + print ======$data11 + goto loop3 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file