diff --git a/cmake/cmake.install b/cmake/cmake.install index 4e3d0b166a..6dc6864975 100644 --- a/cmake/cmake.install +++ b/cmake/cmake.install @@ -1,38 +1,8 @@ -IF (EXISTS /var/lib/taos/dnode/dnodeCfg.json) - INSTALL(CODE "MESSAGE(\"The default data directory /var/lib/taos contains old data of tdengine 2.x, please clear it before installing!\")") -ELSEIF (EXISTS C:/TDengine/data/dnode/dnodeCfg.json) - INSTALL(CODE "MESSAGE(\"The default data directory C:/TDengine/data contains old data of tdengine 2.x, please clear it before installing!\")") -ELSEIF (TD_LINUX) +IF (TD_LINUX) SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.sh") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})") ELSEIF (TD_WINDOWS) - SET(CMAKE_INSTALL_PREFIX C:/TDengine) - - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/go DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/nodejs DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .) - INSTALL(CODE "IF (NOT EXISTS ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg) - execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg) - ENDIF ()") - INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include) - INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include) - INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos_static.lib DESTINATION driver) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosd.exe DESTINATION .) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/udfd.exe DESTINATION .) - IF (BUILD_TOOLS) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosBenchmark.exe DESTINATION .) - ENDIF () - - IF (TD_MVN_INSTALLED) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.38-dist.jar DESTINATION connector/jdbc) - ENDIF () SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.bat") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} :needAdmin ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Windows ${TD_VER_NUMBER})") diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index a1eec81ee0..494d1577fc 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -135,24 +135,6 @@ execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download") execute_process(COMMAND "${CMAKE_COMMAND}" --build . WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download") - -# clear submodule -execute_process(COMMAND git submodule deinit -f tools/taos-tools - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached tools/taos-tools - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git submodule deinit -f tools/taosadapter - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached tools/taosadapter - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git submodule deinit -f tools/taosws-rs - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached tools/taosws-rs - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git submodule deinit -f examples/rust - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached examples/rust - WORKING_DIRECTORY "${TD_SOURCE_DIR}") # ================================================================================================ # Build diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 2cfd4b6484..1ce485b042 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个 TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 - -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX) diff --git a/packaging/tools/make_install.bat b/packaging/tools/make_install.bat index 0f9e836ae2..3c27c1beca 100644 --- a/packaging/tools/make_install.bat +++ b/packaging/tools/make_install.bat @@ -1,7 +1,47 @@ @echo off goto %1 :needAdmin + +if exist C:\\TDengine\\data\\dnode\\dnodeCfg.json ( + echo The default data directory C:/TDengine/data contains old data of tdengine 2.x, please clear it before installing! +) +set source_dir=%2 +set source_dir=%source_dir:/=\\% +set binary_dir=%3 +set binary_dir=%binary_dir:/=\\% +set osType=%4 +set verNumber=%5 +set tagert_dir=C:\\TDengine + +if not exist %tagert_dir% ( + mkdir %tagert_dir% +) +if not exist %tagert_dir%\\cfg ( + mkdir %tagert_dir%\\cfg +) +if not exist %tagert_dir%\\include ( + mkdir %tagert_dir%\\include +) +if not exist %tagert_dir%\\driver ( + mkdir %tagert_dir%\\driver +) +if not exist C:\\TDengine\\cfg\\taos.cfg ( + copy %source_dir%\\packaging\\cfg\\taos.cfg %tagert_dir%\\cfg\\taos.cfg > nul +) +copy %source_dir%\\include\\client\\taos.h %tagert_dir%\\include > nul +copy %source_dir%\\include\\util\\taoserror.h %tagert_dir%\\include > nul +copy %source_dir%\\include\\libs\\function\\taosudf.h %tagert_dir%\\include > nul +copy %binary_dir%\\build\\lib\\taos.lib %tagert_dir%\\driver > nul +copy %binary_dir%\\build\\lib\\taos_static.lib %tagert_dir%\\driver > nul +copy %binary_dir%\\build\\lib\\taos.dll %tagert_dir%\\driver > nul +copy %binary_dir%\\build\\bin\\taos.exe %tagert_dir% > nul +copy %binary_dir%\\build\\bin\\taosd.exe %tagert_dir% > nul +copy %binary_dir%\\build\\bin\\udfd.exe %tagert_dir% > nul +if exist %binary_dir%\\build\\bin\\taosBenchmark.exe ( + copy %binary_dir%\\build\\bin\\taosBenchmark.exe %tagert_dir% > nul +) + mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)&& echo To start/stop TDengine with administrator privileges: sc start/stop taosd &goto :eof :hasAdmin -cp -f C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 +copy /y C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 > nul sc query "taosd" >nul || sc create "taosd" binPath= "C:\\TDengine\\taosd.exe --win_service" start= DEMAND diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index d8d4c5bf2a..6a95ace99e 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -664,7 +664,9 @@ function install_TDengine() { ## ==============================Main program starts from here============================ echo source directory: $1 echo binary directory: $2 -if [ "$osType" != "Darwin" ]; then +if [ -x ${data_dir}/dnode/dnodeCfg.json ]; then + echo -e "\033[44;31;5mThe default data directory ${data_dir} contains old data of tdengine 2.x, please clear it before installing!\033[0m" +elif [ "$osType" != "Darwin" ]; then if [ -x ${bin_dir}/${clientName} ]; then update_TDengine else diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c763bbed9c..adc5af1a17 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // telem -bool tsEnableTelem = false; +bool tsEnableTelem = true; int32_t tsTelemInterval = 86400; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; @@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; #ifndef _STORAGE -int32_t taosSetTfsCfg(SConfig *pCfg) { +int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); memset(tsDataDir, 0, PATH_MAX); @@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { uError("failed to create dataDir:%s", tsDataDir); return -1; } - return 0; + return 0; } #else int32_t taosSetTfsCfg(SConfig *pCfg); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 006d9e749c..2fb934aaad 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); ASSERT(smaObj.uid != 0); char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; - snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name); + snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.stbUid = pStb->uid; @@ -530,7 +530,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sourceDbUid = pDb->uid; streamObj.targetDbUid = pDb->uid; streamObj.version = 1; - streamObj.sql = pCreate->sql; + streamObj.sql = strdup(pCreate->sql); streamObj.smaId = smaObj.uid; streamObj.watermark = pCreate->watermark; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; @@ -585,6 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea return -1; } if (pAst != NULL) nodesDestroyNode(pAst); + nodesDestroyNode((SNode *)pPlan); int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); @@ -609,6 +610,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea code = 0; _OVER: + tFreeStreamObj(&streamObj); mndDestroySmaObj(&smaObj); mndTransDrop(pTrans); return code; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 0567ec4e14..09eed7fb32 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -509,6 +509,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { pVgroup->replica = 1; if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1; + taosArrayDestroy(pArray); mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId); return 0; @@ -1862,4 +1863,4 @@ _OVER: #endif } -bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } \ No newline at end of file +bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f1e980c026..a30f308ecd 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -260,7 +260,7 @@ void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); // tsdbCache int32_t tsdbOpenCache(STsdb *pTsdb); -void tsdbCloseCache(SLRUCache *pCache); +void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); @@ -298,6 +298,7 @@ struct STsdb { SMemTable *imem; STsdbFS fs; SLRUCache *lruCache; + TdThreadMutex lruMutex; }; struct TSDBKEY { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index b8803b20fc..e6a331f20e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -341,7 +341,7 @@ FAIL: return -1; } -void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } +void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { if (pReader->tbIdHash) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f03b02af27..bb367ff8b1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -33,16 +33,21 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { taosLRUCacheSetStrictCapacity(pCache, true); + taosThreadMutexInit(&pTsdb->lruMutex, NULL); + _err: pTsdb->lruCache = pCache; return code; } -void tsdbCloseCache(SLRUCache *pCache) { +void tsdbCloseCache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->lruCache; if (pCache) { taosLRUCacheEraseUnrefEntries(pCache); taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->lruMutex); } } @@ -1100,29 +1105,40 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH // getTableCacheKeyS(uid, "lr", key, &keyLen); getTableCacheKey(uid, 0, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - } else { - STSRow *pRow = NULL; - bool dup = false; // which is always false for now - code = mergeLastRow(uid, pTsdb, &dup, &pRow); - // if table's empty or error, return code of -1 - if (code < 0 || pRow == NULL) { - if (!dup && pRow) { - taosMemoryFree(pRow); - } - - *handle = NULL; - return 0; - } - - _taos_lru_deleter_t deleter = deleteTableCacheLastrow; - LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } + if (!h) { + taosThreadMutexLock(&pTsdb->lruMutex); h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + STSRow *pRow = NULL; + bool dup = false; // which is always false for now + code = mergeLastRow(uid, pTsdb, &dup, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + if (!dup && pRow) { + taosMemoryFree(pRow); + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + *handle = NULL; + + return 0; + } + + _taos_lru_deleter_t deleter = deleteTableCacheLastrow; + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); + } else { + taosThreadMutexUnlock(&pTsdb->lruMutex); + } } *handle = h; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index be2828d187..ec760e3c57 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -86,7 +86,7 @@ int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { taosThreadRwlockDestroy(&(*pTsdb)->rwLock); tsdbFSClose(*pTsdb); - tsdbCloseCache((*pTsdb)->lruCache); + tsdbCloseCache(*pTsdb); taosMemoryFreeClear(*pTsdb); } return 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d15dc99122..9ff5b5d759 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3217,8 +3217,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, - SExecTaskInfo* pTaskInfo) { +static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, + SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; SSDataBlock* pResBlock = pInfo->pFinalRes; @@ -3242,8 +3242,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp pInfo->existNewGroupBlock = NULL; } -static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, - SExecTaskInfo* pTaskInfo) { +static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, + SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); @@ -3259,8 +3259,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { SFillOperatorInfo* pInfo = pOperator->info; - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pResBlock = pInfo->pFinalRes; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = pInfo->pFinalRes; setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); @@ -3270,13 +3270,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); - for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { + for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr]; ASSERT(pCol->notFillCol); SExprInfo* pExpr = pCol->pExpr; - int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; - int32_t dstSlotId = pExpr->base.resSchema.slotId; + int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; + int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId); SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId); @@ -3664,7 +3664,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { taosRemoveRef(exchangeObjRefPool, pExInfo->self); } -void freeSourceDataInfo(void *p) { +void freeSourceDataInfo(void* p) { SSourceDataInfo* pInfo = (SSourceDataInfo*)p; taosMemoryFreeClear(pInfo->pRsp); } @@ -3694,8 +3694,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); - pInfo->pFillInfo = - taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); + pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, + pInfo->primaryTsCol, order, id); pInfo->win = win; pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -3721,10 +3721,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); - SInterval* pInterval = + SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -3741,9 +3741,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); - int32_t code = - initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues, - pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); + int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, + (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, + pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index aca0078592..7b13aa8ad8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -139,7 +139,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } assert(w.ekey > pBlockInfo->window.ekey); - if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) { + if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) { return true; } } @@ -147,7 +147,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey); assert(w.skey <= pBlockInfo->window.ekey); - if (w.skey > pBlockInfo->window.skey) { + if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) { return true; } @@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } assert(w.skey < pBlockInfo->window.skey); - if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { + if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) { return true; } } @@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } taosArrayDestroy(tableIdList); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); + } else { + taosArrayDestroy(pColIds); } // create the pseduo columns info @@ -2038,10 +2040,34 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smr); if (numOfRows >= pOperator->resultInfo.capacity) { - break; + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + + if (pInfo->pRes->info.rows > 0) { + break; + } } } + if (numOfRows > 0) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + } + + blockDataDestroy(p); + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { metaCloseTbCursor(pInfo->pCur); @@ -2049,14 +2075,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - p->info.rows = numOfRows; - pInfo->pRes->info.rows = numOfRows; - - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); - doFilterResult(pInfo); - - blockDataDestroy(p); - pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -2213,10 +2231,34 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { colDataAppend(pColInfoData, numOfRows, n, false); if (++numOfRows >= pOperator->resultInfo.capacity) { - break; + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + + if (pInfo->pRes->info.rows > 0) { + break; + } } } + if (numOfRows > 0) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + } + + blockDataDestroy(p); + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { metaCloseTbCursor(pInfo->pCur); @@ -2224,14 +2266,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - p->info.rows = numOfRows; - pInfo->pRes->info.rows = numOfRows; - - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); - doFilterResult(pInfo); - - blockDataDestroy(p); - pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 757ab09d1a..02be01cdb8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pRecycledPages); blockDataDestroy(pInfo->pUpdateRes); + taosArrayDestroy(pInfo->pDelWins); + blockDataDestroy(pInfo->pDelRes); if (pInfo->pChildren) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); + taosMemoryFree(pChildOp->pDownstream); + cleanupExprSupp(&pChildOp->exprSupp); taosMemoryFreeClear(pChildOp); } + taosArrayDestroy(pInfo->pChildren); } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); @@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } - SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5aeaecd598..bf4a07f8e2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { - pInfo->max = - (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; + pInfo->max = (pInfo->max < ptsList[start]) ? ptsList[start] : pInfo->max; } else { pInfo->max = pCtx->start.key + 1; } - if (pCtx->end.key != INT64_MIN) { - pInfo->min = pCtx->end.key; + if (pCtx->end.key == INT64_MIN) { + pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->min; } else { - pInfo->min = ptsList[start]; + pInfo->min = pCtx->end.key; } } else { if (pCtx->start.key == INT64_MIN) { @@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { pInfo->min = pCtx->start.key; } - if (pCtx->end.key != INT64_MIN) { - pInfo->max = pCtx->end.key + 1; + if (pCtx->end.key == INT64_MIN) { + pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { - pInfo->max = ptsList[start + pInput->numOfRows - 1]; + pInfo->max = pCtx->end.key + 1; } } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d588d90543..4009a47c65 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } void tFreeSStreamTask(SStreamTask* pTask) { + qDebug("free stream task %d", pTask->taskId); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); - taosArrayDestroy(pTask->childEpInfo); + taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6b52c74271..c6f3066be7 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -98,6 +98,17 @@ typedef void* queue[2]; #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -151,6 +162,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -203,6 +215,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..4e49e9ca13 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -14,15 +14,22 @@ */ #define _DEFAULT_SOURCE -#ifdef USE_UV -#include -#endif // clang-format off +#include #include "zlib.h" #include "thttp.h" #include "taoserror.h" #include "tlog.h" +typedef struct SHttpClient { + uv_connect_t conn; + uv_tcp_t tcp; + uv_write_t req; + uv_buf_t* buf; + char* addr; + uint16_t port; +} SHttpClient; + static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { if (flag == HTTP_FLAT) { @@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH } } -int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { +static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { int32_t code = -1; int32_t destLen = srcLen; void* pDest = taosMemoryMalloc(destLen); @@ -114,84 +121,53 @@ _OVER: return code; } -#ifdef USE_UV -static void clientConnCb(uv_connect_t* req, int32_t status) { - if (status < 0) { +static void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->buf); + taosMemoryFree(cli->addr); + taosMemoryFree(cli); +} +static void clientCloseCb(uv_handle_t* handle) { + SHttpClient* cli = handle->data; + destroyHttpClient(cli); +} +static void clientSentCb(uv_write_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("connection error %s", uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, NULL); + uError("http-report failed to send data %s", uv_strerror(status)); + } else { + uInfo("http-report succ to send data"); + } + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); +} +static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { + terrno = TAOS_SYSTEM_ERROR(status); + uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); return; } - uv_buf_t* wb = req->data; - assert(wb != NULL); - uv_write_t write_req; - uv_write(&write_req, req->handle, wb, 2, NULL); - uv_close((uv_handle_t*)req->handle, NULL); + uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb); } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - uint32_t ipv4 = taosGetIpv4FromFqdn(server); - if (ipv4 == 0xffffffff) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); - return -1; - } - - char ipv4Buf[128] = {0}; - tinet_ntoa(ipv4Buf, ipv4); - - struct sockaddr_in dest = {0}; - uv_ip4_addr(ipv4Buf, port, &dest); - - uv_tcp_t socket_tcp = {0}; - uv_loop_t* loop = uv_default_loop(); - uv_tcp_init(loop, &socket_tcp); - uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t)); - - if (flag == HTTP_GZIP) { - int32_t dstLen = taosCompressHttpRport(pCont, contLen); - if (dstLen > 0) { - contLen = dstLen; - } else { - flag = HTTP_FLAT; - } - } - - char header[1024] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - - uv_buf_t wb[2]; - wb[0] = uv_buf_init((char*)header, headLen); - wb[1] = uv_buf_init((char*)pCont, contLen); - - connect->data = wb; - terrno = 0; - uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); - uv_run(loop, UV_RUN_DEFAULT); - uv_loop_close(loop); - taosMemoryFree(connect); - return terrno; -} - -#else -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - int32_t code = -1; - TdSocketPtr pSocket = NULL; - +static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); - goto SEND_OVER; + uError("http-report failed to get http server:%s ip since %s", server, terrstr()); + return -1; } - - pSocket = taosOpenTcpClientSocket(ip, port, 0); - if (pSocket == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; + char buf[128] = {0}; + tinet_ntoa(buf, ip); + uv_ip4_addr(buf, port, dest); + return 0; +} +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + struct sockaddr_in dest = {0}; + if (taosBuildDstAddr(server, port, &dest) < 0) { + return -1; } - if (flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(pCont, contLen); if (dstLen > 0) { @@ -200,37 +176,38 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 flag = HTTP_FLAT; } } + terrno = 0; - char header[1024] = {0}; + char header[2048] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - if (taosWriteMsg(pSocket, header, headLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http header to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; + + uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + wb[0] = uv_buf_init((char*)header, headLen); // stack var + wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + + SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + cli->conn.data = cli; + cli->tcp.data = cli; + cli->req.data = cli; + cli->buf = wb; + cli->addr = tstrdup(server); + cli->port = port; + + uv_loop_t* loop = uv_default_loop(); + uv_tcp_init(loop, &cli->tcp); + // set up timeout to avoid stuck; + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + + + int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + if (ret != 0) { + uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + destroyHttpClient(cli); } - if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http content to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - // read something to avoid nginx error 499 - if (taosWriteMsg(pSocket, header, 10) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to receive response from %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - code = 0; - -SEND_OVER: - if (pSocket != NULL) { - taosCloseSocket(&pSocket); - } - - return code; + uv_run(loop, UV_RUN_DEFAULT); + uv_loop_close(loop); + return terrno; } - // clang-format on -#endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index add007e14d..763483cbf6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) { } STransMsgHead* pHead = NULL; - transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) { + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + if (cliRecvReleaseReq(conn, pHead)) { + return; + } + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; - if (cliRecvReleaseReq(conn, pHead)) { - return; - } - if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -598,7 +601,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { pBuf->len += nread; while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - cliHandleResp(conn); + if (pBuf->invalid) { + cliHandleExcept(conn); + break; + } else { + cliHandleResp(conn); + } } return; } @@ -759,6 +767,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 4272ec0b1c..a4d679b281 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,19 +109,25 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE && !p->invalid) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } @@ -173,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -180,7 +188,7 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3512b27bf8..db05aefe7b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -183,17 +183,25 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; +static bool uvHandleReq(SSvrConn* pConn) { + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); memcpy(pConn->user, pHead->user, strlen(pHead->user)); + if (uvRecvReleaseReq(pConn, pHead)) { + return true; + } + // TODO(dengyihao): time-consuming task throwed into BG Thread // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); // wreq->data = pConn; @@ -201,10 +209,6 @@ static void uvHandleReq(SSvrConn* pConn) { // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { - return; - } - STransMsg transMsg; memset(&transMsg, 0, sizeof(transMsg)); transMsg.contLen = transContLenFromMsg(pHead->msgLen); @@ -220,7 +224,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -258,21 +261,33 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SSvrConn* conn = cli->data; + SSvrConn* conn = cli->data; + STrans* pTransInst = conn->pTransInst; + SConnBuffer* pBuf = &conn->readBuf; - STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + if (pBuf->invalid) { + tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); + destroyConn(conn, true); + return; + } else { + if (false == uvHandleReq(conn)) break; + } + } + return; + } else { + destroyConn(conn, true); + return; } - return; } if (nread == 0) { return; @@ -364,6 +379,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); @@ -859,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) { } static void uvDestroyConn(uv_handle_t* handle) { SSvrConn* conn = handle->data; + if (conn == NULL) { return; } @@ -874,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) { SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); destroySmsg(msg); } - - transReqQueueClear(&conn->wreqQueue); transQueueDestroy(&conn->srvMsgs); + transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); diff --git a/tests/script/tsim/stream/basic0.sim b/tests/script/tsim/stream/basic0.sim index 9a5fb8012f..6d05f69dcf 100644 --- a/tests/script/tsim/stream/basic0.sim +++ b/tests/script/tsim/stream/basic0.sim @@ -1,7 +1,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/exec.sh -n dnode1 -s start -sleep 50 +system sh/cfg.sh -n dnode1 -c debugflag -v 131 +system sh/exec.sh -n dnode1 -s start -v sql connect print =============== create database @@ -137,4 +137,17 @@ if $data13 != 789 then return -1 endi +_OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT +print =============== check +$null= + +system_content sh/checkValgrind.sh -n dnode1 +print cmd return result ----> [ $system_content ] +if $system_content > 0 then + return -1 +endi + +if $system_content == $null then + return -1 +endi