Merge pull request #29632 from taosdata/enh/analysis
enh(analysis): enable community edition to support anodes.
This commit is contained in:
commit
bc0570f9ad
|
@ -166,6 +166,10 @@ IF(${BUILD_WITH_ANALYSIS})
|
|||
set(BUILD_WITH_S3 ON)
|
||||
ENDIF()
|
||||
|
||||
IF(${TD_LINUX})
|
||||
set(BUILD_WITH_ANALYSIS ON)
|
||||
ENDIF()
|
||||
|
||||
IF(${BUILD_S3})
|
||||
|
||||
IF(${BUILD_WITH_S3})
|
||||
|
@ -205,13 +209,6 @@ option(
|
|||
off
|
||||
)
|
||||
|
||||
|
||||
option(
|
||||
BUILD_WITH_NURAFT
|
||||
"If build with NuRaft"
|
||||
OFF
|
||||
)
|
||||
|
||||
option(
|
||||
BUILD_WITH_UV
|
||||
"If build with libuv"
|
||||
|
|
|
@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS})
|
|||
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/)
|
||||
cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
|
||||
cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
|
||||
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
|
||||
endif(${BUILD_WITH_COS})
|
||||
|
||||
configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
|
||||
|
@ -148,9 +147,7 @@ endif(${BUILD_WITH_SQLITE})
|
|||
|
||||
# s3
|
||||
if(${BUILD_WITH_S3})
|
||||
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
add_definitions(-DUSE_S3)
|
||||
|
@ -183,6 +180,11 @@ if(${BUILD_GEOS})
|
|||
cat("${TD_SUPPORT_DIR}/geos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
if (${BUILD_WITH_ANALYSIS})
|
||||
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
#
|
||||
if(${BUILD_PCRE2})
|
||||
cat("${TD_SUPPORT_DIR}/pcre2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
|
|
|
@ -20,14 +20,6 @@ if(${BUILD_WITH_SQLITE})
|
|||
add_subdirectory(sqlite)
|
||||
endif(${BUILD_WITH_SQLITE})
|
||||
|
||||
if(${BUILD_WITH_CRAFT})
|
||||
add_subdirectory(craft)
|
||||
endif(${BUILD_WITH_CRAFT})
|
||||
|
||||
if(${BUILD_WITH_TRAFT})
|
||||
# add_subdirectory(traft)
|
||||
endif(${BUILD_WITH_TRAFT})
|
||||
|
||||
if(${BUILD_S3})
|
||||
add_subdirectory(azure)
|
||||
endif()
|
||||
|
|
|
@ -86,7 +86,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf);
|
|||
int32_t taosAnalBufClose(SAnalyticBuf *pBuf);
|
||||
void taosAnalBufDestroy(SAnalyticBuf *pBuf);
|
||||
|
||||
const char *taosAnalAlgoStr(EAnalAlgoType algoType);
|
||||
const char *taosAnalysisAlgoType(EAnalAlgoType algoType);
|
||||
EAnalAlgoType taosAnalAlgoInt(const char *algoName);
|
||||
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType);
|
||||
|
||||
|
|
|
@ -15,6 +15,10 @@ if(DEFINED GRANT_CFG_INCLUDE_DIR)
|
|||
add_definitions(-DGRANTS_CFG)
|
||||
endif()
|
||||
|
||||
if(${BUILD_WITH_ANALYSIS})
|
||||
add_definitions(-DUSE_ANALYTICS)
|
||||
endif()
|
||||
|
||||
if(TD_GRANT)
|
||||
ADD_DEFINITIONS(-D_GRANT)
|
||||
endif()
|
||||
|
@ -34,7 +38,9 @@ endif()
|
|||
|
||||
target_include_directories(
|
||||
common
|
||||
PUBLIC "$ENV{HOME}/.cos-local.2/include"
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/common"
|
||||
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
PRIVATE "${GRANT_CFG_INCLUDE_DIR}"
|
||||
)
|
||||
|
@ -45,30 +51,39 @@ if(${TD_WINDOWS})
|
|||
PRIVATE "${TD_SOURCE_DIR}/contrib/pthread"
|
||||
PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex"
|
||||
)
|
||||
endif()
|
||||
|
||||
target_link_libraries(
|
||||
target_link_libraries(
|
||||
common
|
||||
|
||||
PUBLIC os
|
||||
PUBLIC util
|
||||
INTERFACE api
|
||||
)
|
||||
)
|
||||
|
||||
else()
|
||||
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
|
||||
target_link_libraries(
|
||||
common
|
||||
|
||||
PUBLIC ${CURL_LIBRARY}
|
||||
PUBLIC ${SSL_LIBRARY}
|
||||
PUBLIC ${CRYPTO_LIBRARY}
|
||||
|
||||
PUBLIC os
|
||||
PUBLIC util
|
||||
INTERFACE api
|
||||
)
|
||||
endif()
|
||||
|
||||
if(${BUILD_S3})
|
||||
if(${BUILD_WITH_S3})
|
||||
target_include_directories(
|
||||
common
|
||||
|
||||
PUBLIC "$ENV{HOME}/.cos-local.2/include"
|
||||
)
|
||||
|
||||
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
|
||||
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
|
||||
find_library(S3_LIBRARY s3)
|
||||
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
find_library(XML2_LIBRARY xml2)
|
||||
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
target_link_libraries(
|
||||
common
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ typedef struct {
|
|||
static SAlgoMgmt tsAlgos = {0};
|
||||
static int32_t taosAnalBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
|
||||
|
||||
const char *taosAnalAlgoStr(EAnalAlgoType type) {
|
||||
const char *taosAnalysisAlgoType(EAnalAlgoType type) {
|
||||
switch (type) {
|
||||
case ANAL_ALGO_TYPE_ANOMALY_DETECT:
|
||||
return "anomaly-detection";
|
||||
|
@ -60,7 +60,7 @@ const char *taosAnalAlgoUrlStr(EAnalAlgoType type) {
|
|||
|
||||
EAnalAlgoType taosAnalAlgoInt(const char *name) {
|
||||
for (EAnalAlgoType i = 0; i < ANAL_ALGO_TYPE_END; ++i) {
|
||||
if (strcasecmp(name, taosAnalAlgoStr(i)) == 0) {
|
||||
if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
@ -188,12 +188,12 @@ int32_t taosAnalGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url,
|
|||
SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
|
||||
if (pUrl != NULL) {
|
||||
tstrncpy(url, pUrl->url, urlLen);
|
||||
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalAlgoStr(type), url);
|
||||
uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url);
|
||||
} else {
|
||||
url[0] = 0;
|
||||
terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
|
||||
code = terrno;
|
||||
uError("algo:%s, type:%s, url not found", algoName, taosAnalAlgoStr(type));
|
||||
uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type));
|
||||
}
|
||||
|
||||
if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
|
||||
|
@ -216,19 +216,19 @@ static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t newDataSize = (int64_t) contLen * nmemb;
|
||||
int64_t newDataSize = (int64_t)contLen * nmemb;
|
||||
int64_t size = pRsp->dataLen + newDataSize;
|
||||
|
||||
if (pRsp->data == NULL) {
|
||||
pRsp->data = taosMemoryMalloc(size + 1);
|
||||
if (pRsp->data == NULL) {
|
||||
uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno));
|
||||
uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
|
||||
return 0; // return the recv length, if failed, return 0
|
||||
}
|
||||
} else {
|
||||
char* p = taosMemoryRealloc(pRsp->data, size + 1);
|
||||
char *p = taosMemoryRealloc(pRsp->data, size + 1);
|
||||
if (p == NULL) {
|
||||
uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t) size + 1, tstrerror(terrno));
|
||||
uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
|
||||
return 0; // return the recv length, if failed, return 0
|
||||
}
|
||||
|
||||
|
@ -779,7 +779,9 @@ int32_t tsosAnalBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; }
|
|||
int32_t taosAnalBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
|
||||
int32_t taosAnalBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
|
||||
int32_t taosAnalBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
|
||||
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) { return 0; }
|
||||
int32_t taosAnalBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
|
||||
return 0;
|
||||
}
|
||||
int32_t taosAnalBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
|
||||
int32_t taosAnalBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
|
||||
int32_t taosAnalBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
|
||||
|
@ -788,7 +790,7 @@ int32_t taosAnalBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
|
|||
int32_t taosAnalBufClose(SAnalyticBuf *pBuf) { return 0; }
|
||||
void taosAnalBufDestroy(SAnalyticBuf *pBuf) {}
|
||||
|
||||
const char *taosAnalAlgoStr(EAnalAlgoType algoType) { return 0; }
|
||||
const char *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; }
|
||||
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
|
||||
const char *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
|
||||
|
|
@ -16,10 +16,10 @@ if(TD_ENTERPRISE)
|
|||
ELSEIF(${BUILD_WITH_COS})
|
||||
add_definitions(-DUSE_COS)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
if(${BUILD_WITH_ANALYSIS})
|
||||
if(${BUILD_WITH_ANALYSIS})
|
||||
add_definitions(-DUSE_ANALYTICS)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
add_library(mnode STATIC ${MNODE_SRC})
|
||||
|
|
|
@ -22,8 +22,9 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mndInitAnode(SMnode *pMnode);
|
||||
void mndCleanupAnode(SMnode *pMnode);
|
||||
int32_t mndInitAnode(SMnode* pMnode);
|
||||
void mndCleanupAnode(SMnode* pMnode);
|
||||
void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -637,6 +637,32 @@ static void mndCancelGetNextAnode(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetchByType(pSdb, pIter, SDB_ANODE);
|
||||
}
|
||||
|
||||
// todo handle multiple anode case, remove the duplicate algos
|
||||
void mndRetrieveAlgoList(SMnode* pMnode, SArray* pFc, SArray* pAd) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SAnodeObj *pObj = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_ANODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pObj->numOfAlgos >= ANAL_ALGO_TYPE_END) {
|
||||
if (pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT] != NULL) {
|
||||
taosArrayAddAll(pAd, pObj->algos[ANAL_ALGO_TYPE_ANOMALY_DETECT]);
|
||||
}
|
||||
|
||||
if (pObj->algos[ANAL_ALGO_TYPE_FORECAST] != NULL) {
|
||||
taosArrayAddAll(pFc, pObj->algos[ANAL_ALGO_TYPE_FORECAST]);
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -661,7 +687,7 @@ static int32_t mndRetrieveAnodesFull(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
|
||||
if (code != 0) goto _end;
|
||||
|
||||
STR_TO_VARSTR(buf, taosAnalAlgoStr(t));
|
||||
STR_TO_VARSTR(buf, taosAnalysisAlgoType(t));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||
if (code != 0) goto _end;
|
||||
|
@ -900,5 +926,6 @@ int32_t mndInitAnode(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
void mndCleanupAnode(SMnode *pMnode) {}
|
||||
void mndRetrieveAlgoList(SMnode *pMnode, SArray *pFc, SArray *pAd) {}
|
||||
|
||||
#endif
|
|
@ -19,6 +19,7 @@
|
|||
#include "mndSync.h"
|
||||
#include "thttp.h"
|
||||
#include "tjson.h"
|
||||
#include "mndAnode.h"
|
||||
|
||||
typedef struct {
|
||||
int64_t numOfDnode;
|
||||
|
@ -32,6 +33,7 @@ typedef struct {
|
|||
int64_t totalPoints;
|
||||
int64_t totalStorage;
|
||||
int64_t compStorage;
|
||||
int32_t numOfAnalysisAlgos;
|
||||
} SMnodeStat;
|
||||
|
||||
static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) {
|
||||
|
@ -58,18 +60,21 @@ static void mndGetStat(SMnode* pMnode, SMnodeStat* pStat) {
|
|||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
}
|
||||
|
||||
pStat->numOfChildTable = 100;
|
||||
pStat->numOfColumn = 200;
|
||||
pStat->totalPoints = 300;
|
||||
pStat->totalStorage = 400;
|
||||
pStat->compStorage = 500;
|
||||
static int32_t algoToJson(const void* pObj, SJson* pJson) {
|
||||
const SAnodeAlgo* pNode = (const SAnodeAlgo*)pObj;
|
||||
int32_t code = tjsonAddStringToObject(pJson, "name", pNode->name);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) {
|
||||
SMnodeStat mstat = {0};
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SArray* pFcList = NULL;
|
||||
SArray* pAdList = NULL;
|
||||
|
||||
mndGetStat(pMnode, &mstat);
|
||||
|
||||
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfDnode", mstat.numOfDnode), &lino, _OVER);
|
||||
|
@ -82,8 +87,55 @@ static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) {
|
|||
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfPoint", mstat.totalPoints), &lino, _OVER);
|
||||
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "totalStorage", mstat.totalStorage), &lino, _OVER);
|
||||
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "compStorage", mstat.compStorage), &lino, _OVER);
|
||||
|
||||
pFcList = taosArrayInit(4, sizeof(SAnodeAlgo));
|
||||
pAdList = taosArrayInit(4, sizeof(SAnodeAlgo));
|
||||
if (pFcList == NULL || pAdList == NULL) {
|
||||
lino = __LINE__;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mndRetrieveAlgoList(pMnode, pFcList, pAdList);
|
||||
|
||||
if (taosArrayGetSize(pFcList) > 0) {
|
||||
SJson* items = tjsonAddArrayToObject(pJson, "forecast");
|
||||
TSDB_CHECK_NULL(items, code, lino, _OVER, terrno);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFcList); ++i) {
|
||||
SJson* item = tjsonCreateObject();
|
||||
|
||||
TSDB_CHECK_NULL(item, code, lino, _OVER, terrno);
|
||||
TAOS_CHECK_GOTO(tjsonAddItemToArray(items, item), &lino, _OVER);
|
||||
|
||||
SAnodeAlgo* p = taosArrayGet(pFcList, i);
|
||||
TSDB_CHECK_NULL(p, code, lino, _OVER, terrno);
|
||||
TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER);
|
||||
}
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pAdList) > 0) {
|
||||
SJson* items1 = tjsonAddArrayToObject(pJson, "anomaly_detection");
|
||||
TSDB_CHECK_NULL(items1, code, lino, _OVER, terrno);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pAdList); ++i) {
|
||||
SJson* item = tjsonCreateObject();
|
||||
|
||||
TSDB_CHECK_NULL(item, code, lino, _OVER, terrno);
|
||||
TAOS_CHECK_GOTO(tjsonAddItemToArray(items1, item), &lino, _OVER);
|
||||
|
||||
SAnodeAlgo* p = taosArrayGet(pAdList, i);
|
||||
TSDB_CHECK_NULL(p, code, lino, _OVER, terrno);
|
||||
TAOS_CHECK_GOTO(tjsonAddStringToObject(item, "name", p->name), &lino, _OVER);
|
||||
}
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0) mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code));
|
||||
taosArrayDestroy(pFcList);
|
||||
taosArrayDestroy(pAdList);
|
||||
|
||||
if (code != 0) {
|
||||
mError("failed to mndBuildRuntimeInfo at line:%d since %s", lino, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
static char* mndBuildTelemetryReport(SMnode* pMnode) {
|
||||
|
@ -136,21 +188,24 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
|
|||
int32_t line = 0;
|
||||
SMnode* pMnode = pReq->info.node;
|
||||
STelemMgmt* pMgmt = &pMnode->telemMgmt;
|
||||
if (!tsEnableTelem) return 0;
|
||||
|
||||
if (!tsEnableTelem) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
char* pCont = mndBuildTelemetryReport(pMnode);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
|
||||
if (pCont == NULL) {
|
||||
return 0;
|
||||
}
|
||||
TSDB_CHECK_NULL(pCont, code, line, _end, terrno);
|
||||
|
||||
code = taosSendTelemReport(&pMgmt->addrMgt, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT);
|
||||
taosMemoryFree(pCont);
|
||||
return code;
|
||||
|
||||
_end:
|
||||
if (code != 0) {
|
||||
mError("%s failed to send at line %d since %s", __func__, line, tstrerror(code));
|
||||
mError("%s failed to send telemetry report, line %d since %s", __func__, line, tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pCont);
|
||||
return code;
|
||||
|
@ -161,15 +216,17 @@ int32_t mndInitTelem(SMnode* pMnode) {
|
|||
STelemMgmt* pMgmt = &pMnode->telemMgmt;
|
||||
|
||||
(void)taosThreadMutexInit(&pMgmt->lock, NULL);
|
||||
if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0)
|
||||
if ((code = taosGetEmail(pMgmt->email, sizeof(pMgmt->email))) != 0) {
|
||||
mWarn("failed to get email since %s", tstrerror(code));
|
||||
}
|
||||
|
||||
code = taosTelemetryMgtInit(&pMgmt->addrMgt, tsTelemServer);
|
||||
if (code != 0) {
|
||||
mError("failed to init telemetry management since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -12,13 +12,12 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "executorInt.h"
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "operator.h"
|
||||
#include "querytask.h"
|
||||
#include "storageapi.h"
|
||||
#include "tanalytics.h"
|
||||
#include "tcommon.h"
|
||||
#include "tcompare.h"
|
||||
|
@ -178,7 +177,6 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp) {
|
|||
code = taosAnalBufWriteOptInt(pBuf, "start", start);
|
||||
if (code != 0) return code;
|
||||
|
||||
|
||||
bool hasEvery = taosAnalGetOptInt(pSupp->algoOpt, "every", &every);
|
||||
if (!hasEvery) {
|
||||
qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every);
|
||||
|
|
|
@ -559,13 +559,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
QW_ERR_JRET(ctx->pJobInfo->errCode);
|
||||
}
|
||||
|
||||
switch (phase) {
|
||||
case QW_PHASE_PRE_QUERY: {
|
||||
if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) {
|
||||
QW_TASK_ELOG("query already end, phase:%d", phase);
|
||||
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
|
||||
}
|
||||
|
||||
switch (phase) {
|
||||
case QW_PHASE_PRE_QUERY: {
|
||||
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
QW_TASK_ELOG("task already dropped at phase %s", qwPhaseStr(phase));
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
|
||||
|
@ -592,6 +592,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
break;
|
||||
}
|
||||
case QW_PHASE_PRE_FETCH: {
|
||||
if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) {
|
||||
QW_TASK_ELOG("query already end, phase:%d", phase);
|
||||
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
|
||||
}
|
||||
|
||||
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
|
||||
QW_ERR_JRET(ctx->rspCode);
|
||||
|
@ -614,6 +619,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
break;
|
||||
}
|
||||
case QW_PHASE_PRE_CQUERY: {
|
||||
if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) {
|
||||
QW_TASK_ELOG("query already end, phase:%d", phase);
|
||||
code = ctx->rspCode;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
|
||||
QW_ERR_JRET(ctx->rspCode);
|
||||
|
|
|
@ -17,10 +17,6 @@ else()
|
|||
MESSAGE(STATUS "enable assert core")
|
||||
endif(${ASSERT_NOT_CORE})
|
||||
|
||||
if(${BUILD_WITH_ANALYSIS})
|
||||
add_definitions(-DUSE_ANALYTICS)
|
||||
endif()
|
||||
|
||||
target_include_directories(
|
||||
util
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/util"
|
||||
|
|
Loading…
Reference in New Issue