From 84580286ced9e6e456bd9db31258e8ee58946963 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 11:11:06 +0800 Subject: [PATCH 1/9] enh(analysis): enable community edition to support anodes. --- cmake/cmake.options | 2 + source/common/CMakeLists.txt | 25 +- source/dnode/mnode/impl/src/mndAnode.c | 26 +- .../libs/executor/src/anomalywindowoperator.c | 12 - source/libs/executor/src/forecastoperator.c | 10 - source/util/src/tanalytics.c | 795 ------------------ 6 files changed, 19 insertions(+), 851 deletions(-) delete mode 100644 source/util/src/tanalytics.c diff --git a/cmake/cmake.options b/cmake/cmake.options index e3b5782d85..4c69544466 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -166,6 +166,8 @@ IF(${BUILD_WITH_ANALYSIS}) set(BUILD_WITH_S3 ON) ENDIF() +set(BUILD_WITH_ANALYSIS ON) + IF(${BUILD_S3}) IF(${BUILD_WITH_S3}) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 39380a0644..6d2648502d 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -34,7 +34,9 @@ endif() target_include_directories( common + PUBLIC "$ENV{HOME}/.cos-local.2/include" PUBLIC "${TD_SOURCE_DIR}/include/common" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${GRANT_CFG_INCLUDE_DIR}" ) @@ -47,28 +49,33 @@ if(${TD_WINDOWS}) ) endif() +find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + target_link_libraries( common + + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + PUBLIC os PUBLIC util INTERFACE api ) + + if(${BUILD_S3}) if(${BUILD_WITH_S3}) - target_include_directories( - common - - PUBLIC "$ENV{HOME}/.cos-local.2/include" - ) - set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) find_library(S3_LIBRARY s3) - find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +# find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) find_library(XML2_LIBRARY xml2) - find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +# find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +# find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) target_link_libraries( common diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index c64208600a..de3f7a607c 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -24,8 +24,6 @@ #include "tanalytics.h" #include "tjson.h" -#ifdef USE_ANALYTICS - #define TSDB_ANODE_VER_NUMBER 1 #define TSDB_ANODE_RESERVE_SIZE 64 @@ -879,26 +877,4 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { _OVER: tFreeRetrieveAnalAlgoRsp(&rsp); TAOS_RETURN(code); -} - -#else - -static int32_t mndProcessUnsupportReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; } -static int32_t mndRetrieveUnsupport(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} - -int32_t mndInitAnode(SMnode *pMnode) { - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessUnsupportReq); - mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUnsupportReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessUnsupportReq); - mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessUnsupportReq); - - mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveUnsupport); - mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveUnsupport); - return 0; -} - -void mndCleanupAnode(SMnode *pMnode) {} - -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index eb72edb964..8e539d52d9 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -26,8 +26,6 @@ #include "tjson.h" #include "ttime.h" -#ifdef USE_ANALYTICS - typedef struct { SArray* blocks; // SSDataBlock* SArray* windows; // STimeWindow @@ -659,13 +657,3 @@ _OVER: return code; } - -#else - -int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, - SOperatorInfo** pOptrInfo) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} -void destroyForecastInfo(void* param) {} - -#endif diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 2985e5e000..9eecad8644 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -26,8 +26,6 @@ #include "tfill.h" #include "ttime.h" -#ifdef USE_ANALYTICS - typedef struct { char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; @@ -662,11 +660,3 @@ static void destroyForecastInfo(void* param) { taosMemoryFreeClear(param); } -#else - -int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, - SOperatorInfo** pOptrInfo) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} - -#endif diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c deleted file mode 100644 index bf2cb4fd07..0000000000 --- a/source/util/src/tanalytics.c +++ /dev/null @@ -1,795 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "tanalytics.h" -#include "ttypes.h" -#include "tutil.h" - -#ifdef USE_ANALYTICS -#include -#define ANALYTICS_ALOG_SPLIT_CHAR "," - -typedef struct { - int64_t ver; - SHashObj *hash; // algoname:algotype -> SAnalyticsUrl - TdThreadMutex lock; -} SAlgoMgmt; - -typedef struct { - char *data; - int64_t dataLen; -} SCurlResp; - -static SAlgoMgmt tsAlgos = {0}; -static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); - -const char *taosAnalAlgoStr(EAnalAlgoType type) { - switch (type) { - case ANAL_ALGO_TYPE_ANOMALY_DETECT: - return "anomaly-detection"; - case ANAL_ALGO_TYPE_FORECAST: - return "forecast"; - default: - return "unknown"; - } -} - -const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { - switch (type) { - case ANAL_ALGO_TYPE_ANOMALY_DETECT: - return "anomaly-detect"; - case ANAL_ALGO_TYPE_FORECAST: - return "forecast"; - default: - return "unknown"; - } -} - -EAnalAlgoType taosAnalAlgoInt(const char *name) { - for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { - if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { - return i; - } - } - - return ANAL_ALGO_TYPE_END; -} - -int32_t taosAnalyticsInit() { - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { - uError("failed to init curl"); - return -1; - } - - tsAlgos.ver = 0; - if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) { - uError("failed to init algo mutex"); - return -1; - } - - tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - if (tsAlgos.hash == NULL) { - uError("failed to init algo hash"); - return -1; - } - - uInfo("analysis env is initialized"); - return 0; -} - -static void taosAnalFreeHash(SHashObj *hash) { - void *pIter = taosHashIterate(hash, NULL); - while (pIter != NULL) { - SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter; - taosMemoryFree(pUrl->url); - pIter = taosHashIterate(hash, pIter); - } - taosHashCleanup(hash); -} - -void taosAnalyticsCleanup() { - curl_global_cleanup(); - if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) { - uError("failed to destroy anal lock"); - } - taosAnalFreeHash(tsAlgos.hash); - tsAlgos.hash = NULL; - uInfo("analysis env is cleaned up"); -} - -void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { - if (newVer > tsAlgos.ver) { - if (taosThreadMutexLock(&tsAlgos.lock) == 0) { - SHashObj *hash = tsAlgos.hash; - tsAlgos.ver = newVer; - tsAlgos.hash = pHash; - if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { - uError("failed to unlock hash") - } - taosAnalFreeHash(hash); - } - } else { - taosAnalFreeHash(pHash); - } -} - -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { - char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; - char *pStart = NULL; - char *pEnd = NULL; - - pStart = strstr(option, optName); - if (pStart == NULL) { - return false; - } - - pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); - if (optMaxLen > 0) { - if (pEnd > pStart) { - int32_t len = (int32_t)(pEnd - pStart); - len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN); - tstrncpy(buf, pStart, len); - } else { - int32_t len = MIN(tListLen(buf), strlen(pStart) + 1); - tstrncpy(buf, pStart, len); - } - - char *pRight = strstr(buf, "="); - if (pRight == NULL) { - return false; - } else { - pRight += 1; - } - - int32_t unused = strtrim(pRight); - - int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1); - tstrncpy(optValue, pRight, vLen); - } - - return true; -} - -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { - char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); - - char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); - if (pos1 != NULL) { - *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); - return true; - } else { - return false; - } -} - -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { - int32_t code = 0; - char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0}; - int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); - - char *unused = strntolower(name, name, nameLen); - - if (taosThreadMutexLock(&tsAlgos.lock) == 0) { - SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); - if (pUrl != NULL) { - tstrncpy(url, pUrl->url, urlLen); - uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); - } else { - url[0] = 0; - terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND; - code = terrno; - uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); - } - - if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { - uError("failed to unlock hash"); - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - return code; -} - -int64_t taosAnalGetVersion() { return tsAlgos.ver; } - -static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) { - SCurlResp *pRsp = userdata; - if (contLen == 0 || nmemb == 0 || pCont == NULL) { - pRsp->dataLen = 0; - pRsp->data = NULL; - uError("curl response is received, len:%" PRId64, pRsp->dataLen); - return 0; - } - - int64_t newDataSize = (int64_t) contLen * nmemb; - int64_t size = pRsp->dataLen + newDataSize; - - if (pRsp->data == NULL) { - pRsp->data = taosMemoryMalloc(size + 1); - if (pRsp->data == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 - } - } else { - char* p = taosMemoryRealloc(pRsp->data, size + 1); - if (p == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 - } - - pRsp->data = p; - } - - if (pRsp->data != NULL) { - (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize); - - pRsp->dataLen = size; - pRsp->data[size] = 0; - - uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data); - return newDataSize; - } else { - pRsp->dataLen = 0; - uError("failed to malloc curl response"); - return 0; - } -} - -static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) { - CURL *curl = NULL; - CURLcode code = 0; - - curl = curl_easy_init(); - if (curl == NULL) { - uError("failed to create curl handle"); - return -1; - } - - if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100) != 0) goto _OVER; - - uDebug("curl get request will sent, url:%s", url); - code = curl_easy_perform(curl); - if (code != CURLE_OK) { - uError("failed to perform curl action, code:%d", code); - } - -_OVER: - if (curl != NULL) curl_easy_cleanup(curl); - return code; -} - -static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { - struct curl_slist *headers = NULL; - CURL *curl = NULL; - CURLcode code = 0; - - curl = curl_easy_init(); - if (curl == NULL) { - uError("failed to create curl handle"); - return -1; - } - - headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8"); - if (curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER; - - uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf); - code = curl_easy_perform(curl); - if (code != CURLE_OK) { - uError("failed to perform curl action, code:%d", code); - } - -_OVER: - if (curl != NULL) { - curl_slist_free_all(headers); - curl_easy_cleanup(curl); - } - return code; -} - -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { - int32_t code = -1; - char *pCont = NULL; - int64_t contentLen; - SJson *pJson = NULL; - SCurlResp curlRsp = {0}; - - if (type == ANALYTICS_HTTP_TYPE_GET) { - if (taosCurlGetRequest(url, &curlRsp) != 0) { - terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; - goto _OVER; - } - } else { - code = taosAnalBufGetCont(pBuf, &pCont, &contentLen); - if (code != 0) { - terrno = code; - goto _OVER; - } - if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { - terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; - goto _OVER; - } - } - - if (curlRsp.data == NULL || curlRsp.dataLen == 0) { - terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL; - goto _OVER; - } - - pJson = tjsonParse(curlRsp.data); - if (pJson == NULL) { - if (curlRsp.data[0] == '<') { - terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR; - } else { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; - } - goto _OVER; - } - -_OVER: - if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data); - if (pCont != NULL) taosMemoryFree(pCont); - return pJson; -} - -static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { - int32_t code = 0; - int64_t contLen; - char *pCont = NULL; - TdFilePtr pFile = NULL; - - pFile = taosOpenFile(fileName, TD_FILE_READ); - if (pFile == NULL) { - code = terrno; - goto _OVER; - } - - code = taosFStatFile(pFile, &contLen, NULL); - if (code != 0) goto _OVER; - - pCont = taosMemoryMalloc(contLen + 1); - if (pCont == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - if (taosReadFile(pFile, pCont, contLen) != contLen) { - code = terrno; - goto _OVER; - } - - pCont[contLen] = '\0'; - -_OVER: - if (code == 0) { - *ppCont = pCont; - *pContLen = contLen; - } else { - if (pCont != NULL) taosMemoryFree(pCont); - } - if (pFile != NULL) taosCloseFile(&pFile); - return code; -} - -static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { - char buf[64] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { - char buf[128] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { - char buf[128] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) { - if (bufLen <= 0) { - bufLen = strlen(buf); - } - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } - -static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { - pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pBuf->filePtr == NULL) { - return terrno; - } - - pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf)); - if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; - pBuf->numOfCols = numOfCols; - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { - return taosAnalJsonBufWriteStart(pBuf); - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); - pCol->filePtr = - taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pCol->filePtr == NULL) { - return terrno; - } - } - - return taosAnalJsonBufWriteStart(pBuf); -} - -static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { - char buf[128] = {0}; - bool first = (colIndex == 0); - bool last = (colIndex == pBuf->numOfCols - 1); - - if (first) { - if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { - return terrno; - } - } - - int32_t bufLen = tsnprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, - tDataTypes[colType].bytes, last ? "" : ","); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - - if (last) { - if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) { - return terrno; - } - } - - return 0; -} - -static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) { - return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); -} - -static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { - if (bufLen <= 0) { - bufLen = strlen(buf); - } - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { - int32_t ret = taosWriteFile(pBuf->filePtr, buf, bufLen); - if (ret != bufLen) { - return terrno; - } - } else { - int32_t ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen); - if (ret != bufLen) { - return terrno; - } - } - - return 0; -} - -static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { - return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); -} - -static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { - if (colIndex == pBuf->numOfCols - 1) { - return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); - - } else { - return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); - } -} - -static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { - char buf[64]; - int32_t bufLen = 0; - - if (pBuf->pCols[colIndex].numOfRows != 0) { - buf[bufLen] = ','; - buf[bufLen + 1] = '\n'; - buf[bufLen + 2] = 0; - bufLen += 2; - } - - switch (colType) { - case TSDB_DATA_TYPE_BOOL: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0); - break; - case TSDB_DATA_TYPE_TINYINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue); - break; - case TSDB_DATA_TYPE_UTINYINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue); - break; - case TSDB_DATA_TYPE_SMALLINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue); - break; - case TSDB_DATA_TYPE_USMALLINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue); - break; - case TSDB_DATA_TYPE_INT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue); - break; - case TSDB_DATA_TYPE_UINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue); - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue); - break; - case TSDB_DATA_TYPE_UBIGINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue); - break; - case TSDB_DATA_TYPE_FLOAT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue)); - break; - case TSDB_DATA_TYPE_DOUBLE: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue)); - break; - default: - buf[bufLen] = '\0'; - } - - pBuf->pCols[colIndex].numOfRows++; - return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); -} - -static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { - int32_t code = 0; - char *pCont = NULL; - int64_t contLen = 0; - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - - code = taosFsyncFile(pCol->filePtr); - if (code != 0) return code; - - code = taosCloseFile(&pCol->filePtr); - if (code != 0) return code; - - code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); - if (code != 0) return code; - - code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen); - if (code != 0) return code; - - taosMemoryFreeClear(pCont); - contLen = 0; - } - } - - return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); -} - -static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) { - int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); - if (code != 0) return code; - - return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); -} - -int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) { - int32_t code = taosAnalJsonBufWriteEnd(pBuf); - if (code != 0) return code; - - if (pBuf->filePtr != NULL) { - code = taosFsyncFile(pBuf->filePtr); - if (code != 0) return code; - code = taosCloseFile(&pBuf->filePtr); - if (code != 0) return code; - } - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - if (pCol->filePtr != NULL) { - code = taosFsyncFile(pCol->filePtr); - if (code != 0) return code; - code = taosCloseFile(&pCol->filePtr); - if (code != 0) return code; - } - } - } - - return 0; -} - -void taosAnalBufDestroy(SAnalyticBuf *pBuf) { - if (pBuf->fileName[0] != 0) { - if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); - // taosRemoveFile(pBuf->fileName); - pBuf->fileName[0] = 0; - } - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - if (pCol->fileName[0] != 0) { - if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); - if (taosRemoveFile(pCol->fileName) != 0) { - uError("failed to remove file %s", pCol->fileName); - } - pCol->fileName[0] = 0; - } - } - } - - taosMemoryFreeClear(pBuf->pCols); - pBuf->numOfCols = 0; -} - -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return tsosAnalJsonBufOpen(pBuf, numOfCols); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteDataBegin(pBuf); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColBegin(pBuf, colIndex); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColEnd(pBuf, colIndex); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteDataEnd(pBuf); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufClose(pBuf); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) { - *ppCont = NULL; - *pContLen = 0; - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -#else - -int32_t taosAnalyticsInit() { return 0; } -void taosAnalyticsCleanup() {} -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } - -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } -int64_t taosAnalGetVersion() { return 0; } -void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} - -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } -int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } -void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} - -const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } -EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } -const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } - -#endif \ No newline at end of file From 9aa36c24d498b8d03715cf9310c47deee8229352 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 11:13:51 +0800 Subject: [PATCH 2/9] enh(analysis): move tanalytics to common module. --- source/common/src/tanalytics.c | 794 +++++++++++++++++++++++++++++++++ 1 file changed, 794 insertions(+) create mode 100644 source/common/src/tanalytics.c diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c new file mode 100644 index 0000000000..aa9dd842ac --- /dev/null +++ b/source/common/src/tanalytics.c @@ -0,0 +1,794 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tanalytics.h" +#include "ttypes.h" +#include "tutil.h" + +#include +#define ANALYTICS_ALOG_SPLIT_CHAR "," + +typedef struct { + int64_t ver; + SHashObj *hash; // algoname:algotype -> SAnalyticsUrl + TdThreadMutex lock; +} SAlgoMgmt; + +typedef struct { + char *data; + int64_t dataLen; +} SCurlResp; + +static SAlgoMgmt tsAlgos = {0}; +static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); + +const char *taosAnalAlgoStr(EAnalAlgoType type) { + switch (type) { + case ANAL_ALGO_TYPE_ANOMALY_DETECT: + return "anomaly-detection"; + case ANAL_ALGO_TYPE_FORECAST: + return "forecast"; + default: + return "unknown"; + } +} + +const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { + switch (type) { + case ANAL_ALGO_TYPE_ANOMALY_DETECT: + return "anomaly-detect"; + case ANAL_ALGO_TYPE_FORECAST: + return "forecast"; + default: + return "unknown"; + } +} + +EAnalAlgoType taosAnalAlgoInt(const char *name) { + for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { + if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { + return i; + } + } + + return ANAL_ALGO_TYPE_END; +} + +int32_t taosAnalyticsInit() { + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + uError("failed to init curl"); + return -1; + } + + tsAlgos.ver = 0; + if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) { + uError("failed to init algo mutex"); + return -1; + } + + tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + if (tsAlgos.hash == NULL) { + uError("failed to init algo hash"); + return -1; + } + + uInfo("analysis env is initialized"); + return 0; +} + +static void taosAnalFreeHash(SHashObj *hash) { + void *pIter = taosHashIterate(hash, NULL); + while (pIter != NULL) { + SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter; + taosMemoryFree(pUrl->url); + pIter = taosHashIterate(hash, pIter); + } + taosHashCleanup(hash); +} + +void taosAnalyticsCleanup() { + curl_global_cleanup(); + if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) { + uError("failed to destroy anal lock"); + } + taosAnalFreeHash(tsAlgos.hash); + tsAlgos.hash = NULL; + uInfo("analysis env is cleaned up"); +} + +void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { + if (newVer > tsAlgos.ver) { + if (taosThreadMutexLock(&tsAlgos.lock) == 0) { + SHashObj *hash = tsAlgos.hash; + tsAlgos.ver = newVer; + tsAlgos.hash = pHash; + if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { + uError("failed to unlock hash") + } + taosAnalFreeHash(hash); + } + } else { + taosAnalFreeHash(pHash); + } +} + +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { + char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; + char *pStart = NULL; + char *pEnd = NULL; + + pStart = strstr(option, optName); + if (pStart == NULL) { + return false; + } + + pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); + if (optMaxLen > 0) { + if (pEnd > pStart) { + int32_t len = (int32_t)(pEnd - pStart); + len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN); + tstrncpy(buf, pStart, len); + } else { + int32_t len = MIN(tListLen(buf), strlen(pStart) + 1); + tstrncpy(buf, pStart, len); + } + + char *pRight = strstr(buf, "="); + if (pRight == NULL) { + return false; + } else { + pRight += 1; + } + + int32_t unused = strtrim(pRight); + + int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1); + tstrncpy(optValue, pRight, vLen); + } + + return true; +} + +bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { + char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; + int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); + + char *pos1 = strstr(option, buf); + char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); + if (pos1 != NULL) { + *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); + return true; + } else { + return false; + } +} + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { + int32_t code = 0; + char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0}; + int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); + + char *unused = strntolower(name, name, nameLen); + + if (taosThreadMutexLock(&tsAlgos.lock) == 0) { + SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); + if (pUrl != NULL) { + tstrncpy(url, pUrl->url, urlLen); + uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); + } else { + url[0] = 0; + terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND; + code = terrno; + uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); + } + + if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { + uError("failed to unlock hash"); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return code; +} + +int64_t taosAnalGetVersion() { return tsAlgos.ver; } + +static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) { + SCurlResp *pRsp = userdata; + if (contLen == 0 || nmemb == 0 || pCont == NULL) { + pRsp->dataLen = 0; + pRsp->data = NULL; + uError("curl response is received, len:%" PRId64, pRsp->dataLen); + return 0; + } + + int64_t newDataSize = (int64_t) contLen * nmemb; + int64_t size = pRsp->dataLen + newDataSize; + + if (pRsp->data == NULL) { + pRsp->data = taosMemoryMalloc(size + 1); + if (pRsp->data == NULL) { + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); + return 0; // return the recv length, if failed, return 0 + } + } else { + char* p = taosMemoryRealloc(pRsp->data, size + 1); + if (p == NULL) { + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); + return 0; // return the recv length, if failed, return 0 + } + + pRsp->data = p; + } + + if (pRsp->data != NULL) { + (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize); + + pRsp->dataLen = size; + pRsp->data[size] = 0; + + uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data); + return newDataSize; + } else { + pRsp->dataLen = 0; + uError("failed to malloc curl response"); + return 0; + } +} + +static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) { + CURL *curl = NULL; + CURLcode code = 0; + + curl = curl_easy_init(); + if (curl == NULL) { + uError("failed to create curl handle"); + return -1; + } + + if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100) != 0) goto _OVER; + + uDebug("curl get request will sent, url:%s", url); + code = curl_easy_perform(curl); + if (code != CURLE_OK) { + uError("failed to perform curl action, code:%d", code); + } + +_OVER: + if (curl != NULL) curl_easy_cleanup(curl); + return code; +} + +static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { + struct curl_slist *headers = NULL; + CURL *curl = NULL; + CURLcode code = 0; + + curl = curl_easy_init(); + if (curl == NULL) { + uError("failed to create curl handle"); + return -1; + } + + headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8"); + if (curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER; + if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER; + + uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf); + code = curl_easy_perform(curl); + if (code != CURLE_OK) { + uError("failed to perform curl action, code:%d", code); + } + +_OVER: + if (curl != NULL) { + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + } + return code; +} + +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { + int32_t code = -1; + char *pCont = NULL; + int64_t contentLen; + SJson *pJson = NULL; + SCurlResp curlRsp = {0}; + + if (type == ANALYTICS_HTTP_TYPE_GET) { + if (taosCurlGetRequest(url, &curlRsp) != 0) { + terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; + goto _OVER; + } + } else { + code = taosAnalBufGetCont(pBuf, &pCont, &contentLen); + if (code != 0) { + terrno = code; + goto _OVER; + } + if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { + terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; + goto _OVER; + } + } + + if (curlRsp.data == NULL || curlRsp.dataLen == 0) { + terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL; + goto _OVER; + } + + pJson = tjsonParse(curlRsp.data); + if (pJson == NULL) { + if (curlRsp.data[0] == '<') { + terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR; + } else { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + } + goto _OVER; + } + +_OVER: + if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data); + if (pCont != NULL) taosMemoryFree(pCont); + return pJson; +} + +static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { + int32_t code = 0; + int64_t contLen; + char *pCont = NULL; + TdFilePtr pFile = NULL; + + pFile = taosOpenFile(fileName, TD_FILE_READ); + if (pFile == NULL) { + code = terrno; + goto _OVER; + } + + code = taosFStatFile(pFile, &contLen, NULL); + if (code != 0) goto _OVER; + + pCont = taosMemoryMalloc(contLen + 1); + if (pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + if (taosReadFile(pFile, pCont, contLen) != contLen) { + code = terrno; + goto _OVER; + } + + pCont[contLen] = '\0'; + +_OVER: + if (code == 0) { + *ppCont = pCont; + *pContLen = contLen; + } else { + if (pCont != NULL) taosMemoryFree(pCont); + } + if (pFile != NULL) taosCloseFile(&pFile); + return code; +} + +static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { + char buf[64] = {0}; + int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { + char buf[128] = {0}; + int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { + char buf[128] = {0}; + int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) { + if (bufLen <= 0) { + bufLen = strlen(buf); + } + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + return 0; +} + +static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } + +static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { + pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + if (pBuf->filePtr == NULL) { + return terrno; + } + + pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf)); + if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; + pBuf->numOfCols = numOfCols; + + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { + return taosAnalJsonBufWriteStart(pBuf); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; + snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); + pCol->filePtr = + taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + if (pCol->filePtr == NULL) { + return terrno; + } + } + + return taosAnalJsonBufWriteStart(pBuf); +} + +static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + char buf[128] = {0}; + bool first = (colIndex == 0); + bool last = (colIndex == pBuf->numOfCols - 1); + + if (first) { + if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { + return terrno; + } + } + + int32_t bufLen = tsnprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, + tDataTypes[colType].bytes, last ? "" : ","); + if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { + return terrno; + } + + if (last) { + if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) { + return terrno; + } + } + + return 0; +} + +static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) { + return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); +} + +static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { + if (bufLen <= 0) { + bufLen = strlen(buf); + } + + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { + int32_t ret = taosWriteFile(pBuf->filePtr, buf, bufLen); + if (ret != bufLen) { + return terrno; + } + } else { + int32_t ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen); + if (ret != bufLen) { + return terrno; + } + } + + return 0; +} + +static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { + return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); +} + +static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { + if (colIndex == pBuf->numOfCols - 1) { + return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); + + } else { + return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); + } +} + +static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + char buf[64]; + int32_t bufLen = 0; + + if (pBuf->pCols[colIndex].numOfRows != 0) { + buf[bufLen] = ','; + buf[bufLen + 1] = '\n'; + buf[bufLen + 2] = 0; + bufLen += 2; + } + + switch (colType) { + case TSDB_DATA_TYPE_BOOL: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0); + break; + case TSDB_DATA_TYPE_TINYINT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue); + break; + case TSDB_DATA_TYPE_UTINYINT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue); + break; + case TSDB_DATA_TYPE_SMALLINT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue); + break; + case TSDB_DATA_TYPE_USMALLINT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue); + break; + case TSDB_DATA_TYPE_INT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue); + break; + case TSDB_DATA_TYPE_UINT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue); + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue); + break; + case TSDB_DATA_TYPE_UBIGINT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue); + break; + case TSDB_DATA_TYPE_FLOAT: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue)); + break; + case TSDB_DATA_TYPE_DOUBLE: + bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue)); + break; + default: + buf[bufLen] = '\0'; + } + + pBuf->pCols[colIndex].numOfRows++; + return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); +} + +static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { + int32_t code = 0; + char *pCont = NULL; + int64_t contLen = 0; + + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; + + code = taosFsyncFile(pCol->filePtr); + if (code != 0) return code; + + code = taosCloseFile(&pCol->filePtr); + if (code != 0) return code; + + code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); + if (code != 0) return code; + + code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen); + if (code != 0) return code; + + taosMemoryFreeClear(pCont); + contLen = 0; + } + } + + return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); +} + +static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) { + int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); + if (code != 0) return code; + + return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); +} + +int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) { + int32_t code = taosAnalJsonBufWriteEnd(pBuf); + if (code != 0) return code; + + if (pBuf->filePtr != NULL) { + code = taosFsyncFile(pBuf->filePtr); + if (code != 0) return code; + code = taosCloseFile(&pBuf->filePtr); + if (code != 0) return code; + } + + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; + if (pCol->filePtr != NULL) { + code = taosFsyncFile(pCol->filePtr); + if (code != 0) return code; + code = taosCloseFile(&pCol->filePtr); + if (code != 0) return code; + } + } + } + + return 0; +} + +void taosAnalBufDestroy(SAnalyticBuf *pBuf) { + if (pBuf->fileName[0] != 0) { + if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); + // taosRemoveFile(pBuf->fileName); + pBuf->fileName[0] = 0; + } + + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + for (int32_t i = 0; i < pBuf->numOfCols; ++i) { + SAnalyticsColBuf *pCol = &pBuf->pCols[i]; + if (pCol->fileName[0] != 0) { + if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); + if (taosRemoveFile(pCol->fileName) != 0) { + uError("failed to remove file %s", pCol->fileName); + } + pCol->fileName[0] = 0; + } + } + } + + taosMemoryFreeClear(pBuf->pCols); + pBuf->numOfCols = 0; +} + +int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return tsosAnalJsonBufOpen(pBuf, numOfCols); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteDataBegin(pBuf); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColBegin(pBuf, colIndex); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteColEnd(pBuf, colIndex); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufWriteDataEnd(pBuf); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufClose(pBuf); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) { + *ppCont = NULL; + *pContLen = 0; + + if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { + return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); + } else { + return TSDB_CODE_ANA_BUF_INVALID_TYPE; + } +} + +//#else +// +//int32_t taosAnalyticsInit() { return 0; } +//void taosAnalyticsCleanup() {} +//SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } +// +//int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } +//bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } +//bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } +//int64_t taosAnalGetVersion() { return 0; } +//void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} +// +//int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } +//int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } +//int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } +//int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } +//int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } +//int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } +//int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +//int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } +//int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +//int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } +//int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } +//void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} +// +//const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } +//EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } +//const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } +// +//#endif \ No newline at end of file From 237be3319e1811db8336207b15339a940d701d53 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 13:07:35 +0800 Subject: [PATCH 3/9] fix(analysis): fix compiler error on windows. --- source/common/CMakeLists.txt | 46 ++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 6d2648502d..047e8be193 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -47,35 +47,39 @@ if(${TD_WINDOWS}) PRIVATE "${TD_SOURCE_DIR}/contrib/pthread" PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex" ) + + target_link_libraries( + common + + PUBLIC os + PUBLIC util + INTERFACE api + ) + +else() + find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + + target_link_libraries( + common + + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + + PUBLIC os + PUBLIC util + INTERFACE api + ) endif() -find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) -find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) -find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - -target_link_libraries( - common - - PUBLIC ${CURL_LIBRARY} - PUBLIC ${SSL_LIBRARY} - PUBLIC ${CRYPTO_LIBRARY} - - PUBLIC os - PUBLIC util - INTERFACE api -) - - - if(${BUILD_S3}) if(${BUILD_WITH_S3}) set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) find_library(S3_LIBRARY s3) -# find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) find_library(XML2_LIBRARY xml2) -# find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) -# find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) target_link_libraries( common From d6dc966ed12ff5cc697d0b46c2d2fe77f6a7a4ed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 15:12:43 +0800 Subject: [PATCH 4/9] fix(analysis): fix compiler error on windows. --- cmake/cmake.options | 11 +-- source/common/CMakeLists.txt | 8 +- source/common/src/tanalytics.c | 78 ++++++++++--------- source/dnode/mnode/impl/CMakeLists.txt | 6 +- source/dnode/mnode/impl/src/mndAnode.c | 26 ++++++- .../libs/executor/src/anomalywindowoperator.c | 12 +++ source/libs/executor/src/forecastoperator.c | 74 ++++++++++-------- 7 files changed, 128 insertions(+), 87 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index 4c69544466..3e655b1796 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -166,7 +166,9 @@ IF(${BUILD_WITH_ANALYSIS}) set(BUILD_WITH_S3 ON) ENDIF() -set(BUILD_WITH_ANALYSIS ON) +IF(${TD_LINUX}) + set(BUILD_WITH_ANALYSIS ON) +ENDIF() IF(${BUILD_S3}) @@ -207,13 +209,6 @@ option( off ) - -option( - BUILD_WITH_NURAFT - "If build with NuRaft" - OFF -) - option( BUILD_WITH_UV "If build with libuv" diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 047e8be193..33a6099daf 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -49,11 +49,11 @@ if(${TD_WINDOWS}) ) target_link_libraries( - common + common - PUBLIC os - PUBLIC util - INTERFACE api + PUBLIC os + PUBLIC util + INTERFACE api ) else() diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index aa9dd842ac..c78e979b99 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "tanalytics.h" #include "ttypes.h" -#include "tutil.h" +#ifdef USE_ANALYTICS #include #define ANALYTICS_ALOG_SPLIT_CHAR "," @@ -215,20 +215,20 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void return 0; } - int64_t newDataSize = (int64_t) contLen * nmemb; + int64_t newDataSize = (int64_t)contLen * nmemb; int64_t size = pRsp->dataLen + newDataSize; if (pRsp->data == NULL) { pRsp->data = taosMemoryMalloc(size + 1); if (pRsp->data == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno)); + return 0; // return the recv length, if failed, return 0 } } else { - char* p = taosMemoryRealloc(pRsp->data, size + 1); + char *p = taosMemoryRealloc(pRsp->data, size + 1); if (p == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 + uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno)); + return 0; // return the recv length, if failed, return 0 } pRsp->data = p; @@ -472,7 +472,7 @@ static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, } int32_t bufLen = tsnprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, - tDataTypes[colType].bytes, last ? "" : ","); + tDataTypes[colType].bytes, last ? "" : ","); if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { return terrno; } @@ -762,33 +762,35 @@ static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pC } } -//#else -// -//int32_t taosAnalyticsInit() { return 0; } -//void taosAnalyticsCleanup() {} -//SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } -// -//int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } -//bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } -//bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } -//int64_t taosAnalGetVersion() { return 0; } -//void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} -// -//int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } -//int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } -//int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } -//int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } -//int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } -//int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } -//int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -//int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } -//int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -//int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } -//int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } -//void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} -// -//const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } -//EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } -//const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } -// -//#endif \ No newline at end of file +#else + +int32_t taosAnalyticsInit() { return 0; } +void taosAnalyticsCleanup() {} +SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } + +int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } +bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } +bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } +int64_t taosAnalGetVersion() { return 0; } +void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} + +int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } +int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } +int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } +int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } +int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { + return 0; +} +int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } +int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } +int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } +int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } +int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } +void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} + +const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } +EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } +const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } + +#endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index ad36d8c8ae..e4e184eee0 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -16,10 +16,10 @@ if(TD_ENTERPRISE) ELSEIF(${BUILD_WITH_COS}) add_definitions(-DUSE_COS) endif() +endif() - if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANALYTICS) - endif() +if(${BUILD_WITH_ANALYSIS}) + add_definitions(-DUSE_ANALYTICS) endif() add_library(mnode STATIC ${MNODE_SRC}) diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index de3f7a607c..c64208600a 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -24,6 +24,8 @@ #include "tanalytics.h" #include "tjson.h" +#ifdef USE_ANALYTICS + #define TSDB_ANODE_VER_NUMBER 1 #define TSDB_ANODE_RESERVE_SIZE 64 @@ -877,4 +879,26 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { _OVER: tFreeRetrieveAnalAlgoRsp(&rsp); TAOS_RETURN(code); -} \ No newline at end of file +} + +#else + +static int32_t mndProcessUnsupportReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; } +static int32_t mndRetrieveUnsupport(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} + +int32_t mndInitAnode(SMnode *pMnode) { + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessUnsupportReq); + mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUnsupportReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessUnsupportReq); + mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessUnsupportReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveUnsupport); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveUnsupport); + return 0; +} + +void mndCleanupAnode(SMnode *pMnode) {} + +#endif \ No newline at end of file diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 8e539d52d9..3124fa0b57 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -26,6 +26,8 @@ #include "tjson.h" #include "ttime.h" +#ifdef USE_ANALYTICS + typedef struct { SArray* blocks; // SSDataBlock* SArray* windows; // STimeWindow @@ -657,3 +659,13 @@ _OVER: return code; } + +#else + +int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} +void destroyForecastInfo(void* param) {} + +#endif \ No newline at end of file diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 9eecad8644..02b122830c 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -12,13 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + #include "executorInt.h" #include "filter.h" -#include "function.h" #include "functionMgt.h" #include "operator.h" #include "querytask.h" -#include "storageapi.h" #include "tanalytics.h" #include "tcommon.h" #include "tcompare.h" @@ -26,25 +25,27 @@ #include "tfill.h" #include "ttime.h" +#ifdef USE_ANALYTICS + typedef struct { - char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; - char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; - char algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; - int64_t maxTs; - int64_t minTs; - int64_t numOfRows; - uint64_t groupId; - int64_t optRows; - int64_t cachedRows; - int32_t numOfBlocks; - int16_t resTsSlot; - int16_t resValSlot; - int16_t resLowSlot; - int16_t resHighSlot; - int16_t inputTsSlot; - int16_t inputValSlot; - int8_t inputValType; - int8_t inputPrecision; + char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; + char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; + char algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; + int64_t maxTs; + int64_t minTs; + int64_t numOfRows; + uint64_t groupId; + int64_t optRows; + int64_t cachedRows; + int32_t numOfBlocks; + int16_t resTsSlot; + int16_t resValSlot; + int16_t resLowSlot; + int16_t resHighSlot; + int16_t inputTsSlot; + int16_t inputValSlot; + int8_t inputValType; + int8_t inputPrecision; SAnalyticBuf analBuf; } SForecastSupp; @@ -116,7 +117,7 @@ static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, con static int32_t forecastCloseBuf(SForecastSupp* pSupp) { SAnalyticBuf* pBuf = &pSupp->analBuf; - int32_t code = 0; + int32_t code = 0; for (int32_t i = 0; i < 2; ++i) { code = taosAnalBufWriteColEnd(pBuf, i); @@ -176,7 +177,6 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { code = taosAnalBufWriteOptInt(pBuf, "start", start); if (code != 0) return code; - bool hasEvery = taosAnalGetOptInt(pSupp->algoOpt, "every", &every); if (!hasEvery) { qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every); @@ -190,14 +190,14 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) { static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) { SAnalyticBuf* pBuf = &pSupp->analBuf; - int32_t resCurRow = pBlock->info.rows; - int8_t tmpI8; - int16_t tmpI16; - int32_t tmpI32; - int64_t tmpI64; - float tmpFloat; - double tmpDouble; - int32_t code = 0; + int32_t resCurRow = pBlock->info.rows; + int8_t tmpI8; + int16_t tmpI16; + int32_t tmpI32; + int64_t tmpI64; + float tmpFloat; + double tmpDouble; + int32_t code = 0; SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot); if (NULL == pResValCol) { @@ -354,8 +354,8 @@ _OVER: } static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SAnalyticBuf* pBuf = &pSupp->analBuf; code = forecastCloseBuf(pSupp); @@ -540,7 +540,7 @@ static int32_t forecastParseAlgo(SForecastSupp* pSupp) { static int32_t forecastCreateBuf(SForecastSupp* pSupp) { SAnalyticBuf* pBuf = &pSupp->analBuf; - int64_t ts = 0; // taosGetTimestampMs(); + int64_t ts = 0; // taosGetTimestampMs(); pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL; snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts); @@ -660,3 +660,11 @@ static void destroyForecastInfo(void* param) { taosMemoryFreeClear(param); } +#else + +int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} + +#endif From 30ae443ce0289b1a8353d62ed123db112e7b6ca4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Jan 2025 19:14:05 +0800 Subject: [PATCH 5/9] fix(analysis): fix deploy error. --- contrib/CMakeLists.txt | 8 +++++--- contrib/test/CMakeLists.txt | 8 -------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 78eded3928..9c719eb68d 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS}) file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/) cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) - cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) endif(${BUILD_WITH_COS}) configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") @@ -148,9 +147,7 @@ endif(${BUILD_WITH_SQLITE}) # s3 if(${BUILD_WITH_S3}) - cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) add_definitions(-DUSE_S3) @@ -183,6 +180,11 @@ if(${BUILD_GEOS}) cat("${TD_SUPPORT_DIR}/geos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif() +if (${BUILD_WITH_ANALYSIS}) + cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif() + # if(${BUILD_PCRE2}) cat("${TD_SUPPORT_DIR}/pcre2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index 5d613dfed2..318d00b92c 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -20,14 +20,6 @@ if(${BUILD_WITH_SQLITE}) add_subdirectory(sqlite) endif(${BUILD_WITH_SQLITE}) -if(${BUILD_WITH_CRAFT}) - add_subdirectory(craft) -endif(${BUILD_WITH_CRAFT}) - -if(${BUILD_WITH_TRAFT}) - # add_subdirectory(traft) -endif(${BUILD_WITH_TRAFT}) - if(${BUILD_S3}) add_subdirectory(azure) endif() From 883b1b79a2a6b8ac1121fc426770cece18cac383 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 00:29:18 +0800 Subject: [PATCH 6/9] enh(analysis): add the algorithm in telemetry report. --- include/common/tanalytics.h | 2 +- source/common/CMakeLists.txt | 4 ++ source/common/src/tanalytics.c | 11 ++-- source/dnode/mnode/impl/inc/mndAnode.h | 5 +- source/dnode/mnode/impl/src/mndAnode.c | 28 ++++++++- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndTelem.c | 79 +++++++++++++++++++++----- source/util/CMakeLists.txt | 4 -- 8 files changed, 108 insertions(+), 27 deletions(-) diff --git a/include/common/tanalytics.h b/include/common/tanalytics.h index 6ebdb38fa6..344093245b 100644 --- a/include/common/tanalytics.h +++ b/include/common/tanalytics.h @@ -86,7 +86,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf); int32_t taosAnalBufClose(SAnalyticBuf *pBuf); void taosAnalBufDestroy(SAnalyticBuf *pBuf); -const char *taosAnalAlgoStr(EAnalAlgoType algoType); +const char *taosAnalysisAlgoType(EAnalAlgoType algoType); EAnalAlgoType taosAnalAlgoInt(const char *algoName); const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType); diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 33a6099daf..ac8fea90e5 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -15,6 +15,10 @@ if(DEFINED GRANT_CFG_INCLUDE_DIR) add_definitions(-DGRANTS_CFG) endif() +if(${BUILD_WITH_ANALYSIS}) + add_definitions(-DUSE_ANALYTICS) +endif() + if(TD_GRANT) ADD_DEFINITIONS(-D_GRANT) endif() diff --git a/source/common/src/tanalytics.c b/source/common/src/tanalytics.c index c78e979b99..0ed67eed0a 100644 --- a/source/common/src/tanalytics.c +++ b/source/common/src/tanalytics.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "tanalytics.h" #include "ttypes.h" +#include "tutil.h" #ifdef USE_ANALYTICS #include @@ -35,7 +36,7 @@ typedef struct { static SAlgoMgmt tsAlgos = {0}; static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); -const char *taosAnalAlgoStr(EAnalAlgoType type) { +const char *taosAnalysisAlgoType(EAnalAlgoType type) { switch (type) { case ANAL_ALGO_TYPE_ANOMALY_DETECT: return "anomaly-detection"; @@ -59,7 +60,7 @@ const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { EAnalAlgoType taosAnalAlgoInt(const char *name) { for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { - if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { + if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) { return i; } } @@ -187,12 +188,12 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); if (pUrl != NULL) { tstrncpy(url, pUrl->url, urlLen); - uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); + uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url); } else { url[0] = 0; terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND; code = terrno; - uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); + uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type)); } if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { @@ -789,7 +790,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} -const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } +const char *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; } EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } diff --git a/source/dnode/mnode/impl/inc/mndAnode.h b/source/dnode/mnode/impl/inc/mndAnode.h index 63e8f9090e..d92d35a0fc 100644 --- a/source/dnode/mnode/impl/inc/mndAnode.h +++ b/source/dnode/mnode/impl/inc/mndAnode.h @@ -22,8 +22,9 @@ extern "C" { #endif -int32_t mndInitAnode(SMnode *pMnode); -void mndCleanupAnode(SMnode *pMnode); +int32_t mndInitAnode(SMnode* pMnode); +void mndCleanupAnode(SMnode* pMnode); +void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index c64208600a..34c177e0c9 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -637,6 +637,32 @@ static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter) { sdbCancelFetchByType(pSdb, pIter, SDB_ANODE); } +// todo handle multiple anode case, remove the duplicate algos +void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SAnodeObj *pObj = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) { + if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) { + taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]); + } + + if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) { + taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]); + } + } + + sdbRelease(pSdb, pObj); + } +} + static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -661,7 +687,7 @@ static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); if (code != 0) goto _end; - STR_TO_VARSTR(buf, taosAnalAlgoStr(t)); + STR_TO_VARSTR(buf, taosAnalysisAlgoType(t)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); code = colDataSetVal(pColInfo, numOfRows, buf, false); if (code != 0) goto _end; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index b241af5adb..03ccd7b530 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -411,7 +411,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndStreamConsensusChkpt(pMnode); } - if (sec % tsTelemInterval == (TMIN(86400, (tsTelemInterval - 1)))) { + if (sec % 20 == (TMIN(86400, (20 - 1)))) { mndPullupTelem(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index bd613d7e69..3172abacf4 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -19,6 +19,7 @@ #include "mndSync.h" #include "thttp.h" #include "tjson.h" +#include "mndAnode.h" typedef struct { int64_t numOfDnode; @@ -32,6 +33,7 @@ typedef struct { int64_t totalPoints; int64_t totalStorage; int64_t compStorage; + int32_t numOfAnalysisAlgos; } SMnodeStat; static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) { @@ -58,18 +60,21 @@ static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) { sdbRelease(pSdb, pVgroup); } +} - pStat->numOfChildTable = 100; - pStat->numOfColumn = 200; - pStat->totalPoints = 300; - pStat->totalStorage = 400; - pStat->compStorage = 500; +static int32_t algoToJson(const void* pObj, SJson* pJson) { + const SAnodeAlgo* pNode = (const SAnodeAlgo*)pObj; + int32_t code = tjsonAddStringToObject(pJson, "name", pNode->name); + return code; } static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { SMnodeStat mstat = {0}; int32_t code = 0; int32_t lino = 0; + SArray* pFcList = NULL; + SArray* pAdList = NULL; + mndGetStat(pMnode, &mstat); TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDnode", mstat.numOfDnode), &lino, _OVER); @@ -82,8 +87,51 @@ static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfPoint", mstat.totalPoints), &lino, _OVER); TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "totalStorage", mstat.totalStorage), &lino, _OVER); TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "compStorage", mstat.compStorage), &lino, _OVER); + + pFcList = taosArrayInit(4, sizeof(SAnodeAlgo)); + pAdList = taosArrayInit(4, sizeof(SAnodeAlgo)); + if (pFcList == NULL || pAdList == NULL) { + lino = __LINE__; + goto _OVER; + } + + mndRetrieveAlgoList(pMnode, pFcList, pAdList); + + SJson *items = tjsonAddArrayToObject(pJson, "forecast"); + TSDB_CHECK_NULL(items, code, lino, _OVER, terrno); + + for(int32_t i = 0; i < taosArrayGetSize(pFcList); ++i) { + SJson *item = tjsonCreateObject(); + + TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddItemToArray(items, item), &lino, _OVER); + + SAnodeAlgo *p = taosArrayGet(pFcList, i); + TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + } + + SJson *items1 = tjsonAddArrayToObject(pJson, "anomaly_detection"); + TSDB_CHECK_NULL(items1, code, lino, _OVER, terrno); + + for(int32_t i = 0; i < taosArrayGetSize(pAdList); ++i) { + SJson *item = tjsonCreateObject(); + + TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddItemToArray(items1, item), &lino, _OVER); + + SAnodeAlgo *p = taosArrayGet(pAdList, i); + TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + } + _OVER: - if (code != 0) mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code)); + taosArrayDestroy(pFcList); + taosArrayDestroy(pAdList); + + if (code != 0) { + mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code)); + } } static char* mndBuildTelemetryReport(SMnode* pMnode) { @@ -136,21 +184,24 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { int32_t line = 0; SMnode* pMnode = pReq->info.node; STelemMgmt* pMgmt = &pMnode->telemMgmt; - if (!tsEnableTelem) return 0; + + if (!tsEnableTelem) { + return 0; + } (void)taosThreadMutexLock(&pMgmt->lock); char* pCont = mndBuildTelemetryReport(pMnode); (void)taosThreadMutexUnlock(&pMgmt->lock); - if (pCont == NULL) { - return 0; - } + TSDB_CHECK_NULL(pCont, code, line, _end, terrno); + code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT); taosMemoryFree(pCont); return code; + _end: if (code != 0) { - mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code)); + mError("%s failed to send telemetry report, line %d since %s", __func__, line, tstrerror(code)); } taosMemoryFree(pCont); return code; @@ -161,15 +212,17 @@ int32_t mndInitTelem(SMnode* pMnode) { STelemMgmt* pMgmt = &pMnode->telemMgmt; (void)taosThreadMutexInit(&pMgmt->lock, NULL); - if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) + if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) { mWarn("failed to get email since %s", tstrerror(code)); + } + code = taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer); if (code != 0) { mError("failed to init telemetry management since %s", tstrerror(code)); return code; } - mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); + mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); return 0; } diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 2633bb3268..d606d83712 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -17,10 +17,6 @@ else() MESSAGE(STATUS "enable assert core") endif(${ASSERT_NOT_CORE}) -if(${BUILD_WITH_ANALYSIS}) - add_definitions(-DUSE_ANALYTICS) -endif() - target_include_directories( util PUBLIC "${TD_SOURCE_DIR}/include/util" From 989c20078ac6ac4cf956f3254b37f9a81799718b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 00:32:46 +0800 Subject: [PATCH 7/9] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndTelem.c | 40 ++++++++++++++------------ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 03ccd7b530..b241af5adb 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -411,7 +411,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndStreamConsensusChkpt(pMnode); } - if (sec % 20 == (TMIN(86400, (20 - 1)))) { + if (sec % tsTelemInterval == (TMIN(86400, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 3172abacf4..5eee1ed3c4 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -97,32 +97,36 @@ static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) { mndRetrieveAlgoList(pMnode, pFcList, pAdList); - SJson *items = tjsonAddArrayToObject(pJson, "forecast"); - TSDB_CHECK_NULL(items, code, lino, _OVER, terrno); + if (taosArrayGetSize(pFcList) > 0) { + SJson* items = tjsonAddArrayToObject(pJson, "forecast"); + TSDB_CHECK_NULL(items, code, lino, _OVER, terrno); - for(int32_t i = 0; i < taosArrayGetSize(pFcList); ++i) { - SJson *item = tjsonCreateObject(); + for (int32_t i = 0; i < taosArrayGetSize(pFcList); ++i) { + SJson* item = tjsonCreateObject(); - TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); - TAOS_CHECK_GOTO(tjsonAddItemToArray(items, item), &lino, _OVER); + TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddItemToArray(items, item), &lino, _OVER); - SAnodeAlgo *p = taosArrayGet(pFcList, i); - TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); - TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + SAnodeAlgo* p = taosArrayGet(pFcList, i); + TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + } } - SJson *items1 = tjsonAddArrayToObject(pJson, "anomaly_detection"); - TSDB_CHECK_NULL(items1, code, lino, _OVER, terrno); + if (taosArrayGetSize(pAdList) > 0) { + SJson* items1 = tjsonAddArrayToObject(pJson, "anomaly_detection"); + TSDB_CHECK_NULL(items1, code, lino, _OVER, terrno); - for(int32_t i = 0; i < taosArrayGetSize(pAdList); ++i) { - SJson *item = tjsonCreateObject(); + for (int32_t i = 0; i < taosArrayGetSize(pAdList); ++i) { + SJson* item = tjsonCreateObject(); - TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); - TAOS_CHECK_GOTO(tjsonAddItemToArray(items1, item), &lino, _OVER); + TSDB_CHECK_NULL(item, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddItemToArray(items1, item), &lino, _OVER); - SAnodeAlgo *p = taosArrayGet(pAdList, i); - TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); - TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + SAnodeAlgo* p = taosArrayGet(pAdList, i); + TSDB_CHECK_NULL(p, code, lino, _OVER, terrno); + TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER); + } } _OVER: From 55ca679571f8880c63f378596c243fca09a8a6c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 09:06:55 +0800 Subject: [PATCH 8/9] fix(analysis): fix compiling error on windows. --- source/dnode/mnode/impl/src/mndAnode.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index 34c177e0c9..9f5635a74b 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -926,5 +926,6 @@ int32_t mndInitAnode(SMnode *pMnode) { } void mndCleanupAnode(SMnode *pMnode) {} +void mndRetrieveAlgoList(SMnode *pMnode, SArray *pFc, SArray *pAd) {} #endif \ No newline at end of file From 02b32eea161a07948c59cd1727f3c32f5ab86728 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Jan 2025 09:48:31 +0800 Subject: [PATCH 9/9] fix: cquery not quit issue --- source/libs/qworker/src/qworker.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5dd43ca064..1df8dcda95 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -559,13 +559,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(ctx->pJobInfo->errCode); } - if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { - QW_TASK_ELOG("query already end, phase:%d", phase); - QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); - } - switch (phase) { case QW_PHASE_PRE_QUERY: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("task already dropped at phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); @@ -592,6 +592,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_FETCH: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(ctx->rspCode); @@ -614,6 +619,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_CQUERY: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + code = ctx->rspCode; + goto _return; + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(ctx->rspCode);