diff --git a/cmake/cmake.options b/cmake/cmake.options index e3b5782d85..4c69544466 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -166,6 +166,8 @@ IF(${BUILD_WITH_ANALYSIS}) set(BUILD_WITH_S3 ON) ENDIF() +set(BUILD_WITH_ANALYSIS ON) + IF(${BUILD_S3}) IF(${BUILD_WITH_S3}) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 39380a0644..6d2648502d 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -34,7 +34,9 @@ endif() target_include_directories( common + PUBLIC "$ENV{HOME}/.cos-local.2/include" PUBLIC "${TD_SOURCE_DIR}/include/common" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${GRANT_CFG_INCLUDE_DIR}" ) @@ -47,28 +49,33 @@ if(${TD_WINDOWS}) ) endif() +find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + target_link_libraries( common + + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + PUBLIC os PUBLIC util INTERFACE api ) + + if(${BUILD_S3}) if(${BUILD_WITH_S3}) - target_include_directories( - common - - PUBLIC "$ENV{HOME}/.cos-local.2/include" - ) - set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) find_library(S3_LIBRARY s3) - find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +# find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) find_library(XML2_LIBRARY xml2) - find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +# find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) +# find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) target_link_libraries( common diff --git a/source/dnode/mnode/impl/src/mndAnode.c b/source/dnode/mnode/impl/src/mndAnode.c index c64208600a..de3f7a607c 100644 --- a/source/dnode/mnode/impl/src/mndAnode.c +++ b/source/dnode/mnode/impl/src/mndAnode.c @@ -24,8 +24,6 @@ #include "tanalytics.h" #include "tjson.h" -#ifdef USE_ANALYTICS - #define TSDB_ANODE_VER_NUMBER 1 #define TSDB_ANODE_RESERVE_SIZE 64 @@ -879,26 +877,4 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) { _OVER: tFreeRetrieveAnalAlgoRsp(&rsp); TAOS_RETURN(code); -} - -#else - -static int32_t mndProcessUnsupportReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; } -static int32_t mndRetrieveUnsupport(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} - -int32_t mndInitAnode(SMnode *pMnode) { - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ANODE, mndProcessUnsupportReq); - mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_ANODE, mndProcessUnsupportReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_ANODE, mndProcessUnsupportReq); - mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_ANAL_ALGO, mndProcessUnsupportReq); - - mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE, mndRetrieveUnsupport); - mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ANODE_FULL, mndRetrieveUnsupport); - return 0; -} - -void mndCleanupAnode(SMnode *pMnode) {} - -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index eb72edb964..8e539d52d9 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -26,8 +26,6 @@ #include "tjson.h" #include "ttime.h" -#ifdef USE_ANALYTICS - typedef struct { SArray* blocks; // SSDataBlock* SArray* windows; // STimeWindow @@ -659,13 +657,3 @@ _OVER: return code; } - -#else - -int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, - SOperatorInfo** pOptrInfo) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} -void destroyForecastInfo(void* param) {} - -#endif diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 2985e5e000..9eecad8644 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -26,8 +26,6 @@ #include "tfill.h" #include "ttime.h" -#ifdef USE_ANALYTICS - typedef struct { char algoName[TSDB_ANALYTIC_ALGO_NAME_LEN]; char algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN]; @@ -662,11 +660,3 @@ static void destroyForecastInfo(void* param) { taosMemoryFreeClear(param); } -#else - -int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, - SOperatorInfo** pOptrInfo) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} - -#endif diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c deleted file mode 100644 index bf2cb4fd07..0000000000 --- a/source/util/src/tanalytics.c +++ /dev/null @@ -1,795 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "tanalytics.h" -#include "ttypes.h" -#include "tutil.h" - -#ifdef USE_ANALYTICS -#include -#define ANALYTICS_ALOG_SPLIT_CHAR "," - -typedef struct { - int64_t ver; - SHashObj *hash; // algoname:algotype -> SAnalyticsUrl - TdThreadMutex lock; -} SAlgoMgmt; - -typedef struct { - char *data; - int64_t dataLen; -} SCurlResp; - -static SAlgoMgmt tsAlgos = {0}; -static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen); - -const char *taosAnalAlgoStr(EAnalAlgoType type) { - switch (type) { - case ANAL_ALGO_TYPE_ANOMALY_DETECT: - return "anomaly-detection"; - case ANAL_ALGO_TYPE_FORECAST: - return "forecast"; - default: - return "unknown"; - } -} - -const char *taosAnalAlgoUrlStr(EAnalAlgoType type) { - switch (type) { - case ANAL_ALGO_TYPE_ANOMALY_DETECT: - return "anomaly-detect"; - case ANAL_ALGO_TYPE_FORECAST: - return "forecast"; - default: - return "unknown"; - } -} - -EAnalAlgoType taosAnalAlgoInt(const char *name) { - for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) { - if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) { - return i; - } - } - - return ANAL_ALGO_TYPE_END; -} - -int32_t taosAnalyticsInit() { - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { - uError("failed to init curl"); - return -1; - } - - tsAlgos.ver = 0; - if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) { - uError("failed to init algo mutex"); - return -1; - } - - tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - if (tsAlgos.hash == NULL) { - uError("failed to init algo hash"); - return -1; - } - - uInfo("analysis env is initialized"); - return 0; -} - -static void taosAnalFreeHash(SHashObj *hash) { - void *pIter = taosHashIterate(hash, NULL); - while (pIter != NULL) { - SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter; - taosMemoryFree(pUrl->url); - pIter = taosHashIterate(hash, pIter); - } - taosHashCleanup(hash); -} - -void taosAnalyticsCleanup() { - curl_global_cleanup(); - if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) { - uError("failed to destroy anal lock"); - } - taosAnalFreeHash(tsAlgos.hash); - tsAlgos.hash = NULL; - uInfo("analysis env is cleaned up"); -} - -void taosAnalUpdate(int64_t newVer, SHashObj *pHash) { - if (newVer > tsAlgos.ver) { - if (taosThreadMutexLock(&tsAlgos.lock) == 0) { - SHashObj *hash = tsAlgos.hash; - tsAlgos.ver = newVer; - tsAlgos.hash = pHash; - if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { - uError("failed to unlock hash") - } - taosAnalFreeHash(hash); - } - } else { - taosAnalFreeHash(pHash); - } -} - -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { - char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; - char *pStart = NULL; - char *pEnd = NULL; - - pStart = strstr(option, optName); - if (pStart == NULL) { - return false; - } - - pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); - if (optMaxLen > 0) { - if (pEnd > pStart) { - int32_t len = (int32_t)(pEnd - pStart); - len = MIN(len + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN); - tstrncpy(buf, pStart, len); - } else { - int32_t len = MIN(tListLen(buf), strlen(pStart) + 1); - tstrncpy(buf, pStart, len); - } - - char *pRight = strstr(buf, "="); - if (pRight == NULL) { - return false; - } else { - pRight += 1; - } - - int32_t unused = strtrim(pRight); - - int32_t vLen = MIN(optMaxLen, strlen(pRight) + 1); - tstrncpy(optValue, pRight, vLen); - } - - return true; -} - -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { - char buf[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); - - char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); - if (pos1 != NULL) { - *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); - return true; - } else { - return false; - } -} - -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { - int32_t code = 0; - char name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0}; - int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName); - - char *unused = strntolower(name, name, nameLen); - - if (taosThreadMutexLock(&tsAlgos.lock) == 0) { - SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen); - if (pUrl != NULL) { - tstrncpy(url, pUrl->url, urlLen); - uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url); - } else { - url[0] = 0; - terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND; - code = terrno; - uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type)); - } - - if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) { - uError("failed to unlock hash"); - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - return code; -} - -int64_t taosAnalGetVersion() { return tsAlgos.ver; } - -static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) { - SCurlResp *pRsp = userdata; - if (contLen == 0 || nmemb == 0 || pCont == NULL) { - pRsp->dataLen = 0; - pRsp->data = NULL; - uError("curl response is received, len:%" PRId64, pRsp->dataLen); - return 0; - } - - int64_t newDataSize = (int64_t) contLen * nmemb; - int64_t size = pRsp->dataLen + newDataSize; - - if (pRsp->data == NULL) { - pRsp->data = taosMemoryMalloc(size + 1); - if (pRsp->data == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 - } - } else { - char* p = taosMemoryRealloc(pRsp->data, size + 1); - if (p == NULL) { - uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno)); - return 0; // return the recv length, if failed, return 0 - } - - pRsp->data = p; - } - - if (pRsp->data != NULL) { - (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize); - - pRsp->dataLen = size; - pRsp->data[size] = 0; - - uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data); - return newDataSize; - } else { - pRsp->dataLen = 0; - uError("failed to malloc curl response"); - return 0; - } -} - -static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) { - CURL *curl = NULL; - CURLcode code = 0; - - curl = curl_easy_init(); - if (curl == NULL) { - uError("failed to create curl handle"); - return -1; - } - - if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100) != 0) goto _OVER; - - uDebug("curl get request will sent, url:%s", url); - code = curl_easy_perform(curl); - if (code != CURLE_OK) { - uError("failed to perform curl action, code:%d", code); - } - -_OVER: - if (curl != NULL) curl_easy_cleanup(curl); - return code; -} - -static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen) { - struct curl_slist *headers = NULL; - CURL *curl = NULL; - CURLcode code = 0; - - curl = curl_easy_init(); - if (curl == NULL) { - uError("failed to create curl handle"); - return -1; - } - - headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8"); - if (curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 60000) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER; - if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER; - - uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf); - code = curl_easy_perform(curl); - if (code != CURLE_OK) { - uError("failed to perform curl action, code:%d", code); - } - -_OVER: - if (curl != NULL) { - curl_slist_free_all(headers); - curl_easy_cleanup(curl); - } - return code; -} - -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { - int32_t code = -1; - char *pCont = NULL; - int64_t contentLen; - SJson *pJson = NULL; - SCurlResp curlRsp = {0}; - - if (type == ANALYTICS_HTTP_TYPE_GET) { - if (taosCurlGetRequest(url, &curlRsp) != 0) { - terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; - goto _OVER; - } - } else { - code = taosAnalBufGetCont(pBuf, &pCont, &contentLen); - if (code != 0) { - terrno = code; - goto _OVER; - } - if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen) != 0) { - terrno = TSDB_CODE_ANA_URL_CANT_ACCESS; - goto _OVER; - } - } - - if (curlRsp.data == NULL || curlRsp.dataLen == 0) { - terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL; - goto _OVER; - } - - pJson = tjsonParse(curlRsp.data); - if (pJson == NULL) { - if (curlRsp.data[0] == '<') { - terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR; - } else { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; - } - goto _OVER; - } - -_OVER: - if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data); - if (pCont != NULL) taosMemoryFree(pCont); - return pJson; -} - -static int32_t taosAnalJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) { - int32_t code = 0; - int64_t contLen; - char *pCont = NULL; - TdFilePtr pFile = NULL; - - pFile = taosOpenFile(fileName, TD_FILE_READ); - if (pFile == NULL) { - code = terrno; - goto _OVER; - } - - code = taosFStatFile(pFile, &contLen, NULL); - if (code != 0) goto _OVER; - - pCont = taosMemoryMalloc(contLen + 1); - if (pCont == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - if (taosReadFile(pFile, pCont, contLen) != contLen) { - code = terrno; - goto _OVER; - } - - pCont[contLen] = '\0'; - -_OVER: - if (code == 0) { - *ppCont = pCont; - *pContLen = contLen; - } else { - if (pCont != NULL) taosMemoryFree(pCont); - } - if (pFile != NULL) taosCloseFile(&pFile); - return code; -} - -static int32_t taosAnalJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { - char buf[64] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { - char buf[128] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": \"%s\",\n", optName, optVal); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { - char buf[128] = {0}; - int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) { - if (bufLen <= 0) { - bufLen = strlen(buf); - } - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - return 0; -} - -static int32_t taosAnalJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalJsonBufWriteStr(pBuf, "{\n", 0); } - -static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { - pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pBuf->filePtr == NULL) { - return terrno; - } - - pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf)); - if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY; - pBuf->numOfCols = numOfCols; - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { - return taosAnalJsonBufWriteStart(pBuf); - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, i); - pCol->filePtr = - taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pCol->filePtr == NULL) { - return terrno; - } - } - - return taosAnalJsonBufWriteStart(pBuf); -} - -static int32_t taosAnalJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { - char buf[128] = {0}; - bool first = (colIndex == 0); - bool last = (colIndex == pBuf->numOfCols - 1); - - if (first) { - if (taosAnalJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) { - return terrno; - } - } - - int32_t bufLen = tsnprintf(buf, sizeof(buf), " [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name, - tDataTypes[colType].bytes, last ? "" : ","); - if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) { - return terrno; - } - - if (last) { - if (taosAnalJsonBufWriteStr(pBuf, "],\n", 0) != 0) { - return terrno; - } - } - - return 0; -} - -static int32_t taosAnalJsonBufWriteDataBegin(SAnalyticBuf *pBuf) { - return taosAnalJsonBufWriteStr(pBuf, "\"data\": [\n", 0); -} - -static int32_t taosAnalJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) { - if (bufLen <= 0) { - bufLen = strlen(buf); - } - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) { - int32_t ret = taosWriteFile(pBuf->filePtr, buf, bufLen); - if (ret != bufLen) { - return terrno; - } - } else { - int32_t ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen); - if (ret != bufLen) { - return terrno; - } - } - - return 0; -} - -static int32_t taosAnalJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { - return taosAnalJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex); -} - -static int32_t taosAnalJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { - if (colIndex == pBuf->numOfCols - 1) { - return taosAnalJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex); - - } else { - return taosAnalJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex); - } -} - -static int32_t taosAnalJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { - char buf[64]; - int32_t bufLen = 0; - - if (pBuf->pCols[colIndex].numOfRows != 0) { - buf[bufLen] = ','; - buf[bufLen + 1] = '\n'; - buf[bufLen + 2] = 0; - bufLen += 2; - } - - switch (colType) { - case TSDB_DATA_TYPE_BOOL: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0); - break; - case TSDB_DATA_TYPE_TINYINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue); - break; - case TSDB_DATA_TYPE_UTINYINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue); - break; - case TSDB_DATA_TYPE_SMALLINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue); - break; - case TSDB_DATA_TYPE_USMALLINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue); - break; - case TSDB_DATA_TYPE_INT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue); - break; - case TSDB_DATA_TYPE_UINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue); - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64 "", *(int64_t *)colValue); - break; - case TSDB_DATA_TYPE_UBIGINT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64 "", *(uint64_t *)colValue); - break; - case TSDB_DATA_TYPE_FLOAT: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue)); - break; - case TSDB_DATA_TYPE_DOUBLE: - bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue)); - break; - default: - buf[bufLen] = '\0'; - } - - pBuf->pCols[colIndex].numOfRows++; - return taosAnalJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex); -} - -static int32_t taosAnalJsonBufWriteDataEnd(SAnalyticBuf *pBuf) { - int32_t code = 0; - char *pCont = NULL; - int64_t contLen = 0; - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - - code = taosFsyncFile(pCol->filePtr); - if (code != 0) return code; - - code = taosCloseFile(&pCol->filePtr); - if (code != 0) return code; - - code = taosAnalJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen); - if (code != 0) return code; - - code = taosAnalJsonBufWriteStr(pBuf, pCont, contLen); - if (code != 0) return code; - - taosMemoryFreeClear(pCont); - contLen = 0; - } - } - - return taosAnalJsonBufWriteStr(pBuf, "],\n", 0); -} - -static int32_t taosAnalJsonBufWriteEnd(SAnalyticBuf *pBuf) { - int32_t code = taosAnalJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows); - if (code != 0) return code; - - return taosAnalJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0); -} - -int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) { - int32_t code = taosAnalJsonBufWriteEnd(pBuf); - if (code != 0) return code; - - if (pBuf->filePtr != NULL) { - code = taosFsyncFile(pBuf->filePtr); - if (code != 0) return code; - code = taosCloseFile(&pBuf->filePtr); - if (code != 0) return code; - } - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - if (pCol->filePtr != NULL) { - code = taosFsyncFile(pCol->filePtr); - if (code != 0) return code; - code = taosCloseFile(&pCol->filePtr); - if (code != 0) return code; - } - } - } - - return 0; -} - -void taosAnalBufDestroy(SAnalyticBuf *pBuf) { - if (pBuf->fileName[0] != 0) { - if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr); - // taosRemoveFile(pBuf->fileName); - pBuf->fileName[0] = 0; - } - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - for (int32_t i = 0; i < pBuf->numOfCols; ++i) { - SAnalyticsColBuf *pCol = &pBuf->pCols[i]; - if (pCol->fileName[0] != 0) { - if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr); - if (taosRemoveFile(pCol->fileName) != 0) { - uError("failed to remove file %s", pCol->fileName); - } - pCol->fileName[0] = 0; - } - } - } - - taosMemoryFreeClear(pBuf->pCols); - pBuf->numOfCols = 0; -} - -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return tsosAnalJsonBufOpen(pBuf, numOfCols); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptStr(pBuf, optName, optVal); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptInt(pBuf, optName, optVal); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteOptFloat(pBuf, optName, optVal); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColMeta(pBuf, colIndex, colType, colName); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteDataBegin(pBuf); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColBegin(pBuf, colIndex); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColData(pBuf, colIndex, colType, colValue); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteColEnd(pBuf, colIndex); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufWriteDataEnd(pBuf); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufClose(pBuf); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) { - *ppCont = NULL; - *pContLen = 0; - - if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) { - return taosAnalJsonBufGetCont(pBuf->fileName, ppCont, pContLen); - } else { - return TSDB_CODE_ANA_BUF_INVALID_TYPE; - } -} - -#else - -int32_t taosAnalyticsInit() { return 0; } -void taosAnalyticsCleanup() {} -SJson *taosAnalSendReqRetJson(const char *url, EAnalHttpType type, SAnalyticBuf *pBuf) { return NULL; } - -int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; } -bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; } -bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; } -int64_t taosAnalGetVersion() { return 0; } -void taosAnalUpdate(int64_t newVer, SHashObj *pHash) {} - -int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; } -int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; } -int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; } -int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; } -int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; } -int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; } -int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; } -int32_t taosAnalBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; } -int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; } -int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; } -void taosAnalBufDestroy(SAnalyticBuf *pBuf) {} - -const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; } -EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; } -const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; } - -#endif \ No newline at end of file