From d69c5b1840cfdd707c2ac3886290ab0dceb468d3 Mon Sep 17 00:00:00 2001 From: xjzhou Date: Tue, 2 Jul 2024 10:18:56 +0800 Subject: [PATCH 01/20] isStmtBind --- source/libs/parser/src/parInsertSql.c | 16 ++- tests/taosc_test/taoscTest.cpp | 194 +++++++++++++++++++++++--- 2 files changed, 180 insertions(+), 30 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 9393a62e26..b053cd95a0 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -30,6 +30,7 @@ typedef struct SInsertParseContext { bool forceUpdate; bool needTableTagVal; bool needRequest; // whether or not request server + bool isStmtBind; // whether is stmt } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); @@ -1978,7 +1979,6 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) { SBoundColInfo* pCols = &pTableCxt->boundColsInfo; - bool isParseBindParam = false; SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); int32_t code = TSDB_CODE_SUCCESS; @@ -1996,7 +1996,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]); if (pToken->type == TK_NK_QUESTION) { - isParseBindParam = true; + pCxt->isStmtBind = true; if (NULL == pCxt->pComCxt->pStmtCb) { code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); break; @@ -2007,8 +2007,8 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC break; } - if (isParseBindParam) { - code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); + if (pCxt->isStmtBind) { + code = buildInvalidOperationMsg(&pCxt->msg, "stmt bind param does not support normal value in sql"); break; } @@ -2025,7 +2025,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } } - if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { + if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) { SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); if (TSDB_CODE_SUCCESS == code) { @@ -2035,7 +2035,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } } - if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { + if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) { *pGotRow = true; } @@ -2410,6 +2410,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif } if (TK_NK_QUESTION == pTbName->type) { + pCxt->isStmtBind = true; if (NULL == pCxt->pComCxt->pStmtCb) { return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z); } @@ -2935,7 +2936,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal .missCache = false, .usingDuplicateTable = false, .needRequest = true, - .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)}; + .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false), + .isStmtBind = false}; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); if (TSDB_CODE_SUCCESS == code) { diff --git a/tests/taosc_test/taoscTest.cpp b/tests/taosc_test/taoscTest.cpp index 3f49b11b70..d3f6f50547 100644 --- a/tests/taosc_test/taoscTest.cpp +++ b/tests/taosc_test/taoscTest.cpp @@ -32,29 +32,29 @@ class taoscTest : public ::testing::Test { protected: static void SetUpTestCase() { - printf("start test setup.\n"); - TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_TRUE(taos != nullptr); - - TAOS_RES* res = taos_query(taos, "drop database IF EXISTS taosc_test_db;"); - if (taos_errno(res) != 0) { - printf("error in drop database taosc_test_db, reason:%s\n", taos_errstr(res)); - return; - } - taosSsleep(5); - taos_free_result(res); - printf("drop database taosc_test_db,finished.\n"); - - res = taos_query(taos, "create database taosc_test_db;"); - if (taos_errno(res) != 0) { - printf("error in create database taosc_test_db, reason:%s\n", taos_errstr(res)); - return; - } - taosSsleep(5); - taos_free_result(res); - printf("create database taosc_test_db,finished.\n"); - - taos_close(taos); +// printf("start test setup.\n"); +// TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_TRUE(taos != nullptr); +// +// TAOS_RES* res = taos_query(taos, "drop database IF EXISTS taosc_test_db;"); +// if (taos_errno(res) != 0) { +// printf("error in drop database taosc_test_db, reason:%s\n", taos_errstr(res)); +// return; +// } +// taosSsleep(5); +// taos_free_result(res); +// printf("drop database taosc_test_db,finished.\n"); +// +// res = taos_query(taos, "create database taosc_test_db;"); +// if (taos_errno(res) != 0) { +// printf("error in create database taosc_test_db, reason:%s\n", taos_errstr(res)); +// return; +// } +// taosSsleep(5); +// taos_free_result(res); +// printf("create database taosc_test_db,finished.\n"); +// +// taos_close(taos); } static void TearDownTestCase() {} @@ -99,6 +99,154 @@ void queryCallback(void* param, void* res, int32_t code) { taos_fetch_raw_block_a(res, fetchCallback, param); } +/** + * @brief execute sql only. + * + * @param taos + * @param sql + */ +void executeSQL(TAOS *taos, const char *sql) { + TAOS_RES *res = taos_query(taos, sql); + int code = taos_errno(res); + if (code != 0) { + printf("%s\n", taos_errstr(res)); + taos_free_result(res); + taos_close(taos); + exit(EXIT_FAILURE); + } + taos_free_result(res); +} + +/** + * @brief check return status and exit program when error occur. + * + * @param stmt + * @param code + * @param msg + */ +void checkErrorCode(TAOS_STMT *stmt, int code, const char* msg) { + if (code != 0) { + printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt)); + taos_stmt_close(stmt); + exit(EXIT_FAILURE); + } +} + +typedef struct { + int64_t ts; + float current; + int voltage; + float phase; +} Row; + + +/** + * @brief insert data using stmt API + * + * @param taos + */ +void insertData(TAOS *taos) { + // init + TAOS_STMT *stmt = taos_stmt_init(taos); + // prepare +// const char *sql = "INSERT INTO ?.d1001 USING meters TAGS(?, ?) values(?, ?, ?, ?)"; +// const char *sql = "INSERT INTO ?.? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; +// const char *sql = "INSERT INTO power.? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; +// const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; +// const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; + const char *sql = "insert into huawei USING meters TAGS(?, ?) values(?, ?, ?, ?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); + // bind table name and tags + TAOS_MULTI_BIND tags[2]; + char *location = "California.SanFrancisco"; + int groupId = 2; + tags[0].buffer_type = TSDB_DATA_TYPE_BINARY; + tags[0].buffer_length = strlen(location); + tags[0].length = (int32_t *)&tags[0].buffer_length; + tags[0].buffer = location; + tags[0].is_null = NULL; + + tags[1].buffer_type = TSDB_DATA_TYPE_INT; + tags[1].buffer_length = sizeof(int); + tags[1].length = (int32_t *)&tags[1].buffer_length; + tags[1].buffer = &groupId; + tags[1].is_null = NULL; + +// code = taos_stmt_set_tbname_tags(stmt, "duck", tags); +// checkErrorCode(stmt, code, "failed to execute taos_stmt_set_dbname_tbname_tags"); + + // insert two rows with multi binds + TAOS_MULTI_BIND params[4]; + // values to bind + int64_t ts[] = {1648432611250, 1648432611778}; + float current[] = {10.3, 12.6}; + int voltage[] = {219, 218}; + float phase[] = {0.31, 0.33}; + // is_null array + char is_null[2] = {0}; + // length array + int32_t int64Len[2] = {sizeof(int64_t)}; + int32_t floatLen[2] = {sizeof(float)}; + int32_t intLen[2] = {sizeof(int)}; + + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(int64_t); + params[0].buffer = ts; + params[0].length = int64Len; + params[0].is_null = is_null; + params[0].num = 2; + + params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[1].buffer_length = sizeof(float); + params[1].buffer = current; + params[1].length = floatLen; + params[1].is_null = is_null; + params[1].num = 2; + + params[2].buffer_type = TSDB_DATA_TYPE_INT; + params[2].buffer_length = sizeof(int); + params[2].buffer = voltage; + params[2].length = intLen; + params[2].is_null = is_null; + params[2].num = 2; + + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer_length = sizeof(float); + params[3].buffer = phase; + params[3].length = floatLen; + params[3].is_null = is_null; + params[3].num = 2; + + code = taos_stmt_bind_param_batch(stmt, params); // bind batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); + code = taos_stmt_add_batch(stmt); // add batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); + // execute + code = taos_stmt_execute(stmt); + checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); + int affectedRows = taos_stmt_affected_rows(stmt); + printf("successfully inserted %d rows\n", affectedRows); + + // close + taos_stmt_close(stmt); +} + +TEST_F(taoscTest, taos_stmt_test) { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030); + if (taos == NULL) { + printf("failed to connect to server"); + exit(EXIT_FAILURE); + } +// executeSQL(taos, "drop database if exists power"); +// executeSQL(taos, "create database power"); + executeSQL(taos, "use power"); +// executeSQL(taos, "create stable meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int)"); + insertData(taos); + taos_close(taos); + taos_cleanup(); +} + TEST_F(taoscTest, taos_query_a_test) { char sql[1024] = {0}; int32_t code = 0; From 88aa15e944f88c9b44d148fcc2efc07520778f7b Mon Sep 17 00:00:00 2001 From: xjzhou Date: Tue, 2 Jul 2024 11:05:54 +0800 Subject: [PATCH 02/20] enh: Enhance error handling for stmt --- include/libs/parser/parser.h | 1 + source/client/inc/clientInt.h | 1 + source/client/src/clientImpl.c | 4 +++- source/client/src/clientStmt.c | 1 + source/libs/parser/src/parInsertSql.c | 2 +- 5 files changed, 7 insertions(+), 2 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index ad41b9a542..3ac357055e 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -89,6 +89,7 @@ typedef struct SParseContext { bool isView; bool isAudit; bool nodeOffline; + bool isStmtBind; const char* svrVer; SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 7a84215e12..d05abb2051 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -283,6 +283,7 @@ typedef struct SRequestObj { bool inRetry; bool isSubReq; bool inCallback; + bool isStmtBind; // is statement bind parameter uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; int64_t allocatorRefId; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 11d3797157..080e2dc32a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -206,6 +206,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; (*pRequest)->validateOnly = validateSql; + (*pRequest)->isStmtBind = false; ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param; @@ -266,7 +267,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .enableSysInfo = pTscObj->sysInfo, .svrVer = pTscObj->sVer, - .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; + .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes), + .isStmtBind = pRequest->isStmtBind}; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index e8b76d34c2..38a16d8fbd 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -72,6 +72,7 @@ static int32_t stmtCreateRequest(STscStmt* pStmt) { } if (TSDB_CODE_SUCCESS == code) { pStmt->exec.pRequest->syncQuery = true; + pStmt->exec.pRequest->isStmtBind = true; } } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index b053cd95a0..7af376f21c 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -30,7 +30,7 @@ typedef struct SInsertParseContext { bool forceUpdate; bool needTableTagVal; bool needRequest; // whether or not request server - bool isStmtBind; // whether is stmt + bool isStmtBind; // whether is stmt bind } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); From 67217a9bed837cc1dcd05ab35e03a3823523845a Mon Sep 17 00:00:00 2001 From: xjzhou Date: Tue, 2 Jul 2024 18:01:54 +0800 Subject: [PATCH 03/20] Return an error early when an error has already occurred in stmt --- source/client/src/clientStmt.c | 50 ++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 38a16d8fbd..21d2cbf447 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -831,6 +831,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) { pStmt->bInfo.needParse = true; pStmt->sql.status = STMT_INIT; pStmt->reqid = reqid; + pStmt->errCode = TSDB_CODE_SUCCESS; if (NULL != pOptions) { memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); @@ -883,6 +884,10 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { STMT_DLOG_E("start to prepare"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + if (pStmt->sql.status >= STMT_PREPARE) { STMT_ERR_RET(stmtResetStmt(pStmt)); } @@ -954,6 +959,10 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STMT_DLOG("start to set tbName: %s", tbName); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME)); int32_t insert = 0; @@ -1000,6 +1009,10 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { STMT_DLOG_E("start to set tbTags"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); if (pStmt->bInfo.inExecCache) { @@ -1022,6 +1035,10 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { } int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + if (STMT_TYPE_QUERY == pStmt->sql.type) { tscError("invalid operation to get query tag fileds"); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); @@ -1040,6 +1057,10 @@ int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields } int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + if (STMT_TYPE_QUERY == pStmt->sql.type) { tscError("invalid operation to get query column fileds"); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); @@ -1151,8 +1172,13 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); + if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { pStmt->bInfo.needParse = false; @@ -1308,6 +1334,10 @@ int stmtAddBatch(TAOS_STMT* stmt) { STMT_DLOG_E("start to add batch"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); if (pStmt->sql.stbInterlaceMode) { @@ -1472,6 +1502,10 @@ int stmtExec(TAOS_STMT* stmt) { STMT_DLOG_E("start to exec"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); if (STMT_TYPE_QUERY == pStmt->sql.type) { @@ -1600,6 +1634,10 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { STMT_DLOG_E("start to get tag fields"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + if (STMT_TYPE_QUERY == pStmt->sql.type) { STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR); } @@ -1638,6 +1676,10 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { STMT_DLOG_E("start to get col fields"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + if (STMT_TYPE_QUERY == pStmt->sql.type) { STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR); } @@ -1675,6 +1717,10 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { STMT_DLOG_E("start to get param num"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && @@ -1707,6 +1753,10 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) { STMT_DLOG_E("start to get param"); + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } + if (STMT_TYPE_QUERY == pStmt->sql.type) { STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); } From 64e7c4c84266ac4ef74353eff09863cb66767242 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 4 Jul 2024 14:56:56 +0800 Subject: [PATCH 04/20] fix:[TS-4921] send data to queue error if monitor thread starts later or failed --- include/libs/monitor/clientMonitor.h | 2 +- source/client/src/clientEnv.c | 6 ++- source/client/src/clientMonitor.c | 64 ++++++++++++++++------------ source/client/src/clientMsgHandler.c | 13 +++--- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 4c7ab6f65a..1e6db8c00a 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -65,7 +65,7 @@ typedef struct { } MonitorSlowLogData; void monitorClose(); -void monitorInit(); +int32_t monitorInit(); void monitorClientSQLReqInit(int64_t clusterKey); void monitorClientSlowQueryInit(int64_t clusterId); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 3a821768f8..b227a6bd96 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -867,7 +867,10 @@ void taos_init_imp(void) { tscError("failed to init conv"); return; } - + if (monitorInit() != 0){ + tscError("failed to init monitor"); + return; + } rpcInit(); SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; @@ -891,7 +894,6 @@ void taos_init_imp(void) { taosThreadMutexInit(&appInfo.mutex, NULL); tscCrashReportInit(); - monitorInit(); tscDebug("client is initialized successfully"); } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 479ea76fe3..c3345bf58d 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -18,6 +18,7 @@ int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; +char tmpSlowLogPath[PATH_MAX] = {0}; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ if (tsTempDir == NULL) { @@ -690,28 +691,6 @@ static void* monitorThreadFunc(void *param){ } #endif - char tmpPath[PATH_MAX] = {0}; - if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){ - return NULL; - } - - if (taosMulModeMkDir(tmpPath, 0777, true) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - printf("failed to create dir:%s since %s", tmpPath, terrstr()); - return NULL; - } - - if (tsem2_init(&monitorSem, 0, 0) != 0) { - tscError("sem init error since %s", terrstr()); - return NULL; - } - - monitorQueue = taosOpenQueue(); - if(monitorQueue == NULL){ - tscError("open queue error since %s", terrstr()); - return NULL; - } - if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { return NULL; } @@ -747,7 +726,7 @@ static void* monitorThreadFunc(void *param){ monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); } } else if(slowLogData->type == SLOW_LOG_WRITE){ - monitorWriteSlowLog2File(slowLogData, tmpPath); + monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath); } else if(slowLogData->type == SLOW_LOG_READ_RUNNING){ monitorSendSlowLogAtRunning(slowLogData->clusterId); } else if(slowLogData->type == SLOW_LOG_READ_QUIT){ @@ -799,27 +778,59 @@ static void tscMonitorStop() { } } -void monitorInit() { +int32_t monitorInit() { tscInfo("[monitor] tscMonitor init"); monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorCounterHash == NULL) { tscError("failed to create monitorCounterHash"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorSlowLogHash == NULL) { tscError("failed to create monitorSlowLogHash"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); if (monitorTimer == NULL) { tscError("failed to create monitor timer"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0){ + terrno = TSDB_CODE_TSC_INTERNAL_ERROR; + return -1; + } + + if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr()); + return -1; + } + + if (tsem2_init(&monitorSem, 0, 0) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tscError("sem init error since %s", terrstr()); + return -1; + } + + monitorQueue = taosOpenQueue(); + if(monitorQueue == NULL){ + tscError("open queue error since %s", terrstr()); + return -1; } taosInitRWLatch(&monitorLock); - tscMonitortInit(); + if (tscMonitortInit() != 0){ + return -1; + } + return 0; } void monitorClose() { @@ -845,9 +856,6 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ } *slowLogData = data; tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); - while (atomic_load_32(&slowLogFlag) == -1) { - taosMsleep(5); - } if (taosWriteQitem(monitorQueue, slowLogData) == 0){ tsem2_post(&monitorSem); }else{ diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e5baa7137e..d587deffc5 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -154,13 +154,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){ if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); + }else{ + MonitorSlowLogData data = {0}; + data.clusterId = pTscObj->pAppInfo->clusterId; + data.type = SLOW_LOG_READ_BEGINNIG; + monitorPutData2MonitorQueue(data); + monitorClientSlowQueryInit(connectRsp.clusterId); + monitorClientSQLReqInit(connectRsp.clusterId); } - MonitorSlowLogData data = {0}; - data.clusterId = pTscObj->pAppInfo->clusterId; - data.type = SLOW_LOG_READ_BEGINNIG; - monitorPutData2MonitorQueue(data); - monitorClientSlowQueryInit(connectRsp.clusterId); - monitorClientSQLReqInit(connectRsp.clusterId); } taosThreadMutexLock(&clientHbMgr.lock); From 3151d0663c87d9e5df89d0b9da9e4b5e3e704fcc Mon Sep 17 00:00:00 2001 From: xjzhou Date: Thu, 4 Jul 2024 16:45:10 +0800 Subject: [PATCH 05/20] update --- source/libs/parser/src/parInsertSql.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index cd7288f1f5..d4b9f20f51 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2938,7 +2938,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal .usingDuplicateTable = false, .needRequest = true, .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false), - .isStmtBind = false}; + .isStmtBind = pCxt->isStmtBind}; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); if (TSDB_CODE_SUCCESS == code) { From 79f1e90743e883b8c7b7c034ba139e895fa2e88b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 17:11:53 +0800 Subject: [PATCH 06/20] fix: oom in rpc queue --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 3 ++- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 4 +++- source/util/src/tqueue.c | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 45d1486912..8c1b33cb14 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -287,7 +287,8 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { return -1; } - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); + EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM; + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 74bf1f964c..bc269a6410 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -208,7 +208,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } pRpc->info.wrapper = pWrapper; - pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); + + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // resp msg is not limited by tsRpcQueueMemoryUsed + pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 7a4eb09b99..aa8834c89f 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -494,6 +494,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * qall->start = queue->head; qall->numOfItems = queue->numOfItems; qall->memOfItems = queue->memOfItems; + qall->unAccessedNumOfItems = queue->numOfItems; + qall->unAccessMemOfItems = queue->memOfItems; code = qall->numOfItems; qinfo->ahandle = queue->ahandle; From a9a6747ac09cead5d0104def2a7b963ee1556d95 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 18:12:55 +0800 Subject: [PATCH 07/20] fix: oom in rpc queue --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index bc269a6410..c3dfc1a64c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -209,7 +209,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; - EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // resp msg is not limited by tsRpcQueueMemoryUsed + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg should not be restricted by tsRpcQueueMemoryUsed pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) goto _OVER; From 81577b82222013e84a485004f5b8a4846d8f9002 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 18:19:08 +0800 Subject: [PATCH 08/20] fix: oom in rpc queue --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c3dfc1a64c..99d641ff3f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -209,7 +209,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; - EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg should not be restricted by tsRpcQueueMemoryUsed + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) goto _OVER; From 2b9df7b45ce99cc54a8b6f43c6b4ce965199c397 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 18:36:17 +0800 Subject: [PATCH 09/20] fix: oom in rpc queue --- source/util/src/tqueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index aa8834c89f..45a8a462fb 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); if (alloced > tsRpcQueueMemoryAllowed) { uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, - tsRpcQueueMemoryUsed); + tsRpcQueueMemoryAllowed); atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); taosMemoryFree(pNode); terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; From 8087dbe16d8db8beaa76c134312d45793cc7d9f2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 Jul 2024 09:41:34 +0800 Subject: [PATCH 10/20] fix[TS-4921] set flag -1 if init monitor failed --- source/client/src/clientEnv.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b227a6bd96..ecfa1e3392 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -864,10 +864,12 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); if (taosConvInit() != 0) { + tscInitRes = -1; tscError("failed to init conv"); return; } if (monitorInit() != 0){ + tscInitRes = -1; tscError("failed to init monitor"); return; } From 20c8e3168c8e7924e7abb41693bc2cdcdc64bc6a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 Jul 2024 09:51:28 +0800 Subject: [PATCH 11/20] fix[TD-30895] heap use after free --- source/client/src/clientMonitor.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index c3345bf58d..032b7cdeea 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -480,7 +480,6 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); *fileName = NULL; } - tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data); } } @@ -511,7 +510,6 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){ if(data != NULL){ sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); } - tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data); } } @@ -542,7 +540,6 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) { if(data != NULL){ sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); } - tscInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data); } return false; } @@ -568,7 +565,6 @@ static void monitorSendAllSlowLogAtQuit(){ if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ quitCnt ++; } - tscInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data); } } } @@ -619,7 +615,6 @@ static void monitorSendAllSlowLog(){ if(data != NULL){ sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); } - tscDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data); } } } From 6c5acdfc4bb5c247fb5c85347e356b3cd9b43387 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 Jul 2024 14:03:12 +0800 Subject: [PATCH 12/20] fix:[TS-4921]refactor code --- include/libs/monitor/clientMonitor.h | 7 +++ source/client/src/clientMonitor.c | 80 ++++++++++++---------------- 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 1e6db8c00a..3bb325921e 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -38,6 +38,13 @@ typedef enum { SLOW_LOG_READ_QUIT = 3, } SLOW_LOG_QUEUE_TYPE; +char* queueTypeStr[] = { + "SLOW_LOG_WRITE", + "SLOW_LOG_READ_RUNNING", + "SLOW_LOG_READ_BEGINNIG", + "SLOW_LOG_READ_QUIT" +}; + #define SLOW_LOG_SEND_SIZE_MAX 1024*1024 typedef struct { diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 032b7cdeea..d1a9897caa 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -454,6 +454,10 @@ static int64_t getFileSize(char* path){ } static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ + if (data == NULL){ + taosMemoryFree(fileName); + return -1; + } MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); if(pParam == NULL){ taosMemoryFree(data); @@ -469,17 +473,26 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); } -static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ +static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + tscError("failed to get app instance by clusterId:%" PRId64, clusterId); + return -1; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pFile, offset, size); + return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep); +} + +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){ int64_t size = getFileSize(*fileName); if(size <= offset){ processFileInTheEnd(pFile, *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); }else{ - char* data = readFile(pFile, &offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); - *fileName = NULL; - } + int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); + tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); + *fileName = NULL; } } @@ -500,16 +513,8 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){ tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile); pClient->offset = 0; }else{ - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ - tscError("failed to get app instance by clusterId:%" PRId64, clusterId); - return; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); - } + int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); + tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); } } @@ -531,15 +536,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) { return true; } }else{ - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL) { - return true; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); - } + int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code); } return false; } @@ -556,13 +554,9 @@ static void monitorSendAllSlowLogAtQuit(){ pClient->pFile = NULL; }else if(pClient->offset == 0){ int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); - SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); - if(pInst == NULL) { - continue; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); + if (code == 0){ quitCnt ++; } } @@ -610,11 +604,8 @@ static void monitorSendAllSlowLog(){ } continue; } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); - } + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); + tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); } } } @@ -627,7 +618,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ return; } char namePrefix[PATH_MAX] = {0}; - if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) { tscError("failed to generate slow log file name prefix"); return; } @@ -652,7 +643,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) { - tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId); continue; } @@ -668,9 +659,8 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ taosCloseFile(&pFile); continue; } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); char *tmp = taosStrdup(filename); - monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep); + monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0); taosMemoryFree(tmp); } @@ -712,11 +702,7 @@ static void* monitorThreadFunc(void *param){ if (slowLogData != NULL) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ if(slowLogData->pFile != NULL){ - SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); - if(pInst != NULL) { - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); - } + monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset); }else{ monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); } @@ -850,7 +836,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ return -1; } *slowLogData = data; - tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); + tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data); if (taosWriteQitem(monitorQueue, slowLogData) == 0){ tsem2_post(&monitorSem); }else{ From b8012df90906dfb3702d7df1dfcccadf9539a182 Mon Sep 17 00:00:00 2001 From: xjzhou Date: Fri, 5 Jul 2024 14:18:51 +0800 Subject: [PATCH 13/20] handle fixed table name int sql --- source/client/src/clientStmt.c | 6 ++++++ source/libs/parser/src/parInsertSql.c | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 21d2cbf447..17b52521b8 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -1015,6 +1015,12 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); + SBoundColInfo *tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags; + if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) { + tscWarn("no tags bound in sql, will not bound tags"); + return TSDB_CODE_SUCCESS; + } + if (pStmt->bInfo.inExecCache) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index d4b9f20f51..313d9449d2 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2444,6 +2444,13 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif pTbName->n = strlen(tbName); } + if (pCxt->isStmtBind) { + if (TK_NK_ID == pTbName->type || (tbNameAfterDbName != NULL && *(tbNameAfterDbName + 1) != '?')) { + // In SQL statements, the table name has already been specified. + parserWarn("0x%" PRIx64 " table name is specified in sql, ignore the table name in bind param", pCxt->pComCxt->requestId); + } + } + *pHasData = true; return TSDB_CODE_SUCCESS; } From 48560ddf4332fc2c51646147b3b2133e97057e29 Mon Sep 17 00:00:00 2001 From: xjzhou Date: Fri, 5 Jul 2024 14:23:54 +0800 Subject: [PATCH 14/20] recover taoscTest.cpp --- tests/taosc_test/taoscTest.cpp | 201 +++++---------------------------- 1 file changed, 26 insertions(+), 175 deletions(-) diff --git a/tests/taosc_test/taoscTest.cpp b/tests/taosc_test/taoscTest.cpp index d3f6f50547..1b051f555e 100644 --- a/tests/taosc_test/taoscTest.cpp +++ b/tests/taosc_test/taoscTest.cpp @@ -30,31 +30,31 @@ #include "taos.h" class taoscTest : public ::testing::Test { - protected: +protected: static void SetUpTestCase() { -// printf("start test setup.\n"); -// TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_TRUE(taos != nullptr); -// -// TAOS_RES* res = taos_query(taos, "drop database IF EXISTS taosc_test_db;"); -// if (taos_errno(res) != 0) { -// printf("error in drop database taosc_test_db, reason:%s\n", taos_errstr(res)); -// return; -// } -// taosSsleep(5); -// taos_free_result(res); -// printf("drop database taosc_test_db,finished.\n"); -// -// res = taos_query(taos, "create database taosc_test_db;"); -// if (taos_errno(res) != 0) { -// printf("error in create database taosc_test_db, reason:%s\n", taos_errstr(res)); -// return; -// } -// taosSsleep(5); -// taos_free_result(res); -// printf("create database taosc_test_db,finished.\n"); -// -// taos_close(taos); + printf("start test setup.\n"); + TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_TRUE(taos != nullptr); + + TAOS_RES* res = taos_query(taos, "drop database IF EXISTS taosc_test_db;"); + if (taos_errno(res) != 0) { + printf("error in drop database taosc_test_db, reason:%s\n", taos_errstr(res)); + return; + } + taosSsleep(5); + taos_free_result(res); + printf("drop database taosc_test_db,finished.\n"); + + res = taos_query(taos, "create database taosc_test_db;"); + if (taos_errno(res) != 0) { + printf("error in create database taosc_test_db, reason:%s\n", taos_errstr(res)); + return; + } + taosSsleep(5); + taos_free_result(res); + printf("create database taosc_test_db,finished.\n"); + + taos_close(taos); } static void TearDownTestCase() {} @@ -99,154 +99,6 @@ void queryCallback(void* param, void* res, int32_t code) { taos_fetch_raw_block_a(res, fetchCallback, param); } -/** - * @brief execute sql only. - * - * @param taos - * @param sql - */ -void executeSQL(TAOS *taos, const char *sql) { - TAOS_RES *res = taos_query(taos, sql); - int code = taos_errno(res); - if (code != 0) { - printf("%s\n", taos_errstr(res)); - taos_free_result(res); - taos_close(taos); - exit(EXIT_FAILURE); - } - taos_free_result(res); -} - -/** - * @brief check return status and exit program when error occur. - * - * @param stmt - * @param code - * @param msg - */ -void checkErrorCode(TAOS_STMT *stmt, int code, const char* msg) { - if (code != 0) { - printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt)); - taos_stmt_close(stmt); - exit(EXIT_FAILURE); - } -} - -typedef struct { - int64_t ts; - float current; - int voltage; - float phase; -} Row; - - -/** - * @brief insert data using stmt API - * - * @param taos - */ -void insertData(TAOS *taos) { - // init - TAOS_STMT *stmt = taos_stmt_init(taos); - // prepare -// const char *sql = "INSERT INTO ?.d1001 USING meters TAGS(?, ?) values(?, ?, ?, ?)"; -// const char *sql = "INSERT INTO ?.? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; -// const char *sql = "INSERT INTO power.? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; -// const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; -// const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; - const char *sql = "insert into huawei USING meters TAGS(?, ?) values(?, ?, ?, ?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); - // bind table name and tags - TAOS_MULTI_BIND tags[2]; - char *location = "California.SanFrancisco"; - int groupId = 2; - tags[0].buffer_type = TSDB_DATA_TYPE_BINARY; - tags[0].buffer_length = strlen(location); - tags[0].length = (int32_t *)&tags[0].buffer_length; - tags[0].buffer = location; - tags[0].is_null = NULL; - - tags[1].buffer_type = TSDB_DATA_TYPE_INT; - tags[1].buffer_length = sizeof(int); - tags[1].length = (int32_t *)&tags[1].buffer_length; - tags[1].buffer = &groupId; - tags[1].is_null = NULL; - -// code = taos_stmt_set_tbname_tags(stmt, "duck", tags); -// checkErrorCode(stmt, code, "failed to execute taos_stmt_set_dbname_tbname_tags"); - - // insert two rows with multi binds - TAOS_MULTI_BIND params[4]; - // values to bind - int64_t ts[] = {1648432611250, 1648432611778}; - float current[] = {10.3, 12.6}; - int voltage[] = {219, 218}; - float phase[] = {0.31, 0.33}; - // is_null array - char is_null[2] = {0}; - // length array - int32_t int64Len[2] = {sizeof(int64_t)}; - int32_t floatLen[2] = {sizeof(float)}; - int32_t intLen[2] = {sizeof(int)}; - - params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - params[0].buffer_length = sizeof(int64_t); - params[0].buffer = ts; - params[0].length = int64Len; - params[0].is_null = is_null; - params[0].num = 2; - - params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[1].buffer_length = sizeof(float); - params[1].buffer = current; - params[1].length = floatLen; - params[1].is_null = is_null; - params[1].num = 2; - - params[2].buffer_type = TSDB_DATA_TYPE_INT; - params[2].buffer_length = sizeof(int); - params[2].buffer = voltage; - params[2].length = intLen; - params[2].is_null = is_null; - params[2].num = 2; - - params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[3].buffer_length = sizeof(float); - params[3].buffer = phase; - params[3].length = floatLen; - params[3].is_null = is_null; - params[3].num = 2; - - code = taos_stmt_bind_param_batch(stmt, params); // bind batch - checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); - code = taos_stmt_add_batch(stmt); // add batch - checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); - // execute - code = taos_stmt_execute(stmt); - checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); - int affectedRows = taos_stmt_affected_rows(stmt); - printf("successfully inserted %d rows\n", affectedRows); - - // close - taos_stmt_close(stmt); -} - -TEST_F(taoscTest, taos_stmt_test) { - TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030); - if (taos == NULL) { - printf("failed to connect to server"); - exit(EXIT_FAILURE); - } -// executeSQL(taos, "drop database if exists power"); -// executeSQL(taos, "create database power"); - executeSQL(taos, "use power"); -// executeSQL(taos, "create stable meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int)"); - insertData(taos); - taos_close(taos); - taos_cleanup(); -} - TEST_F(taoscTest, taos_query_a_test) { char sql[1024] = {0}; int32_t code = 0; @@ -336,7 +188,7 @@ TEST_F(taoscTest, taos_query_test) { void queryCallback2(void* param, void* res, int32_t code) { ASSERT_TRUE(code == 0); ASSERT_TRUE(param == pUserParam); - // After using taos_query_a to query, using taos_fetch_row in the callback will cause blocking. + // After using taos_query_a to query, using taos_fetch_row in the callback will cause blocking. // Reason: schProcessOnCbBegin SCH_LOCK_TASK(pTask) TAOS_ROW row; row = taos_fetch_row(res); @@ -402,7 +254,7 @@ TEST_F(taoscTest, taos_query_a_fetch_row) { printf("taos_query_a_fetch_row taos_fetch_row start...\n"); while ((row = taos_fetch_row(*pres))) { - getRecordCounts++; + getRecordCounts++; } printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts); taos_free_result(*pres); @@ -412,4 +264,3 @@ TEST_F(taoscTest, taos_query_a_fetch_row) { printf("taos_query_a_fetch_row test finished.\n"); } - From 3a4412b2829a20b241150d22913647137dfd981d Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 5 Jul 2024 06:24:53 +0000 Subject: [PATCH 15/20] fix/TD-30876 --- include/common/tglobal.h | 1 - include/libs/monitor/monitor.h | 1 - source/common/src/tglobal.c | 3 -- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 1 - source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 1 - source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 9 ---- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 1 - source/dnode/mgmt/node_mgmt/src/dmEnv.c | 1 - source/dnode/mgmt/node_mgmt/src/dmMonitor.c | 10 ---- source/dnode/mgmt/node_util/inc/dmUtil.h | 1 - source/libs/monitor/src/monMain.c | 54 +++++++++------------ 11 files changed, 23 insertions(+), 60 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 96b9617fc4..3fd3cc4ca9 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -134,7 +134,6 @@ extern uint16_t tsMonitorPort; extern int32_t tsMonitorMaxLogs; extern bool tsMonitorComp; extern bool tsMonitorLogProtocol; -extern int32_t tsMonitorIntervalForBasic; extern bool tsMonitorForceV2; // audit diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 9d7878ecf7..6007d52bb4 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -226,7 +226,6 @@ void monSetQmInfo(SMonQmInfo *pInfo); void monSetSmInfo(SMonSmInfo *pInfo); void monSetBmInfo(SMonBmInfo *pInfo); void monGenAndSendReport(); -void monGenAndSendReportBasic(); void monSendContent(char *pCont, const char* uri); void tFreeSMonMmInfo(SMonMmInfo *pInfo); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3372c7b1cc..84d4e7b7e7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -111,7 +111,6 @@ uint16_t tsMonitorPort = 6043; int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; bool tsMonitorLogProtocol = false; -int32_t tsMonitorIntervalForBasic = 30; bool tsMonitorForceV2 = true; // audit @@ -712,7 +711,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1165,7 +1163,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval; tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32; tsMonitorLogProtocol = cfgGetItem(pCfg, "monitorLogProtocol")->bval; - tsMonitorIntervalForBasic = cfgGetItem(pCfg, "monitorIntervalForBasic")->i32; tsMonitorForceV2 = cfgGetItem(pCfg, "monitorForceV2")->i32; tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 46f8dd06d4..be9ff56674 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -43,7 +43,6 @@ typedef struct SDnodeMgmt { GetMnodeLoadsFp getMnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp; int32_t statusSeq; - SendMonitorReportFp sendMonitorReportFpBasic; } SDnodeMgmt; // dmHandle.c diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index a651fbf060..b9dd45f1c0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -65,7 +65,6 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; - pMgmt->sendMonitorReportFpBasic = pInput->sendMonitorReportFpBasic; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index c48b614f96..eafa10aa32 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -175,15 +175,6 @@ static void *dmMonitorThreadFp(void *param) { taosMemoryTrim(0); } } - - if(tsMonitorForceV2){ - if (curTime < lastTimeForBasic) lastTimeForBasic = curTime; - float intervalForBasic = (curTime - lastTimeForBasic) / 1000.0f; - if (intervalForBasic >= tsMonitorIntervalForBasic) { - (*pMgmt->sendMonitorReportFpBasic)(); - lastTimeForBasic = curTime; - } - } } return NULL; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 90e44e5acc..bc6a4652e7 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -128,7 +128,6 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); // dmMonitor.c void dmSendMonitorReport(); void dmSendAuditRecords(); -void dmSendMonitorReportBasic(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 54a118b666..4be1af30b5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -394,7 +394,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, .sendAuditRecordFp = auditSendRecordsInBatch, - .sendMonitorReportFpBasic = dmSendMonitorReportBasic, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getMnodeLoadsFp = dmGetMnodeLoads, diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index 21e25f5535..d3197282b6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -123,16 +123,6 @@ void dmSendMonitorReport() { monGenAndSendReport(); } -void dmSendMonitorReportBasic() { - if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; - dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); - - SDnode *pDnode = dmInstance(); - dmGetDmMonitorInfoBasic(pDnode); - dmGetMmMonitorInfo(pDnode); - monGenAndSendReportBasic(); -} - //Todo: put this in seperate file in the future void dmSendAuditRecords() { auditSendRecordsInBatch(); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index aea3286d76..d316a82af2 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -155,7 +155,6 @@ typedef struct { ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; SendAuditRecordsFp sendAuditRecordFp; - SendMonitorReportFp sendMonitorReportFpBasic; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index 21c196872c..3389780916 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -568,6 +568,25 @@ void monSendReport(SMonInfo *pMonitor){ } } +void monSendReportBasic(SMonInfo *pMonitor) { + char *pCont = tjsonToString(pMonitor->pJson); + if (tsMonitorLogProtocol) { + if (pCont != NULL) { + uInfoL("report cont basic:\n%s", pCont); + } else { + uInfo("report cont basic is null"); + } + } + if (pCont != NULL) { + EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; + if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != + 0) { + uError("failed to send monitor msg"); + } + taosMemoryFree(pCont); + } +} + void monGenAndSendReport() { SMonInfo *pMonitor = monCreateMonitorInfo(); if (pMonitor == NULL) return; @@ -595,38 +614,11 @@ void monGenAndSendReport() { monGenVnodeRoleTable(pMonitor); monSendPromReport(); - } - - monCleanupMonitorInfo(pMonitor); -} - -void monSendReportBasic(SMonInfo *pMonitor){ - char *pCont = tjsonToString(pMonitor->pJson); - if(tsMonitorLogProtocol){ - if(pCont != NULL){ - uInfoL("report cont basic:\n%s", pCont); + if (pMonitor->mmInfo.cluster.first_ep_dnode_id != 0) { + monGenBasicJsonBasic(pMonitor); + monGenClusterJsonBasic(pMonitor); + monSendReportBasic(pMonitor); } - else{ - uInfo("report cont basic is null"); - } - } - if (pCont != NULL) { - EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { - uError("failed to send monitor msg"); - } - taosMemoryFree(pCont); - } -} - -void monGenAndSendReportBasic() { - SMonInfo *pMonitor = monCreateMonitorInfo(); - - monGenBasicJsonBasic(pMonitor); - monGenClusterJsonBasic(pMonitor); - - if (pMonitor->mmInfo.cluster.first_ep_dnode_id != 0) { - monSendReportBasic(pMonitor); } monCleanupMonitorInfo(pMonitor); From 587df3b86debf418c5bbd8a5a389a94bc5b2fccb Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 4 Jul 2024 10:00:50 +0000 Subject: [PATCH 16/20] fix/TD-30877 --- source/os/src/osSysinfo.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 50eb8413c0..911c89d4b3 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -29,6 +29,10 @@ typedef struct { uint64_t nice; uint64_t system; uint64_t idle; + uint64_t wa; + uint64_t hi; + uint64_t si; + uint64_t st; } SysCpuInfo; typedef struct { @@ -173,8 +177,9 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { } char cpu[10] = {0}; - sscanf(line, "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, cpu, &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system, - &cpuInfo->idle); + sscanf(line, "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, cpu, + &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system, &cpuInfo->idle, &cpuInfo->wa, &cpuInfo->hi, &cpuInfo->si, + &cpuInfo->st); taosCloseFile(&pFile); #endif @@ -576,7 +581,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { SysCpuInfo sysCpu = {0}; ProcCpuInfo procCpu = {0}; if (taosGetSysCpuInfo(&sysCpu) == 0 && taosGetProcCpuInfo(&procCpu) == 0) { - curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system; + curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system + sysCpu.wa + sysCpu.hi + sysCpu.si + sysCpu.st; curSysTotal = curSysUsed + sysCpu.idle; curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; From 89acd1693c6604e2324ed7c0487d29e5f9a501fa Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 5 Jul 2024 00:34:34 +0000 Subject: [PATCH 17/20] fix/TD-30877 --- source/os/src/osSysinfo.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 911c89d4b3..9bd60be2b6 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -33,6 +33,8 @@ typedef struct { uint64_t hi; uint64_t si; uint64_t st; + uint64_t guest; + uint64_t guest_nice; } SysCpuInfo; typedef struct { @@ -177,9 +179,11 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { } char cpu[10] = {0}; - sscanf(line, "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, cpu, - &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system, &cpuInfo->idle, &cpuInfo->wa, &cpuInfo->hi, &cpuInfo->si, - &cpuInfo->st); + sscanf(line, + "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 + " %" PRIu64, + cpu, &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system, &cpuInfo->idle, &cpuInfo->wa, &cpuInfo->hi, + &cpuInfo->si, &cpuInfo->st, &cpuInfo->guest, &cpuInfo->guest_nice); taosCloseFile(&pFile); #endif @@ -581,7 +585,8 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { SysCpuInfo sysCpu = {0}; ProcCpuInfo procCpu = {0}; if (taosGetSysCpuInfo(&sysCpu) == 0 && taosGetProcCpuInfo(&procCpu) == 0) { - curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system + sysCpu.wa + sysCpu.hi + sysCpu.si + sysCpu.st; + curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system + sysCpu.wa + sysCpu.hi + sysCpu.si + sysCpu.st + + sysCpu.guest + sysCpu.guest_nice; curSysTotal = curSysUsed + sysCpu.idle; curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; From 9d42b31d4a017b9737da7a342e5b988765059a62 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 Jul 2024 16:12:13 +0800 Subject: [PATCH 18/20] fix:[TS-4921]refactor code --- include/libs/monitor/clientMonitor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 3bb325921e..0085173ecd 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -38,7 +38,7 @@ typedef enum { SLOW_LOG_READ_QUIT = 3, } SLOW_LOG_QUEUE_TYPE; -char* queueTypeStr[] = { +static char* queueTypeStr[] = { "SLOW_LOG_WRITE", "SLOW_LOG_READ_RUNNING", "SLOW_LOG_READ_BEGINNIG", From c9153b8176f31c55568518d1f6221b72f098c182 Mon Sep 17 00:00:00 2001 From: xjzhou Date: Fri, 5 Jul 2024 16:27:40 +0800 Subject: [PATCH 19/20] update CI test case stmt_error --- tests/system-test/1-insert/stmt_error.py | 70 +++++++++++++++++++++--- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/tests/system-test/1-insert/stmt_error.py b/tests/system-test/1-insert/stmt_error.py index c6d747c317..0bfbedb9a1 100644 --- a/tests/system-test/1-insert/stmt_error.py +++ b/tests/system-test/1-insert/stmt_error.py @@ -24,7 +24,7 @@ class TDTestCase: case1 : [TD-11899] : this is an test case for check stmt error use . ''' return - + def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) @@ -49,7 +49,7 @@ class TDTestCase: ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)", ) conn.load_table_info("log") - + stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") params = new_bind_params(16) @@ -123,7 +123,7 @@ class TDTestCase: ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )", ) conn.load_table_info("log") - + stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1000)") params = new_bind_params(16) @@ -195,20 +195,74 @@ class TDTestCase: except Exception as err: conn.close() raise err - + + def test_stmt_nornmal_value_error(self, conn): + # type: (TaosConnection) -> None + dbname = "pytest_taos_stmt_error" + try: + conn.execute("drop database if exists %s" % dbname) + conn.execute("create database if not exists %s" % dbname) + conn.select_db(dbname) + + conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ + bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )", + ) + conn.load_table_info("log") + + + stmt = conn.statement("insert into log values(NOW(),?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + params = new_bind_params(16) + params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds) + params[1].bool(True) + params[2].tinyint(None) + params[3].tinyint(2) + params[4].smallint(3) + params[5].int(4) + params[6].bigint(5) + params[7].tinyint_unsigned(6) + params[8].smallint_unsigned(7) + params[9].int_unsigned(8) + params[10].bigint_unsigned(9) + params[11].float(10.1) + params[12].double(10.11) + params[13].binary("hello") + params[14].nchar("stmt") + params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds) + + stmt.bind_param(params) + stmt.execute() + + conn.close() + + except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + raise err + def run(self): - + self.test_stmt_insert(self.conn()) try: self.test_stmt_insert_error(self.conn()) except Exception as error : - - if str(error)=='[0x0200]: no mix usage for ? and values': + + if str(error)=='[0x0200]: stmt bind param does not support normal value in sql': tdLog.info('=========stmt error occured for bind part column ==============') else: tdLog.exit("expect error(%s) not occured" % str(error)) - try: + try: + self.test_stmt_nornmal_value_error(self.conn()) + except Exception as error : + + if str(error)=='[0x0200]: stmt bind param does not support normal value in sql': + tdLog.info('=========stmt error occured for bind part column ==============') + else: + tdLog.exit("expect error(%s) not occured" % str(error)) + + try: self.test_stmt_insert_error_null_timestamp(self.conn()) tdLog.exit("expect error not occured - 1") except Exception as error : From 514108c3bfa5f85f9ea94bf7e45d7455bf7df0cc Mon Sep 17 00:00:00 2001 From: zhiyong Date: Sun, 7 Jul 2024 16:33:20 +0800 Subject: [PATCH 20/20] test: move TS_5105 to queryBugs --- tests/army/query/queryBugs.py | 32 +++++++++++++-- tests/army/query/query_last_row_repeatly.py | 44 --------------------- tests/parallel_test/cases.task | 1 - 3 files changed, 28 insertions(+), 49 deletions(-) delete mode 100644 tests/army/query/query_last_row_repeatly.py diff --git a/tests/army/query/queryBugs.py b/tests/army/query/queryBugs.py index ca28ff549c..7583382290 100644 --- a/tests/army/query/queryBugs.py +++ b/tests/army/query/queryBugs.py @@ -28,8 +28,7 @@ from frame import * class TDTestCase(TBase): - - # fix + # fix def FIX_TD_30686(self): tdLog.info("check bug TD_30686 ...\n") sqls = [ @@ -49,6 +48,32 @@ class TDTestCase(TBase): ] tdSql.checkDataMem(sql, results) + def FIX_TS_5105(self): + tdLog.info("check bug TS_5105 ...\n") + ts1 = "2024-07-03 10:00:00.000" + ts2 = "2024-07-03 13:00:00.000" + sqls = [ + "drop database if exists ts_5105", + "create database ts_5105 cachemodel 'both';", + "use ts_5105;", + "CREATE STABLE meters (ts timestamp, current float) TAGS (location binary(64), groupId int);", + "CREATE TABLE d1001 USING meters TAGS ('California.B', 2);", + "CREATE TABLE d1002 USING meters TAGS ('California.S', 3);", + f"INSERT INTO d1001 VALUES ('{ts1}', 10);", + f"INSERT INTO d1002 VALUES ('{ts2}', 13);", + ] + tdSql.executes(sqls) + + sql = "select last(ts), last_row(ts) from meters;" + + # 执行多次,有些时候last_row(ts)会返回错误的值,详见TS-5105 + for i in range(1, 10): + tdLog.debug(f"{i}th execute sql: {sql}") + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, ts2) + tdSql.checkData(0, 1, ts2) + # run def run(self): tdLog.debug(f"start to excute {__file__}") @@ -57,11 +82,10 @@ class TDTestCase(TBase): self.FIX_TD_30686() # TS BUGS - + self.FIX_TS_5105() tdLog.success(f"{__file__} successfully executed") - tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/query/query_last_row_repeatly.py b/tests/army/query/query_last_row_repeatly.py deleted file mode 100644 index 3cca032176..0000000000 --- a/tests/army/query/query_last_row_repeatly.py +++ /dev/null @@ -1,44 +0,0 @@ -# -*- coding: utf-8 -*- - -from frame.log import * -from frame.cases import * -from frame.sql import * -from frame.caseBase import * -from frame import * - - -class TDTestCase(TBase): - def init(self, conn, logSql, replicaVar=1): - self.replicaVar = int(replicaVar) - tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor(), logSql) - - def run(self): - sqls = [ - "drop database if exists ts_5101", - "create database ts_5101 cachemodel 'both';", - "use ts_5101;", - "CREATE STABLE meters (ts timestamp, current float) TAGS (location binary(64), groupId int);", - "CREATE TABLE d1001 USING meters TAGS ('California.B', 2);", - "CREATE TABLE d1002 USING meters TAGS ('California.S', 3);", - "INSERT INTO d1001 VALUES ('2024-07-03 10:00:00.000', 10);", - "INSERT INTO d1002 VALUES ('2024-07-03 13:00:00.000', 13);", - ] - tdSql.executes(sqls) - - # 执行多次,有些时候last_row(ts)会返回错误的值,详见TS-5105 - for i in range(1, 10): - sql = "select last(ts), last_row(ts) from meters;" - tdLog.debug(f"{i}th execute sql: {sql}") - tdSql.query(sql) - tdSql.checkRows(1) - tdSql.checkData(0, 0, "2024-07-03 13:00:00.000") - tdSql.checkData(0, 1, "2024-07-03 13:00:00.000") - - def stop(self): - tdSql.close() - tdLog.success("%s successfully executed" % __file__) - - -tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index aff5bedaf8..5667255f9f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -20,7 +20,6 @@ ,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py ,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py -,,y,army,./pytest.sh python3 ./test.py -f query/query_last_row_repeatly.py ,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py