diff --git a/cmake/cmake.options b/cmake/cmake.options index e3b5782d85..3e655b1796 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -166,6 +166,10 @@ IF(${BUILD_WITH_ANALYSIS}) set(BUILD_WITH_S3 ON) ENDIF() +IF(${TD_LINUX}) + set(BUILD_WITH_ANALYSIS ON) +ENDIF() + IF(${BUILD_S3}) IF(${BUILD_WITH_S3}) @@ -205,13 +209,6 @@ option( off ) - -option( - BUILD_WITH_NURAFT - "If build with NuRaft" - OFF -) - option( BUILD_WITH_UV "If build with libuv" diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 78eded3928..9c719eb68d 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS}) file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/) cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) - cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) endif(${BUILD_WITH_COS}) configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") @@ -148,9 +147,7 @@ endif(${BUILD_WITH_SQLITE}) # s3 if(${BUILD_WITH_S3}) - cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) add_definitions(-DUSE_S3) @@ -183,6 +180,11 @@ if(${BUILD_GEOS}) cat("${TD_SUPPORT_DIR}/geos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif() +if (${BUILD_WITH_ANALYSIS}) + cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif() + # if(${BUILD_PCRE2}) cat("${TD_SUPPORT_DIR}/pcre2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index 5d613dfed2..318d00b92c 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -20,14 +20,6 @@ if(${BUILD_WITH_SQLITE}) add_subdirectory(sqlite) endif(${BUILD_WITH_SQLITE}) -if(${BUILD_WITH_CRAFT}) - add_subdirectory(craft) -endif(${BUILD_WITH_CRAFT}) - -if(${BUILD_WITH_TRAFT}) - # add_subdirectory(traft) -endif(${BUILD_WITH_TRAFT}) - if(${BUILD_S3}) add_subdirectory(azure) endif() diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 6ebdb38fa6..344093245b 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -86,7 +86,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf); int32_t taosAnalBufClose(SAnalyticBuf *pBuf); void taosAnalBufDestroy(SAnalyticBuf *pBuf); -const char *taosAnalAlgoStr(EAnalAlgoType algoType); +const char *taosAnalysisAlgoType(EAnalAlgoType algoType); EAnalAlgoType taosAnalAlgoInt(const char *algoName); const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType); diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 39380a0644..ac8fea90e5 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -15,6 +15,10 @@ if(DEFINED GRANT_CFG_INCLUDE_DIR) add_definitions(-DGRANTS_CFG) endif() +if(${BUILD_WITH_ANALYSIS}) + add_definitions(-DUSE_ANALYTICS) +endif() + if(TD_GRANT) ADD_DEFINITIONS(-D_GRANT) endif() @@ -34,7 +38,9 @@ endif() target_include_directories( common + PUBLIC "$ENV{HOME}/.cos-local.2/include" PUBLIC "${TD_SOURCE_DIR}/include/common" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${GRANT_CFG_INCLUDE_DIR}" ) @@ -45,30 +51,39 @@ if(${TD_WINDOWS}) PRIVATE "${TD_SOURCE_DIR}/contrib/pthread" PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex" ) -endif() -target_link_libraries( - common - PUBLIC os - PUBLIC util - INTERFACE api -) + target_link_libraries( + common + + PUBLIC os + PUBLIC util + INTERFACE api + ) + +else() + find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + + target_link_libraries( + common + + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + + PUBLIC os + PUBLIC util + INTERFACE api + ) +endif() if(${BUILD_S3}) if(${BUILD_WITH_S3}) - target_include_directories( - common - - PUBLIC "$ENV{HOME}/.cos-local.2/include" - ) - set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) find_library(S3_LIBRARY s3) - find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) find_library(XML2_LIBRARY xml2) - find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) target_link_libraries( common diff --git a/source/util/src/tanalytics.c b/source/common/src/tanalytics.c similarity index 96% rename from source/util/src/tanalytics.c rename to source/common/src/tanalytics.c index bf2cb4fd07..0ed67eed0a 100644 --- a/source/util/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -36,7 +36,7 @@ typedef struct { static SAlgoMgmt tsAlgos = {0}; static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); -const char *taosAnalAlgoStr(EAnalAlgoType type) { +const char *taosAnalysisAlgoType(EAnalAlgoType type) { switch (type) { case ANAL_ALGO_TYPE_ANOMALY_DETECT: return "anomaly-detection"; @@ -60,7 +60,7 @@ const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { EAnalAlgoType taosAnalAlgoInt(const char *name) { for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { - if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { + if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) { return i; } } @@ -188,12 +188,12 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); if (pUrl != NULL) { tstrncpy(url, pUrl->url, urlLen); - uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); + uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url); } else { url[0] = 0; terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND; code = terrno; - uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); + uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type)); } if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { @@ -216,20 +216,20 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void return 0; } - int64_t newDataSize = (int64_t) contLen * nmemb; + int64_t newDataSize = (int64_t)contLen * nmemb; int64_t size = pRsp->dataLen + newDataSize; if (pRsp->data == NULL) { pRsp->data = taosMemoryMalloc(size + 1); if (pRsp->data == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno)); + return 0; // return the recv length, if failed, return 0 } } else { - char* p = taosMemoryRealloc(pRsp->data, size + 1); + char *p = taosMemoryRealloc(pRsp->data, size + 1); if (p == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno)); + return 0; // return the recv length, if failed, return 0 } pRsp->data = p; @@ -473,7 +473,7 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, } int32_t bufLen = tsnprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, - tDataTypes[colType].bytes, last ? "" : ","); + tDataTypes[colType].bytes, last ? "" : ","); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { return terrno; } @@ -779,7 +779,9 @@ int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } +int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + return 0; +} int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } @@ -788,7 +790,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} -const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } +const char *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; } EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index ad36d8c8ae..e4e184eee0 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -16,10 +16,10 @@ if(TD_ENTERPRISE) ELSEIF(${BUILD_WITH_COS}) add_definitions(-DUSE_COS) endif() +endif() - if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANALYTICS) - endif() +if(${BUILD_WITH_ANALYSIS}) + add_definitions(-DUSE_ANALYTICS) endif() add_library(mnode STATIC ${MNODE_SRC}) diff --git a/source/dnode/mnode/impl/inc/mndAnode.h b/source/dnode/mnode/impl/inc/mndAnode.h index 63e8f9090e..d92d35a0fc 100644 --- a/source/dnode/mnode/impl/inc/mndAnode.h +++ b/source/dnode/mnode/impl/inc/mndAnode.h @@ -22,8 +22,9 @@ extern "C" { #endif -int32_t mndInitAnode(SMnode *pMnode); -void mndCleanupAnode(SMnode *pMnode); +int32_t mndInitAnode(SMnode* pMnode); +void mndCleanupAnode(SMnode* pMnode); +void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index c64208600a..9f5635a74b 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -637,6 +637,32 @@ static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter) { sdbCancelFetchByType(pSdb, pIter, SDB_ANODE); } +// todo handle multiple anode case, remove the duplicate algos +void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SAnodeObj *pObj = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) { + if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) { + taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]); + } + + if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) { + taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]); + } + } + + sdbRelease(pSdb, pObj); + } +} + static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -661,7 +687,7 @@ static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); if (code != 0) goto _end; - STR_TO_VARSTR(buf, taosAnalAlgoStr(t)); + STR_TO_VARSTR(buf, taosAnalysisAlgoType(t)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); code = colDataSetVal(pColInfo, numOfRows, buf, false); if (code != 0) goto _end; @@ -900,5 +926,6 @@ int32_t mndInitAnode(SMnode *pMnode) { } void mndCleanupAnode(SMnode *pMnode) {} +void mndRetrieveAlgoList(SMnode *pMnode, SArray *pFc, SArray *pAd) {} #endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index bd613d7e69..5eee1ed3c4 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -19,6 +19,7 @@ #include "mndSync.h" #include "thttp.h" #include "tjson.h" +#include "mndAnode.h" typedef struct { int64_t numOfDnode; @@ -32,6 +33,7 @@ typedef struct { int64_t totalPoints; int64_t totalStorage; int64_t compStorage; + int32_t numOfAnalysisAlgos; } SMnodeStat; static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) { @@ -58,18 +60,21 @@ static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) { sdbRelease(pSdb, pVgroup); } +} - pStat->numOfChildTable = 100; - pStat->numOfColumn = 200; - pStat->totalPoints = 300; - pStat->totalStorage = 400; - pStat->compStorage = 500; +static int32_t algoToJson(const void* pObj, SJson* pJson) { + const SAnodeAlgo* pNode = (const SAnodeAlgo*)pObj; + int32_t code = tjsonAddStringToObject(pJson, "name", pNode->name); + return code; } static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { SMnodeStat mstat = {0}; int32_t code = 0; int32_t lino = 0; + SArray* pFcList = NULL; + SArray* pAdList = NULL; + mndGetStat(pMnode, &mstat); TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDnode", mstat.numOfDnode), &lino, _OVER); @@ -82,8 +87,55 @@ static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfPoint", mstat.totalPoints), &lino, _OVER); TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "totalStorage", mstat.totalStorage), &lino, _OVER); TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "compStorage", mstat.compStorage), &lino, _OVER); + + pFcList = taosArrayInit(4, sizeof(SAnodeAlgo)); + pAdList = taosArrayInit(4, sizeof(SAnodeAlgo)); + if (pFcList == NULL || pAdList == NULL) { + lino = __LINE__; + goto _OVER; + } + + mndRetrieveAlgoList(pMnode, pFcList, pAdList); + + if (taosArrayGetSize(pFcList) > 0) { + SJson* items = tjsonAddArrayToObject(pJson, "forecast"); + TSDB_CHECK_NULL(items, code, lino, _OVER, terrno); + + for (int32_t i = 0; i < taosArrayGetSize(pFcList); ++i) { + SJson* item = tjsonCreateObject(); + + TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddItemToArray(items, item), &lino, _OVER); + + SAnodeAlgo* p = taosArrayGet(pFcList, i); + TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + } + } + + if (taosArrayGetSize(pAdList) > 0) { + SJson* items1 = tjsonAddArrayToObject(pJson, "anomaly_detection"); + TSDB_CHECK_NULL(items1, code, lino, _OVER, terrno); + + for (int32_t i = 0; i < taosArrayGetSize(pAdList); ++i) { + SJson* item = tjsonCreateObject(); + + TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddItemToArray(items1, item), &lino, _OVER); + + SAnodeAlgo* p = taosArrayGet(pAdList, i); + TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + } + } + _OVER: - if (code != 0) mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code)); + taosArrayDestroy(pFcList); + taosArrayDestroy(pAdList); + + if (code != 0) { + mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code)); + } } static char* mndBuildTelemetryReport(SMnode* pMnode) { @@ -136,21 +188,24 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { int32_t line = 0; SMnode* pMnode = pReq->info.node; STelemMgmt* pMgmt = &pMnode->telemMgmt; - if (!tsEnableTelem) return 0; + + if (!tsEnableTelem) { + return 0; + } (void)taosThreadMutexLock(&pMgmt->lock); char* pCont = mndBuildTelemetryReport(pMnode); (void)taosThreadMutexUnlock(&pMgmt->lock); - if (pCont == NULL) { - return 0; - } + TSDB_CHECK_NULL(pCont, code, line, _end, terrno); + code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT); taosMemoryFree(pCont); return code; + _end: if (code != 0) { - mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code)); + mError("%s failed to send telemetry report, line %d since %s", __func__, line, tstrerror(code)); } taosMemoryFree(pCont); return code; @@ -161,15 +216,17 @@ int32_t mndInitTelem(SMnode* pMnode) { STelemMgmt* pMgmt = &pMnode->telemMgmt; (void)taosThreadMutexInit(&pMgmt->lock, NULL); - if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) + if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) { mWarn("failed to get email since %s", tstrerror(code)); + } + code = taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer); if (code != 0) { mError("failed to init telemetry management since %s", tstrerror(code)); return code; } - mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); + mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); return 0; } diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index eb72edb964..3124fa0b57 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -668,4 +668,4 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p } void destroyForecastInfo(void* param) {} -#endif +#endif \ No newline at end of file diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 2985e5e000..02b122830c 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -12,13 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + #include "executorInt.h" #include "filter.h" -#include "function.h" #include "functionMgt.h" #include "operator.h" #include "querytask.h" -#include "storageapi.h" #include "tanalytics.h" #include "tcommon.h" #include "tcompare.h" @@ -29,24 +28,24 @@ #ifdef USE_ANALYTICS typedef struct { - char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; - char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; - char algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; - int64_t maxTs; - int64_t minTs; - int64_t numOfRows; - uint64_t groupId; - int64_t optRows; - int64_t cachedRows; - int32_t numOfBlocks; - int16_t resTsSlot; - int16_t resValSlot; - int16_t resLowSlot; - int16_t resHighSlot; - int16_t inputTsSlot; - int16_t inputValSlot; - int8_t inputValType; - int8_t inputPrecision; + char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; + char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; + char algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; + int64_t maxTs; + int64_t minTs; + int64_t numOfRows; + uint64_t groupId; + int64_t optRows; + int64_t cachedRows; + int32_t numOfBlocks; + int16_t resTsSlot; + int16_t resValSlot; + int16_t resLowSlot; + int16_t resHighSlot; + int16_t inputTsSlot; + int16_t inputValSlot; + int8_t inputValType; + int8_t inputPrecision; SAnalyticBuf analBuf; } SForecastSupp; @@ -118,7 +117,7 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con static int32_t forecastCloseBuf(SForecastSupp* pSupp) { SAnalyticBuf* pBuf = &pSupp->analBuf; - int32_t code = 0; + int32_t code = 0; for (int32_t i = 0; i < 2; ++i) { code = taosAnalBufWriteColEnd(pBuf, i); @@ -178,7 +177,6 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { code = taosAnalBufWriteOptInt(pBuf, "start", start); if (code != 0) return code; - bool hasEvery = taosAnalGetOptInt(pSupp->algoOpt, "every", &every); if (!hasEvery) { qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every); @@ -192,14 +190,14 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) { SAnalyticBuf* pBuf = &pSupp->analBuf; - int32_t resCurRow = pBlock->info.rows; - int8_t tmpI8; - int16_t tmpI16; - int32_t tmpI32; - int64_t tmpI64; - float tmpFloat; - double tmpDouble; - int32_t code = 0; + int32_t resCurRow = pBlock->info.rows; + int8_t tmpI8; + int16_t tmpI16; + int32_t tmpI32; + int64_t tmpI64; + float tmpFloat; + double tmpDouble; + int32_t code = 0; SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot); if (NULL == pResValCol) { @@ -356,8 +354,8 @@ _OVER: } static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SAnalyticBuf* pBuf = &pSupp->analBuf; code = forecastCloseBuf(pSupp); @@ -542,7 +540,7 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) { static int32_t forecastCreateBuf(SForecastSupp* pSupp) { SAnalyticBuf* pBuf = &pSupp->analBuf; - int64_t ts = 0; // taosGetTimestampMs(); + int64_t ts = 0; // taosGetTimestampMs(); pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL; snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5dd43ca064..1df8dcda95 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -559,13 +559,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(ctx->pJobInfo->errCode); } - if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { - QW_TASK_ELOG("query already end, phase:%d", phase); - QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); - } - switch (phase) { case QW_PHASE_PRE_QUERY: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("task already dropped at phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); @@ -592,6 +592,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_FETCH: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(ctx->rspCode); @@ -614,6 +619,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_CQUERY: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + code = ctx->rspCode; + goto _return; + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(ctx->rspCode); diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 2633bb3268..d606d83712 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -17,10 +17,6 @@ else() MESSAGE(STATUS "enable assert core") endif(${ASSERT_NOT_CORE}) -if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANALYTICS) -endif() - target_include_directories( util PUBLIC "${TD_SOURCE_DIR}/include/util"