diff --git a/importSampleData/bin/taosimport b/importSampleData/bin/taosimport deleted file mode 100755 index 235fde9f06..0000000000 Binary files a/importSampleData/bin/taosimport and /dev/null differ diff --git a/src/client/inc/tscLog.h b/src/client/inc/tscLog.h index 94adcfe17a..9d01edae36 100644 --- a/src/client/inc/tscLog.h +++ b/src/client/inc/tscLog.h @@ -27,11 +27,11 @@ extern int32_t tscEmbedded; #define tscFatal(...) { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} #define tscError(...) { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} -#define tscWarn(...) { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} -#define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC INFO ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} -#define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }} -#define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC TRACE ", cDebugFlag, __VA_ARGS__); }} -#define tscDebugL(...){ if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }} +#define tscWarn(...) { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} +#define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} +#define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} +#define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} +#define tscDebugL(...){ if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} #ifdef __cplusplus } diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index eb9b1cb479..549a0e8d0d 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -24,10 +24,10 @@ #define jniFatal(...) { if (jniDebugFlag & DEBUG_FATAL) { taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} #define jniError(...) { if (jniDebugFlag & DEBUG_ERROR) { taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniWarn(...) { if (jniDebugFlag & DEBUG_WARN) { taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniInfo(...) { if (jniDebugFlag & DEBUG_INFO) { taosPrintLog("JNI INFO ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }} -#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI TRACE ", jniDebugFlag, __VA_ARGS__); }} +#define jniWarn(...) { if (jniDebugFlag & DEBUG_WARN) { taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} +#define jniInfo(...) { if (jniDebugFlag & DEBUG_INFO) { taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} +#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }} +#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }} int __init = 0; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index cae67673e9..8c194c031d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -478,6 +478,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscDebug("%p redo parse sql string to build submit block", pSql); pCmd->parseFinished = false; + tscResetSqlCmdObj(pCmd); + code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { @@ -492,7 +494,23 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { */ pSql->fp = pSql->fetchFp; // restore the fp tscHandleInsertRetry(pSql); - } else {// in case of other query type, continue + } else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue + tscDebug("%p redo parse sql string and proceed", pSql); + //tscDebug("before %p fp:%p, fetchFp:%p", pSql, pSql->fp, pSql->fetchFp); + pCmd->parseFinished = false; + tscResetSqlCmdObj(pCmd); + + //tscDebug("after %p fp:%p, fetchFp:%p", pSql, pSql->fp, pSql->fetchFp); + code = tsParseSql(pSql, true); + + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } else if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + tscProcessSql(pSql); + } else { // in all other cases, simple retry tscProcessSql(pSql); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 564d5ae23f..d8af6d5c87 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -246,43 +246,52 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; } else { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - if (pCmd->command == TSDB_SQL_CONNECT) { - rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcFreeCont(rpcMsg->pCont); - return; - } else if (pCmd->command == TSDB_SQL_HB) { - rpcMsg->code = TSDB_CODE_RPC_NOT_READY; - rpcFreeCont(rpcMsg->pCont); - return; - } else if (pCmd->command == TSDB_SQL_META) { - // get table meta query will not retry, do nothing + // if (rpcMsg->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) { + // if (pCmd->command == TSDB_SQL_CONNECT) { + // rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + // rpcFreeCont(rpcMsg->pCont); + // return; + // } + + // if (pCmd->command == TSDB_SQL_HB) { + // rpcMsg->code = TSDB_CODE_RPC_NOT_READY; + // rpcFreeCont(rpcMsg->pCont); + // return; + // } + + // if (pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_DESCRIBE_TABLE || + // pCmd->command == TSDB_SQL_STABLEVGROUP || pCmd->command == TSDB_SQL_SHOW || + // pCmd->command == TSDB_SQL_RETRIEVE) { + // // get table meta/vgroup query will not retry, do nothing + // } + // } + + if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_INSERT || + pCmd->command == TSDB_SQL_UPDATE_TAGS_VAL) && + (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || + rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) { + tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); + // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema + if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + pSql->cmd.submitSchema = 1; + } + + pSql->res.code = rpcMsg->code; // keep the previous error code + if (pSql->retry > pSql->maxRetry) { + tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); } else { - tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); + rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); - // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema - if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - pSql->cmd.submitSchema = 1; - } - - pSql->res.code = rpcMsg->code; // keep the previous error code - if (pSql->retry > pSql->maxRetry) { - tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); - } else { - rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); - - // if there is an error occurring, proceed to the following error handling procedure. - // todo add test cases - if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - rpcFreeCont(rpcMsg->pCont); - return; - } + // if there is an error occurring, proceed to the following error handling procedure. + // todo add test cases + if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + rpcFreeCont(rpcMsg->pCont); + return; } } } } - + pRes->rspLen = 0; if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 8048a2389b..5d5e546943 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -181,6 +181,19 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha return NULL; } +TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen, + const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) { + char ipBuf[TSDB_EP_LEN] = {0}; + char userBuf[TSDB_USER_LEN] = {0}; + char passBuf[TSDB_PASSWORD_LEN] = {0}; + char dbBuf[TSDB_DB_NAME_LEN] = {0}; + strncpy(ipBuf, ip, MIN(TSDB_EP_LEN - 1, ipLen)); + strncpy(userBuf, user, MIN(TSDB_USER_LEN - 1, userLen)); + strncpy(passBuf, pass, MIN(TSDB_PASSWORD_LEN - 1,passLen)); + strncpy(dbBuf, db, MIN(TSDB_DB_NAME_LEN - 1, dbLen)); + return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port); +} + TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { @@ -249,7 +262,14 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { tsem_wait(&pSql->rspSem); return pSql; } - +TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { + char* buf = malloc(sqlLen + 1); + buf[sqlLen] = 0; + strncpy(buf, sqlstr, sqlLen); + TAOS_RES *res = taos_query(taos, buf); + free(buf); + return res; +} int taos_result_precision(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) return 0; diff --git a/src/common/inc/tulog.h b/src/common/inc/tulog.h index 6365b21ef9..2dc2895e63 100644 --- a/src/common/inc/tulog.h +++ b/src/common/inc/tulog.h @@ -25,15 +25,15 @@ extern "C" { extern int32_t uDebugFlag; extern int32_t tscEmbedded; -#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} -#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} -#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL INFO ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} -#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }} -#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL TRACE ", uDebugFlag, __VA_ARGS__); }} +#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} +#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} #define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } -#define pPrint(...) { taosPrintLog("APP INFO ", 255, __VA_ARGS__); } +#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); } #ifdef __cplusplus } diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 7e551759f9..12ea4ad78d 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -384,9 +384,11 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { } void tdResetDataCols(SDataCols *pCols) { - pCols->numOfRows = 0; - for (int i = 0; i < pCols->maxCols; i++) { - dataColReset(pCols->cols + i); + if (pCols != NULL) { + pCols->numOfRows = 0; + for (int i = 0; i < pCols->maxCols; i++) { + dataColReset(pCols->cols + i); + } } } diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f2324e786a..f1f4db7265 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -194,7 +194,7 @@ int32_t monitorDebugFlag = 131; int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 131; int32_t uDebugFlag = 131; -int32_t debugFlag = 131; +int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; @@ -202,7 +202,7 @@ int32_t tsdbDebugFlag = 131; static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT; void taosSetAllDebugFlag() { - for (int32_t i = 0; i < tsGlobalConfigNum; ++i) { + if (debugFlag != 0) { mDebugFlag = debugFlag; sdbDebugFlag = debugFlag; dDebugFlag = debugFlag; @@ -219,8 +219,8 @@ void taosSetAllDebugFlag() { wDebugFlag = debugFlag; tsdbDebugFlag = debugFlag; qDebugFlag = debugFlag; + uInfo("all debug flag are set to %d", debugFlag); } - uInfo("all debug flag are set to %d", debugFlag); } bool taosCfgDynamicOptions(char *msg) { diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java new file mode 100644 index 0000000000..cb78a5ca0e --- /dev/null +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java @@ -0,0 +1,123 @@ +package com.taosdata.jdbc; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.util.Properties; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.*; + +import static org.junit.Assert.assertTrue; + +public class BatchInsertTest extends BaseTest { + + static Connection connection = null; + static Statement statement = null; + static String dbName = "test"; + static String stbName = "meters"; + static String host = "localhost"; + static int numOfTables = 30; + final static int numOfRecordsPerTable = 1000; + static long ts = 1496732686000l; + final static String tablePrefix = "t"; + + @Before + public void createDatabase() throws SQLException { + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + } catch (ClassNotFoundException e) { + return; + } + + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); + properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); + connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + "?user=root&password=taosdata" + , properties); + + statement = connection.createStatement(); + statement.executeUpdate("drop database if exists " + dbName); + statement.executeUpdate("create database if not exists " + dbName); + statement.executeUpdate("use " + dbName); + + String createTableSql = "create table " + stbName + "(ts timestamp, f1 int, f2 int, f3 int) tags(areaid int, loc binary(20))"; + statement.executeUpdate(createTableSql); + + for(int i = 0; i < numOfTables; i++) { + String loc = i % 2 == 0 ? "beijing" : "shanghai"; + String createSubTalbesSql = "create table " + tablePrefix + i + " using " + stbName + " tags(" + i + ", '" + loc + "')"; + statement.executeUpdate(createSubTalbesSql); + } + + } + + @Test + public void testBatchInsert() throws SQLException{ + + ExecutorService executorService = Executors.newFixedThreadPool(numOfTables); + + for (int i = 0; i < numOfTables; i++) { + final int index = i; + executorService.execute(new Runnable() { + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + Statement statement = connection.createStatement(); // get statement + StringBuilder sb = new StringBuilder(); + sb.append("INSERT INTO " + tablePrefix + index + " VALUES"); + Random rand = new Random(); + for (int j = 1; j <= numOfRecordsPerTable; j++) { + sb.append("(" + (ts + j) + ", "); + sb.append(rand.nextInt(100) + ", "); + sb.append(rand.nextInt(100) + ", "); + sb.append(rand.nextInt(100) + ")"); + } + statement.addBatch(sb.toString()); + statement.executeBatch(); + long endTime = System.currentTimeMillis(); + System.out.println("Thread " + index + " takes " + (endTime - startTime) + " microseconds"); + connection.commit(); + statement.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + executorService.shutdown(); + + try { + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery("select * from meters"); + int num = 0; + while (rs.next()) { + num++; + } + assertEquals(num, numOfTables * numOfRecordsPerTable); + rs.close(); + } + + + @After + public void close() throws Exception { + statement.close(); + connection.close(); + Thread.sleep(10); + } + +} \ No newline at end of file diff --git a/src/dnode/inc/dnodeInt.h b/src/dnode/inc/dnodeInt.h index 76f2f41673..f4cbee1d13 100644 --- a/src/dnode/inc/dnodeInt.h +++ b/src/dnode/inc/dnodeInt.h @@ -26,10 +26,10 @@ extern int32_t dDebugFlag; #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND INFO ", 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND DEBUG ", dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND TRACE ", dDebugFlag, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 4577a5c31d..338fcd287c 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -72,6 +72,7 @@ static void *dnodeProcessMgmtQueue(void *param); static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); @@ -79,6 +80,7 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; @@ -388,7 +390,7 @@ static void dnodeCloseVnodes() { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { +static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion); @@ -408,14 +410,35 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); } + return pCreate; +} + +static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { + SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); + void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); if (pVnode != NULL) { - dDebug("vgId:%d, already exist, processed as alter msg", pCreate->cfg.vgId); - int32_t code = vnodeAlter(pVnode, pCreate); + dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); + vnodeRelease(pVnode); + return TSDB_CODE_SUCCESS; + } else { + dDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId); + return vnodeCreate(pCreate); + } +} + +static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { + SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); + + void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); + if (pVnode != NULL) { + dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); + int32_t code = vnodeAlter(pVnode, pAlter); vnodeRelease(pVnode); return code; } else { - return vnodeCreate(pCreate); + dError("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId); + return TSDB_CODE_VND_INVALID_VGROUP_ID; } } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 18fdb4d090..2a3436583f 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -44,6 +44,7 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 4eba81e29d..901e0061e9 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -135,7 +135,7 @@ int32_t main(int32_t argc, char *argv[]) { static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { if (signum == SIGUSR1) { - taosCfgDynamicOptions("debugFlag 151"); + taosCfgDynamicOptions("debugFlag 143"); return; } if (signum == SIGUSR2) { diff --git a/src/inc/taos.h b/src/inc/taos.h index 1d609bc7db..d6f1883572 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -67,6 +67,8 @@ DLL_EXPORT void taos_init(); DLL_EXPORT void taos_cleanup(); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); +DLL_EXPORT TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen, + const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port); DLL_EXPORT void taos_close(TAOS *taos); typedef struct TAOS_BIND { @@ -88,6 +90,7 @@ TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt); int taos_stmt_close(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); +DLL_EXPORT TAOS_RES *taos_query_c(TAOS *taos, const char *sql, uint32_t sqlLen); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT void taos_free_result(TAOS_RES *res); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index dec3f60b64..e30efcfd37 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -54,6 +54,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) @@ -628,7 +629,7 @@ typedef struct { char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; SMDVnodeCfg cfg; SMDVnodeDesc nodes[TSDB_MAX_REPLICA]; -} SMDCreateVnodeMsg; +} SMDCreateVnodeMsg, SMDAlterVnodeMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN]; diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 70f18b32bd..a1b1998298 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -179,8 +179,8 @@ static struct argp_option options[] = { {"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3}, {"end-time", 'E', "END_TIME", 0, "End time to dump.", 3}, {"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3}, - {"table-batch", 'T', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3}, - {"thread_num", 't', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3}, + {"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3}, + {"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3}, {"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3}, {0}}; @@ -304,10 +304,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'N': arguments->data_batch = atoi(arg); break; - case 'T': + case 't': arguments->table_batch = atoi(arg); break; - case 't': + case 'T': arguments->thread_num = atoi(arg); break; case OPT_ABORT: @@ -406,7 +406,7 @@ int main(int argc, char *argv[]) { printf("password: %s\n", tsArguments.password); printf("port: %u\n", tsArguments.port); printf("cversion: %s\n", tsArguments.cversion); - printf("mysqlFlag: %d", tsArguments.mysqlFlag); + printf("mysqlFlag: %d\n", tsArguments.mysqlFlag); printf("outpath: %s\n", tsArguments.outpath); printf("inpath: %s\n", tsArguments.inpath); printf("encode: %s\n", tsArguments.encode); @@ -821,7 +821,7 @@ _exit_failure: return -1; } -int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { +int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon, bool isSuperTable) { TAOS_ROW row = NULL; TAOS_RES *tmpResult = NULL; int count = 0; @@ -832,6 +832,13 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { return -1; } + char* tbuf = (char *)malloc(COMMAND_SIZE); + if (tbuf == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + free(tempCommand); + return -1; + } + sprintf(tempCommand, "describe %s", table); tmpResult = taos_query(taosCon, tempCommand); @@ -862,6 +869,92 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { taos_free_result(tmpResult); tmpResult = NULL; + if (isSuperTable) { + free(tempCommand); + return count; + } + + // if chidl-table have tag, using select tagName from table to get tagValue + for (int i = 0 ; i < count; i++) { + if (strcmp(tableDes->cols[i].note, "TAG") != 0) continue; + + + sprintf(tempCommand, "select %s from %s", tableDes->cols[i].field, table); + + tmpResult = taos_query(taosCon, tempCommand); + code = taos_errno(tmpResult); + if (code != 0) { + fprintf(stderr, "failed to run command %s\n", tempCommand); + free(tempCommand); + taos_free_result(tmpResult); + return -1; + } + + fields = taos_fetch_fields(tmpResult); + + row = taos_fetch_row(tmpResult); + if (NULL == row) { + fprintf(stderr, " fetch failed to run command %s\n", tempCommand); + free(tempCommand); + taos_free_result(tmpResult); + return -1; + } + + switch (fields[0].type) { + case TSDB_DATA_TYPE_BOOL: + sprintf(tableDes->cols[i].note, "%d", ((((int)(*((char *)row[0]))) == 1) ? 1 : 0)); + break; + case TSDB_DATA_TYPE_TINYINT: + sprintf(tableDes->cols[i].note, "%d", (int)(*((char *)row[0]))); + break; + case TSDB_DATA_TYPE_SMALLINT: + sprintf(tableDes->cols[i].note, "%d", (int)(*((short *)row[0]))); + break; + case TSDB_DATA_TYPE_INT: + sprintf(tableDes->cols[i].note, "%d", *((int *)row[0])); + break; + case TSDB_DATA_TYPE_BIGINT: + sprintf(tableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[0])); + break; + case TSDB_DATA_TYPE_FLOAT: + sprintf(tableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[0])); + break; + case TSDB_DATA_TYPE_DOUBLE: + sprintf(tableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0])); + break; + case TSDB_DATA_TYPE_BINARY: + tableDes->cols[i].note[0] = '\''; + converStringToReadable((char *)row[0], fields[0].bytes, tbuf, COMMAND_SIZE); + char* pstr = stpcpy(&(tableDes->cols[i].note[1]), tbuf); + *(pstr++) = '\''; + break; + case TSDB_DATA_TYPE_NCHAR: + convertNCharToReadable((char *)row[0], fields[0].bytes, tbuf, COMMAND_SIZE); + sprintf(tableDes->cols[i].note, "\'%s\'", tbuf); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]); + #if 0 + if (!arguments->mysqlFlag) { + sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]); + } else { + char buf[64] = "\0"; + int64_t ts = *((int64_t *)row[0]); + time_t tt = (time_t)(ts / 1000); + struct tm *ptm = localtime(&tt); + strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm); + sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000)); + } + #endif + break; + default: + break; + } + + taos_free_result(tmpResult); + tmpResult = NULL; + } + free(tempCommand); return count; @@ -886,23 +979,25 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); */ - count = taosGetTableDes(table, tableDes, taosCon); + count = taosGetTableDes(table, tableDes, taosCon, false); if (count < 0) { free(tableDes); return -1; } + // create child-table using super-table taosDumpCreateMTableClause(tableDes, metric, count, fp); } else { // dump table definition - count = taosGetTableDes(table, tableDes, taosCon); + count = taosGetTableDes(table, tableDes, taosCon, false); if (count < 0) { free(tableDes); return -1; } + // create normal-table or super-table taosDumpCreateTableClause(tableDes, count, fp); } @@ -1033,7 +1128,7 @@ int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) { exit(-1); } - count = taosGetTableDes(table, tableDes, taosCon); + count = taosGetTableDes(table, tableDes, taosCon, true); if (count < 0) { free(tableDes); @@ -1083,7 +1178,6 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp) taos_free_result(tmpResult); exit(-1); } - taos_free_result(tmpResult); TAOS_FIELD *fields = taos_fetch_fields(tmpResult); @@ -1291,14 +1385,16 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols if (counter != count_temp) { if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { - pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note); + //pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note); + pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note); } else { pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note); } } else { if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { - pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note); + //pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note); + pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); } else { pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); } @@ -1363,7 +1459,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* return -1; } - numFields = taos_field_count(taosCon); + numFields = taos_field_count(tmpResult); assert(numFields > 0); TAOS_FIELD *fields = taos_fetch_fields(tmpResult); tbuf = (char *)malloc(COMMAND_SIZE); @@ -2015,6 +2111,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c } memcpy(cmd + cmd_len, line, read_len); + cmd[read_len + cmd_len]= '\0'; if (queryDB(taos, cmd)) { fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName); } diff --git a/src/mnode/inc/mnodeInt.h b/src/mnode/inc/mnodeInt.h index a450a8027f..44626fd167 100644 --- a/src/mnode/inc/mnodeInt.h +++ b/src/mnode/inc/mnodeInt.h @@ -29,17 +29,17 @@ extern int32_t sdbDebugFlag; // mnode log function #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }} -#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }} -#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND INFO ", 255, __VA_ARGS__); }} -#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND DEBUG ", mDebugFlag, __VA_ARGS__); }} -#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND TRACE ", mDebugFlag, __VA_ARGS__); }} +#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }} +#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", 255, __VA_ARGS__); }} +#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} +#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} #define sdbFatal(...) { if (sdbDebugFlag & DEBUG_FATAL) { taosPrintLog("SDB FATAL ", 255, __VA_ARGS__); }} #define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("SDB ERROR ", 255, __VA_ARGS__); }} -#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("SDB WARN ", 255, __VA_ARGS__); }} -#define sdbInfo(...) { if (sdbDebugFlag & DEBUG_INFO) { taosPrintLog("SDB INFO ", 255, __VA_ARGS__); }} -#define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB DEBUG ", sdbDebugFlag, __VA_ARGS__); }} -#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB TRACE ", sdbDebugFlag, __VA_ARGS__); }} +#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("SDB WARN ", 255, __VA_ARGS__); }} +#define sdbInfo(...) { if (sdbDebugFlag & DEBUG_INFO) { taosPrintLog("SDB ", 255, __VA_ARGS__); }} +#define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }} +#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }} #define mLError(...) { monitorSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) } #define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) } diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 6ddf8e44b9..3f1da89605 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -44,9 +44,9 @@ int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); -void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mnodeGetIpSetFromIp(char *ep); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index e3b1603856..fb97d6f380 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -913,7 +913,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; if (pVgroup->pDb == pDb) { - mnodeSendCreateVgroupMsg(pVgroup, NULL); + mnodeSendAlterVgroupMsg(pVgroup); } mnodeDecVgroupRef(pVgroup); } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 40b9784e0e..e08f8ca0db 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -252,13 +252,15 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1); if (processedCount <= 1) { if (pMsg != NULL) { - sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount); + sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d result:%s", pMsg->rpcMsg.ahandle, pMsg, + processedCount, tstrerror(code)); } return; } if (pMsg != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); + sdbDebug("app:%p:%p, is confirmed and will do callback func, result:%s", pMsg->rpcMsg.ahandle, pMsg, + tstrerror(code)); } if (pOper->cb != NULL) { diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 05e300d242..8b4d62a8b0 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -40,9 +40,18 @@ typedef enum { TAOS_VG_STATUS_READY, - TAOS_VG_STATUS_DROPPING + TAOS_VG_STATUS_DROPPING, + TAOS_VG_STATUS_CREATING, + TAOS_VG_STATUS_UPDATING, } EVgroupStatus; +char* vgroupStatus[] = { + "ready", + "dropping", + "creating", + "updating" +}; + static void *tsVgroupSdb = NULL; static int32_t tsVgUpdateSize = 0; @@ -50,6 +59,7 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup); static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); +static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); @@ -83,6 +93,7 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) { } pVgroup->pDb = pDb; + pVgroup->status = TAOS_VG_STATUS_CREATING; pVgroup->accessState = TSDB_VN_ALL_ACCCESS; if (mnodeAllocVgroupIdPool(pVgroup) < 0) { mError("vgId:%d, failed to init idpool for vgroups", pVgroup->vgId); @@ -217,6 +228,7 @@ int32_t mnodeInitVgroups() { mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mnodeGetVgroupMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mnodeRetrieveVgroups); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); + mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); @@ -247,7 +259,7 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { if (sdbUpdateRow(&oper) != TSDB_CODE_SUCCESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } - mnodeSendCreateVgroupMsg(pVgroup, NULL); + mnodeSendAlterVgroupMsg(pVgroup); } /* @@ -271,10 +283,17 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t o pNextV++; } - if (i == openVnodes && pVgroup->status == TAOS_VG_STATUS_READY) { - mnodeSendCreateVgroupMsg(pVgroup, NULL); + if (i == openVnodes) { + if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_DROPPING) { + mDebug("vgId:%d, not exist in dnode:%d and status is %s, do nothing", pVgroup->vgId, pDnode->dnodeId, + vgroupStatus[pVgroup->status]); + } else { + mDebug("vgId:%d, not exist in dnode:%d and status is %s, send create msg", pVgroup->vgId, pDnode->dnodeId, + vgroupStatus[pVgroup->status]); + mnodeSendCreateVgroupMsg(pVgroup, NULL); + } } - + mnodeDecVgroupRef(pVgroup); } @@ -314,7 +333,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mnode cfgVersion:%d replica:%d", pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion, pVgroup->numOfVnodes); - mnodeSendCreateVgroupMsg(pVgroup, NULL); + mnodeSendAlterVgroupMsg(pVgroup); } } @@ -461,6 +480,10 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; sdbDeleteRow(&desc); return code; + } else { + pVgroup->status = TAOS_VG_STATUS_READY; + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + sdbUpdateRow(&desc); } mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, @@ -536,7 +559,7 @@ void mnodeCleanupVgroups() { tsVgroupSdb = NULL; } -int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) { return TSDB_CODE_MND_DB_NOT_SELECTED; @@ -630,7 +653,7 @@ static bool mnodeFilterVgroups(SVgObj *pVgroup, STableObj *pTable) { } } -int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SVgObj *pVgroup = NULL; int32_t cols = 0; @@ -733,7 +756,7 @@ void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { } } -SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) { +static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { SDbObj *pDb = pVgroup->pDb; if (pDb == NULL) return NULL; @@ -800,8 +823,31 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) { return ipSet; } -void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { - SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup); +static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { + SMDAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); + SRpcMsg rpcMsg = { + .ahandle = NULL, + .pCont = pAlter, + .contLen = pAlter ? sizeof(SMDAlterVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_ALTER_VNODE + }; + dnodeSendMsgToDnode(ipSet, &rpcMsg); +} + +void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { + mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, + pVgroup->dbName); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, + pVgroup->vnodeGid[i].pDnode->dnodeEp); + mnodeSendAlterVnodeMsg(pVgroup, &ipSet); + } +} + +static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { + SMDCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .ahandle = ahandle, .pCont = pCreate, @@ -823,6 +869,10 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { } } +static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { + mDebug("alter vnode rsp received"); +} + static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (rpcMsg->ahandle == NULL) return; diff --git a/src/plugins/http/inc/httpLog.h b/src/plugins/http/inc/httpLog.h index f4c20a40d5..9c145a43e8 100644 --- a/src/plugins/http/inc/httpLog.h +++ b/src/plugins/http/inc/httpLog.h @@ -22,10 +22,10 @@ extern int32_t httpDebugFlag; #define httpFatal(...) { if (httpDebugFlag & DEBUG_FATAL) { taosPrintLog("HTP FATAL ", 255, __VA_ARGS__); }} #define httpError(...) { if (httpDebugFlag & DEBUG_ERROR) { taosPrintLog("HTP ERROR ", 255, __VA_ARGS__); }} -#define httpWarn(...) { if (httpDebugFlag & DEBUG_WARN) { taosPrintLog("HTP WARN ", 255, __VA_ARGS__); }} -#define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP INFO ", 255, __VA_ARGS__); }} -#define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }} -#define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }} -#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }} +#define httpWarn(...) { if (httpDebugFlag & DEBUG_WARN) { taosPrintLog("HTP WARN ", 255, __VA_ARGS__); }} +#define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP ", 255, __VA_ARGS__); }} +#define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP ", httpDebugFlag, __VA_ARGS__); }} +#define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP ", httpDebugFlag, __VA_ARGS__); }} +#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP ", httpDebugFlag, __VA_ARGS__); }} #endif diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index 36468b3fdb..22cb111b83 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -30,10 +30,10 @@ #define monitorFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} #define monitorError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} -#define monitorWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }} -#define monitorInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON INFO ", 255, __VA_ARGS__); }} -#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }} -#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }} +#define monitorWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }} +#define monitorInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }} +#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} +#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} #define SQL_LENGTH 1024 #define LOG_LEN_STR 100 diff --git a/src/plugins/mqtt/inc/mqttLog.h b/src/plugins/mqtt/inc/mqttLog.h index 5d5f98a13b..e186b81112 100644 --- a/src/plugins/mqtt/inc/mqttLog.h +++ b/src/plugins/mqtt/inc/mqttLog.h @@ -22,9 +22,9 @@ extern int32_t mqttDebugFlag; #define mqttFatal(...) { if (mqttDebugFlag & DEBUG_FATAL) { taosPrintLog("MQT FATAL ", 255, __VA_ARGS__); }} #define mqttError(...) { if (mqttDebugFlag & DEBUG_ERROR) { taosPrintLog("MQT ERROR ", 255, __VA_ARGS__); }} -#define mqttWarn(...) { if (mqttDebugFlag & DEBUG_WARN) { taosPrintLog("MQT WARN ", 255, __VA_ARGS__); }} -#define mqttInfo(...) { if (mqttDebugFlag & DEBUG_INFO) { taosPrintLog("MQT INFO ", 255, __VA_ARGS__); }} -#define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }} -#define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT TRACE ", mqttDebugFlag, __VA_ARGS__); }} +#define mqttWarn(...) { if (mqttDebugFlag & DEBUG_WARN) { taosPrintLog("MQT WARN ", 255, __VA_ARGS__); }} +#define mqttInfo(...) { if (mqttDebugFlag & DEBUG_INFO) { taosPrintLog("MQT ", 255, __VA_ARGS__); }} +#define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); }} +#define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); }} #endif diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 405ab7e26e..fb8750323f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -24,7 +24,7 @@ #include "qtsbuf.h" #include "taosdef.h" #include "tarray.h" -#include "tref.h" +#include "tlockfree.h" #include "tsdb.h" #include "tsqlfunction.h" #include "query.h" diff --git a/src/query/inc/queryLog.h b/src/query/inc/queryLog.h index fd1322e995..a1c447a6eb 100644 --- a/src/query/inc/queryLog.h +++ b/src/query/inc/queryLog.h @@ -27,10 +27,10 @@ extern int32_t tscEmbedded; #define qFatal(...) { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", 255, __VA_ARGS__); }} #define qError(...) { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", 255, __VA_ARGS__); }} -#define qWarn(...) { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", 255, __VA_ARGS__); }} -#define qInfo(...) { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY INFO ", 255, __VA_ARGS__); }} -#define qDebug(...) { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY DEBUG ", qDebugFlag, __VA_ARGS__); }} -#define qTrace(...) { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY TRACE ", qDebugFlag, __VA_ARGS__); }} +#define qWarn(...) { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", 255, __VA_ARGS__); }} +#define qInfo(...) { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", 255, __VA_ARGS__); }} +#define qDebug(...) { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} +#define qTrace(...) { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 706eb0c95c..92426633d9 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2250,7 +2250,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (tsdbNextDataBlock(pQueryHandle)) { + while (true) { + if (!tsdbNextDataBlock(pQueryHandle)) { + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + break; + } summary->totalBlocks += 1; if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { @@ -3234,6 +3240,9 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI // add ref for table pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); + if (pRuntimeEnv->pSecQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); @@ -3306,6 +3315,9 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { } pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); + if (pRuntimeEnv->pSecQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); @@ -3928,7 +3940,14 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (tsdbNextDataBlock(pQueryHandle)) { + while (true) { + if (!tsdbNextDataBlock(pQueryHandle)) { + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + break; + } + if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -3972,7 +3991,14 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { STableQueryInfo *pTableQueryInfo = pQuery->current; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { + while (true) { + if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + break; + } + tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo); if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -4071,16 +4097,16 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { return true; } -static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { +static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (onlyQueryTags(pQuery)) { - return; + return TSDB_CODE_SUCCESS; } if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) { - return; + return TSDB_CODE_SUCCESS; } STsdbQueryCond cond = { @@ -4102,6 +4128,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { cond.twindow = pCheckInfo->win; } + terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQuery)) { pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); } else if (isPointInterpoQuery(pQuery)) { @@ -4109,6 +4136,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { } else { pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); } + return terrno; } static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { @@ -4145,7 +4173,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQInfo, false); - setupQueryHandle(tsdb, pQInfo, isSTableQuery); + code = setupQueryHandle(tsdb, pQInfo, isSTableQuery); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pQInfo->tsdb = tsdb; pQInfo->vgId = vgId; @@ -4269,7 +4300,14 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - while (tsdbNextDataBlock(pQueryHandle)) { + while (true) { + if (!tsdbNextDataBlock(pQueryHandle)) { + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + break; + } + summary->totalBlocks += 1; if (IS_QUERY_KILLED(pQInfo)) { @@ -4349,6 +4387,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo); taosArrayDestroy(tx); taosArrayDestroy(g1); + if (pRuntimeEnv->pQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vgroupIndex == -1) { @@ -4416,7 +4457,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } else { pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo); } - + + taosArrayDestroy(tx); + taosArrayDestroy(g1); + if (pRuntimeEnv->pQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } + initCtxOutputBuf(pRuntimeEnv); SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); @@ -4480,6 +4527,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo); taosArrayDestroy(g1); taosArrayDestroy(tx); + if (pRuntimeEnv->pQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); @@ -4674,7 +4724,10 @@ static void doSaveContext(SQInfo *pQInfo) { pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); - + if (pRuntimeEnv->pSecQueryHandle == NULL) { + longjmp(pRuntimeEnv->env, terrno); + } + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pQInfo); @@ -6512,8 +6565,8 @@ void* qOpenQueryMgmt(int32_t vgId) { } static void queryMgmtKillQueryFn(void* handle) { - void** h = (void**) handle; - qKillQuery(*h); + void** fp = (void**)handle; + qKillQuery(*fp); } void qQueryMgmtNotifyClosed(void* pQMgmt) { diff --git a/src/rpc/inc/rpcLog.h b/src/rpc/inc/rpcLog.h index f0f5c84ff9..10e8476691 100644 --- a/src/rpc/inc/rpcLog.h +++ b/src/rpc/inc/rpcLog.h @@ -27,10 +27,10 @@ extern int32_t tscEmbedded; #define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} #define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} -#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} -#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC INFO ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} -#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC DEBUG ", rpcDebugFlag, __VA_ARGS__); }} -#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", rpcDebugFlag, __VA_ARGS__); }} +#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} +#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} +#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }} +#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }} #define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }} #ifdef __cplusplus diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index bfc1aa8932..40f2dac660 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -21,11 +21,10 @@ #include "tkvstore.h" #include "tlist.h" #include "tlog.h" -#include "tref.h" +#include "tlockfree.h" #include "tsdb.h" #include "tskiplist.h" #include "tutil.h" -#include "trwlatch.h" #ifdef __cplusplus extern "C" { @@ -35,10 +34,10 @@ extern int tsdbDebugFlag; #define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} #define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} -#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} -#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB INFO ", 255, __VA_ARGS__); }} -#define tsdbDebug(...) { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB DEBUG ", tsdbDebugFlag, __VA_ARGS__); }} -#define tsdbTrace(...) { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB TRACE ", tsdbDebugFlag, __VA_ARGS__); }} +#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} +#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} +#define tsdbDebug(...) { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} +#define tsdbTrace(...) { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} #define TSDB_MAX_TABLE_SCHEMAS 16 #define TSDB_FILE_HEAD_SIZE 512 diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 2195760e88..2b1248c8a9 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -187,7 +187,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->allocSize = 0; - tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); + if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { + free(pQueryHandle); + return NULL; + } tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); @@ -246,11 +249,11 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) { STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); - - pQueryHandle->type = TSDB_QUERY_TYPE_LAST; - pQueryHandle->order = TSDB_ORDER_DESC; - - changeQueryHandleForLastrowQuery(pQueryHandle); + if (pQueryHandle != NULL) { + pQueryHandle->type = TSDB_QUERY_TYPE_LAST; + pQueryHandle->order = TSDB_ORDER_DESC; + changeQueryHandleForLastrowQuery(pQueryHandle); + } return pQueryHandle; } @@ -272,9 +275,10 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) { STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); - - pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; - changeQueryHandleForInterpQuery(pQueryHandle); + if (pQueryHandle != NULL) { + pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; + changeQueryHandleForInterpQuery(pQueryHandle); + } return pQueryHandle; } @@ -1238,10 +1242,12 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex]; // assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset); +#if 0 // TODO: temporarily comment off requested by Dr. Liao if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset && pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) { tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset); } +#endif return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1; } @@ -1525,7 +1531,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pSecQueryHandle->activeIndex = 0; pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; - tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb); + if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { + free(pSecQueryHandle); + return false; + } tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem); // allocate buffer in order to load data blocks from file @@ -1612,6 +1621,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { // TODO: opt by consider the scan order bool ret = doHasDataInBuffer(pQueryHandle); + terrno = TSDB_CODE_SUCCESS; elapsedTime = taosGetTimestampUs() - stime; pQueryHandle->cost.checkForNextTime += elapsedTime; diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 3da604d152..5a3545fd8f 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "os.h" -#include "tref.h" +#include "tlockfree.h" #include "hash.h" typedef void (*__cache_free_fn_t)(void*); diff --git a/src/util/inc/tref.h b/src/util/inc/tlockfree.h similarity index 66% rename from src/util/inc/tref.h rename to src/util/inc/tlockfree.h index 0503325326..e425d71d27 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tlockfree.h @@ -12,12 +12,17 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - -#ifndef TDENGINE_TREF_H -#define TDENGINE_TREF_H +#ifndef __TD_LOCK_FREE_H__ +#define __TD_LOCK_FREE_H__ #include "os.h" +#ifdef __cplusplus +extern "C" { +#endif + + +// reference counting typedef void (*_ref_fn_t)(const void* pObj); #define T_REF_DECLARE() \ @@ -55,4 +60,47 @@ typedef void (*_ref_fn_t)(const void* pObj); #define T_REF_VAL_GET(x) (x)->_ref.val -#endif // TDENGINE_TREF_H + + +// single writer multiple reader lock +typedef int32_t SRWLatch; + +void taosInitRWLatch(SRWLatch *pLatch); +void taosWLockLatch(SRWLatch *pLatch); +void taosWUnLockLatch(SRWLatch *pLatch); +void taosRLockLatch(SRWLatch *pLatch); +void taosRUnLockLatch(SRWLatch *pLatch); + + + +// copy on read +#define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \ + int32_t old_ = atomic_load_32(x); \ + if (old_ & 0x00000001) { \ + if (i_ % 1000 == 0) { \ + sched_yield(); \ + } \ + continue; \ + } + +#define taosCorEndRead(x) \ + if (atomic_load_32(x) == old_) { \ + break; \ + } \ + } + +#define taosCorBeginWrite(x) taosCorBeginRead(x) \ + if (atomic_val_compare_exchange_32((x), old_, old_ + 1) != old_) { \ + continue; \ + } + +#define taosCorEndWrite(x) atomic_add_fetch_32((x), 1); \ + break; \ + } + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/util/inc/trwlatch.h b/src/util/inc/trwlatch.h deleted file mode 100644 index c6923f0e90..0000000000 --- a/src/util/inc/trwlatch.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef __TD_RWLATCH_H__ -#define __TD_RWLATCH_H__ - -#ifdef __cplusplus -extern "C" { -#endif - -#include - -typedef int32_t SRWLatch; - -void taosInitRWLatch(SRWLatch *pLatch); -void taosWLockLatch(SRWLatch *pLatch); -void taosWUnLockLatch(SRWLatch *pLatch); -void taosRLockLatch(SRWLatch *pLatch); -void taosRUnLockLatch(SRWLatch *pLatch); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/util/src/trwlatch.c b/src/util/src/tlockfree.c similarity index 98% rename from src/util/src/trwlatch.c rename to src/util/src/tlockfree.c index cc027aa3df..3161518a36 100644 --- a/src/util/src/trwlatch.c +++ b/src/util/src/tlockfree.c @@ -15,8 +15,7 @@ // #define _GNU_SOURCE // #include -#include "trwlatch.h" -#include "os.h" +#include "tlockfree.h" #define TD_RWLATCH_WRITE_FLAG 0x40000000 diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 62aa2007b2..5cdc6ff918 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -24,10 +24,10 @@ extern int32_t tscEmbedded; #define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} #define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} -#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} -#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR INFO ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} -#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR DEBUG ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR TRACE ", tmrDebugFlag, __VA_ARGS__); }} +#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} +#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} +#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} +#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} #define TIMER_STATE_WAITING 0 #define TIMER_STATE_EXPIRED 1 diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 4f22c7784d..77db4fd04c 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -29,10 +29,10 @@ extern int32_t vDebugFlag; #define vFatal(...) { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", 255, __VA_ARGS__); }} #define vError(...) { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", 255, __VA_ARGS__); }} -#define vWarn(...) { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", 255, __VA_ARGS__); }} -#define vInfo(...) { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND INFO ", 255, __VA_ARGS__); }} -#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND DEBUG ", vDebugFlag, __VA_ARGS__); }} -#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND TRACE ", vDebugFlag, __VA_ARGS__); }} +#define vWarn(...) { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", 255, __VA_ARGS__); }} +#define vInfo(...) { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", 255, __VA_ARGS__); }} +#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }} +#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }} typedef struct { int32_t vgId; // global vnode group ID diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 9001d29415..a0e2ccb51d 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -35,10 +35,10 @@ #define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }} #define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }} -#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }} -#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL INFO ", 255, __VA_ARGS__); }} -#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL DEBUG ", wDebugFlag, __VA_ARGS__); }} -#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL TRACE ", wDebugFlag, __VA_ARGS__); }} +#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }} +#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }} +#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} +#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} typedef struct { uint64_t version; diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9af72af471..9e38e04b63 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -84,8 +84,17 @@ class WorkerThread: # Let us have a DB connection of our own if (gConfig.per_thread_db_connection): # type: ignore # print("connector_type = {}".format(gConfig.connector_type)) - self._dbConn = DbConn.createNative() if ( - gConfig.connector_type == 'native') else DbConn.createRest() + if gConfig.connector_type == 'native': + self._dbConn = DbConn.createNative() + elif gConfig.connector_type == 'rest': + self._dbConn = DbConn.createRest() + elif gConfig.connector_type == 'mixed': + if Dice.throw(2) == 0: # 1/2 chance + self._dbConn = DbConn.createNative() + else: + self._dbConn = DbConn.createRest() + else: + raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) self._dbInUse = False # if "use db" was executed already @@ -130,22 +139,15 @@ class WorkerThread: while True: tc = self._tc # Thread Coordinator, the overall master tc.crossStepBarrier() # shared barrier first, INCLUDING the last one - logger.debug( - "[TRD] Worker thread [{}] exited barrier...".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - logger.debug( - "[TRD] Worker thread [{}] exited step gate...".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): - logger.debug( - "[TRD] Thread Coordinator not running any more, worker thread now stopping...") + logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break # Fetch a task from the Thread Coordinator - logger.debug( - "[TRD] Worker thread [{}] about to fetch task".format( - self._tid)) + logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() # Execute such a task @@ -154,9 +156,7 @@ class WorkerThread: self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) - logger.debug( - "[TRD] Worker thread [{}] finished executing task".format( - self._tid)) + logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) self._dbInUse = False # there may be changes between steps @@ -255,101 +255,124 @@ class ThreadCoordinator: self._runStatus = MainExec.STATUS_STOPPING self._execStats.registerFailure("User Interruption") + def _runShouldEnd(self, transitionFailed, hasAbortedTask): + maxSteps = gConfig.max_steps # type: ignore + if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 + return True + if self._runStatus != MainExec.STATUS_RUNNING: + return True + if transitionFailed: + return True + if hasAbortedTask: + return True + return False + + def _hasAbortedTask(self): # from execution of previous step + for task in self._executedTasks: + if task.isAborted(): + # print("Task aborted: {}".format(task)) + # hasAbortedTask = True + return True + return False + + def _releaseAllWorkerThreads(self, transitionFailed): + self._curStep += 1 # we are about to get into next step. TODO: race condition here! + # Now not all threads had time to go to sleep + logger.debug( + "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) + + # A new TE for the new step + self._te = None # set to empty first, to signal worker thread to stop + if not transitionFailed: # only if not failed + self._te = TaskExecutor(self._curStep) + + logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format( + self._curStep)) # Now not all threads had time to go to sleep + # Worker threads will wake up at this point, and each execute it's own task + self.tapAllThreads() # release all worker thread from their "gate" + + def _syncAtBarrier(self): + # Now main thread (that's us) is ready to enter a step + # let other threads go past the pool barrier, but wait at the + # thread gate + logger.debug("[TRD] Main thread about to cross the barrier") + self.crossStepBarrier() + self._stepBarrier.reset() # Other worker threads should now be at the "gate" + logger.debug("[TRD] Main thread finished crossing the barrier") + + def _doTransition(self): + transitionFailed = False + try: + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + # at end of step, transiton the DB state + sm.transition(self._executedTasks) + logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, + # we cannot share connections across threads + if sm.hasDatabase(): + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use + # db" on behalf of every worker thread + except taos.error.ProgrammingError as err: + if (err.msg == 'network unavailable'): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + transitionFailed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at + # end, and maybe signal them to stop + else: + raise + + self.resetExecutedTasks() # clear the tasks after we are done + # Get ready for next step + logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed)) + return transitionFailed + def run(self): self._pool.createAndStartThreads(self) # Coordinate all threads step by step self._curStep = -1 # not started yet - maxSteps = gConfig.max_steps # type: ignore + self._execStats.startExec() # start the stop watch transitionFailed = False hasAbortedTask = False - while(self._curStep < maxSteps - 1 and - (not transitionFailed) and - (self._runStatus == MainExec.STATUS_RUNNING) and - (not hasAbortedTask)): # maxStep==10, last curStep should be 9 - - if not gConfig.debug: - # print this only if we are not in debug mode + while not self._runShouldEnd(transitionFailed, hasAbortedTask): + if not gConfig.debug: # print this only if we are not in debug mode print(".", end="", flush=True) - logger.debug("[TRD] Main thread going to sleep") - - # Now main thread (that's us) is ready to enter a step - # let other threads go past the pool barrier, but wait at the - # thread gate - self.crossStepBarrier() - self._stepBarrier.reset() # Other worker threads should now be at the "gate" + + self._syncAtBarrier() # For now just cross the barrier # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # We use this period to do house keeping work, when all worker # threads are QUIET. - hasAbortedTask = False - for task in self._executedTasks: - if task.isAborted(): - print("Task aborted: {}".format(task)) - hasAbortedTask = True - break - - if hasAbortedTask: # do transition only if tasks are error free + hasAbortedTask = self._hasAbortedTask() # from previous step + if hasAbortedTask: + logger.info("Aborted task encountered, exiting test program") self._execStats.registerFailure("Aborted Task Encountered") - else: - try: - sm = self._dbManager.getStateMachine() - logger.debug("[STT] starting transitions") - # at end of step, transiton the DB state - sm.transition(self._executedTasks) - logger.debug("[STT] transition ended") - # Due to limitation (or maybe not) of the Python library, - # we cannot share connections across threads - if sm.hasDatabase(): - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() - # t.execSql("use db") # main thread executing "use - # db" on behalf of every worker thread - except taos.error.ProgrammingError as err: - if (err.msg == 'network unavailable'): # broken DB connection - logger.info("DB connection broken, execution failed") - traceback.print_stack() - transitionFailed = True - self._te = None # Not running any more - self._execStats.registerFailure("Broken DB Connection") - # continue # don't do that, need to tap all threads at - # end, and maybe signal them to stop - else: - raise - # finally: - # pass + break # do transition only if tasks are error free - self.resetExecutedTasks() # clear the tasks after we are done + # Ending previous step + transitionFailed = self._doTransition() # To start, we end step -1 first + # Then we move on to the next step + self._releaseAllWorkerThreads(transitionFailed) - # Get ready for next step - logger.debug("<-- Step {} finished".format(self._curStep)) - self._curStep += 1 # we are about to get into next step. TODO: race condition here! - # Now not all threads had time to go to sleep - logger.debug( - "\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) + if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" + logger.debug("Abnormal ending of main thraed") + else: # regular ending, workers waiting at "barrier" + logger.debug("Regular ending, main thread waiting for all worker threads to stop...") + self._syncAtBarrier() - # A new TE for the new step - if not transitionFailed: # only if not failed - self._te = TaskExecutor(self._curStep) - - logger.debug( - "[TRD] Main thread waking up at step {}, tapping worker threads".format( - self._curStep)) # Now not all threads had time to go to sleep - # Worker threads will wake up at this point, and each execute it's - # own task - self.tapAllThreads() - - logger.debug("Main thread ready to finish up...") - if not transitionFailed: # only in regular situations - self.crossStepBarrier() # Cross it one last time, after all threads finish - self._stepBarrier.reset() - logger.debug("Main thread in exclusive zone...") - self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") - self.tapAllThreads() # Let the threads run one last time + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("\r\n\n--> Main thread ready to finish up...") logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish logger.info("\nAll worker threads finished") @@ -514,7 +537,7 @@ class LinearQueue(): class DbConn: TYPE_NATIVE = "native-c" - TYPE_REST = "rest-api" + TYPE_REST = "rest-api" TYPE_INVALID = "invalid" @classmethod @@ -620,9 +643,13 @@ class DbConnRest(DbConn): self.isOpen = False def _doSql(self, sql): - r = requests.post(self._url, - data=sql, - auth=HTTPBasicAuth('root', 'taosdata')) + try: + r = requests.post(self._url, + data = sql, + auth = HTTPBasicAuth('root', 'taosdata')) + except: + print("REST API Failure (TODO: more info here)") + raise rj = r.json() # Sanity check for the "Json Result" if ('status' not in rj): @@ -717,7 +744,7 @@ class MyTDSql: class DbConnNative(DbConn): def __init__(self): super().__init__() - self._type = self.TYPE_REST + self._type = self.TYPE_NATIVE self._conn = None self._cursor = None @@ -736,10 +763,16 @@ class DbConnNative(DbConn): break return buildPath + connInfoDisplayed = False def openByType(self): # Open connection cfgPath = self.getBuildPath() + "/test/cfg" + hostAddr = "127.0.0.1" + if not self.connInfoDisplayed: + logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) + self.connInfoDisplayed = True + self._conn = taos.connect( - host="127.0.0.1", + host=hostAddr, config=cfgPath) # TODO: make configurable self._cursor = self._conn.cursor() @@ -2254,8 +2287,9 @@ class ClientManager: def sigIntHandler(self, signalNumber, frame): if self._status != MainExec.STATUS_RUNNING: - print("Ignoring repeated SIGINT...") - return # do nothing if it's already not running + print("Repeated SIGINT received, forced exit...") + # return # do nothing if it's already not running + sys.exit(-1) self._status = MainExec.STATUS_STOPPING # immediately set our status print("Terminating program...") @@ -2394,6 +2428,27 @@ def main(): ''')) + # parser.add_argument('-a', '--auto-start-service', action='store_true', + # help='Automatically start/stop the TDengine service (default: false)') + # parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, + # help='Connector type to use: native, rest, or mixed (default: 10)') + # parser.add_argument('-d', '--debug', action='store_true', + # help='Turn on DEBUG mode for more logging (default: false)') + # parser.add_argument('-e', '--run-tdengine', action='store_true', + # help='Run TDengine service in foreground (default: false)') + # parser.add_argument('-l', '--larger-data', action='store_true', + # help='Write larger amount of data during write operations (default: false)') + # parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + # help='Use a single shared db connection (default: false)') + # parser.add_argument('-r', '--record-ops', action='store_true', + # help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') + # parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, + # help='Maximum number of steps to run (default: 100)') + # parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, + # help='Number of threads to run (default: 10)') + # parser.add_argument('-x', '--continue-on-exception', action='store_true', + # help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') + parser.add_argument( '-a', '--auto-start-service', diff --git a/tests/pytest/util/dnodes-no-random-fail.py b/tests/pytest/util/dnodes-no-random-fail.py new file mode 100644 index 0000000000..88b2049464 --- /dev/null +++ b/tests/pytest/util/dnodes-no-random-fail.py @@ -0,0 +1,501 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import os.path +import subprocess +from util.log import * + + +class TDSimClient: + def __init__(self): + self.testCluster = False + + self.cfgDict = { + "numOfLogLines": "100000000", + "numOfThreadsPerCore": "2.0", + "locale": "en_US.UTF-8", + "charset": "UTF-8", + "asyncLog": "0", + "anyIp": "0", + "sdbDebugFlag": "135", + "rpcDebugFlag": "135", + "tmrDebugFlag": "131", + "cDebugFlag": "135", + "udebugFlag": "135", + "jnidebugFlag": "135", + "qdebugFlag": "135", + } + + def init(self, path): + self.__init__() + self.path = path + + def getLogDir(self): + self.logDir = "%s/sim/psim/log" % (self.path) + return self.logDir + + def getCfgDir(self): + self.cfgDir = "%s/sim/psim/cfg" % (self.path) + return self.cfgDir + + def setTestCluster(self, value): + self.testCluster = value + + def addExtraCfg(self, option, value): + self.cfgDict.update({option: value}) + + def cfg(self, option, value): + cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def deploy(self): + self.logDir = "%s/sim/psim/log" % (self.path) + self.cfgDir = "%s/sim/psim/cfg" % (self.path) + self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path) + + cmd = "rm -rf " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "touch " + self.cfgPath + if os.system(cmd) != 0: + tdLog.exit(cmd) + + if self.testCluster: + self.cfg("masterIp", "192.168.0.1") + self.cfg("secondIp", "192.168.0.2") + self.cfg("logDir", self.logDir) + + for key, value in self.cfgDict.items(): + self.cfg(key, value) + + tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) + + +class TDDnode: + def __init__(self, index): + self.index = index + self.running = 0 + self.deployed = 0 + self.testCluster = False + self.valgrind = 0 + + def init(self, path): + self.path = path + + def setTestCluster(self, value): + self.testCluster = value + + def setValgrind(self, value): + self.valgrind = value + + def getDataSize(self): + totalSize = 0 + + if (self.deployed == 1): + for dirpath, dirnames, filenames in os.walk(self.dataDir): + for f in filenames: + fp = os.path.join(dirpath, f) + + if not os.path.islink(fp): + totalSize = totalSize + os.path.getsize(fp) + + return totalSize + + def deploy(self): + self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index) + self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index) + self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index) + self.cfgPath = "%s/sim/dnode%d/cfg/taos.cfg" % ( + self.path, self.index) + + cmd = "rm -rf " + self.dataDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.dataDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "touch " + self.cfgPath + if os.system(cmd) != 0: + tdLog.exit(cmd) + + if self.testCluster: + self.startIP() + + if self.testCluster: + self.cfg("masterIp", "192.168.0.1") + self.cfg("secondIp", "192.168.0.2") + self.cfg("publicIp", "192.168.0.%d" % (self.index)) + self.cfg("internalIp", "192.168.0.%d" % (self.index)) + self.cfg("privateIp", "192.168.0.%d" % (self.index)) + self.cfg("dataDir", self.dataDir) + self.cfg("logDir", self.logDir) + self.cfg("numOfLogLines", "100000000") + self.cfg("mnodeEqualVnodeNum", "0") + self.cfg("walLevel", "2") + self.cfg("fsync", "1000") + self.cfg("statusInterval", "1") + self.cfg("numOfTotalVnodes", "64") + self.cfg("numOfMnodes", "3") + self.cfg("numOfThreadsPerCore", "2.0") + self.cfg("monitor", "0") + self.cfg("maxVnodeConnections", "30000") + self.cfg("maxMgmtConnections", "30000") + self.cfg("maxMeterConnections", "30000") + self.cfg("maxShellConns", "30000") + self.cfg("locale", "en_US.UTF-8") + self.cfg("charset", "UTF-8") + self.cfg("asyncLog", "0") + self.cfg("anyIp", "0") + self.cfg("dDebugFlag", "135") + self.cfg("mDebugFlag", "135") + self.cfg("sdbDebugFlag", "135") + self.cfg("rpcDebugFlag", "135") + self.cfg("tmrDebugFlag", "131") + self.cfg("cDebugFlag", "135") + self.cfg("httpDebugFlag", "135") + self.cfg("monitorDebugFlag", "135") + self.cfg("udebugFlag", "135") + self.cfg("jnidebugFlag", "135") + self.cfg("qdebugFlag", "135") + self.deployed = 1 + tdLog.debug( + "dnode:%d is deployed and configured by %s" % + (self.index, self.cfgPath)) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def start(self): + buildPath = self.getBuildPath() + + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + + binPath = buildPath + "/build/bin/taosd" + + if self.deployed == 0: + tdLog.exit("dnode:%d is not deployed" % (self.index)) + + if self.valgrind == 0: + cmd = "nohup %s -c %s --random-file-fail-factor 0 > /dev/null 2>&1 & " % ( + binPath, self.cfgDir) + else: + valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" + + cmd = "nohup %s %s -c %s 2>&1 & " % ( + valgrindCmdline, binPath, self.cfgDir) + + print(cmd) + + if os.system(cmd) != 0: + tdLog.exit(cmd) + self.running = 1 + tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) + + tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) + time.sleep(5) + + def stop(self): + if self.valgrind == 0: + toBeKilled = "taosd" + else: + toBeKilled = "valgrind.bin" + + if self.running != 0: + psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + while(processID): + killCmd = "kill -INT %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + for port in range(6030, 6041): + fuserCmd = "fuser -k -n tcp %d" % port + os.system(fuserCmd) + if self.valgrind: + time.sleep(2) + + self.running = 0 + tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) + + def forcestop(self): + if self.valgrind == 0: + toBeKilled = "taosd" + else: + toBeKilled = "valgrind.bin" + + if self.running != 0: + psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + while(processID): + killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + for port in range(6030, 6041): + fuserCmd = "fuser -k -n tcp %d" % port + os.system(fuserCmd) + if self.valgrind: + time.sleep(2) + + self.running = 0 + tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) + + def startIP(self): + cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def stopIP(self): + cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % ( + self.index, self.index) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def cfg(self, option, value): + cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def getDnodeRootDir(self, index): + dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index) + return dnodeRootDir + + def getDnodesRootDir(self): + dnodesRootDir = "%s/sim/psim" % (self.path) + return dnodesRootDir + + +class TDDnodes: + def __init__(self): + self.dnodes = [] + self.dnodes.append(TDDnode(1)) + self.dnodes.append(TDDnode(2)) + self.dnodes.append(TDDnode(3)) + self.dnodes.append(TDDnode(4)) + self.dnodes.append(TDDnode(5)) + self.dnodes.append(TDDnode(6)) + self.dnodes.append(TDDnode(7)) + self.dnodes.append(TDDnode(8)) + self.dnodes.append(TDDnode(9)) + self.dnodes.append(TDDnode(10)) + self.simDeployed = False + + def init(self, path): + psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + binPath = os.path.dirname(os.path.realpath(__file__)) + binPath = binPath + "/../../../debug/" + tdLog.debug("binPath %s" % (binPath)) + binPath = os.path.realpath(binPath) + tdLog.debug("binPath real path %s" % (binPath)) + + # cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath) + # tdLog.debug(cmd) + # os.system(cmd) + + # cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + # tdLog.debug("execute %s" % (cmd)) + + # cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + # tdLog.debug("execute %s" % (cmd)) + + if path == "": + # self.path = os.path.expanduser('~') + self.path = os.path.abspath(binPath + "../../") + else: + self.path = os.path.realpath(path) + + for i in range(len(self.dnodes)): + self.dnodes[i].init(self.path) + + self.sim = TDSimClient() + self.sim.init(self.path) + + def setTestCluster(self, value): + self.testCluster = value + + def setValgrind(self, value): + self.valgrind = value + + def deploy(self, index): + self.sim.setTestCluster(self.testCluster) + + if (self.simDeployed == False): + self.sim.deploy() + self.simDeployed = True + + self.check(index) + self.dnodes[index - 1].setTestCluster(self.testCluster) + self.dnodes[index - 1].setValgrind(self.valgrind) + self.dnodes[index - 1].deploy() + + def cfg(self, index, option, value): + self.check(index) + self.dnodes[index - 1].cfg(option, value) + + def start(self, index): + self.check(index) + self.dnodes[index - 1].start() + + def stop(self, index): + self.check(index) + self.dnodes[index - 1].stop() + + def getDataSize(self, index): + self.check(index) + return self.dnodes[index - 1].getDataSize() + + def forcestop(self, index): + self.check(index) + self.dnodes[index - 1].forcestop() + + def startIP(self, index): + self.check(index) + + if self.testCluster: + self.dnodes[index - 1].startIP() + + def stopIP(self, index): + self.check(index) + + if self.dnodes[index - 1].testCluster: + self.dnodes[index - 1].stopIP() + + def check(self, index): + if index < 1 or index > 10: + tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) + + def stopAll(self): + tdLog.info("stop all dnodes") + for i in range(len(self.dnodes)): + self.dnodes[i].stop() + + psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + if processID: + cmd = "sudo systemctl stop taosd" + os.system(cmd) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + + def getDnodesRootDir(self): + dnodesRootDir = "%s/sim" % (self.path) + return dnodesRootDir + + def getSimCfgPath(self): + return self.sim.getCfgDir() + + def getSimLogPath(self): + return self.sim.getLogDir() + + def addSimExtraCfg(self, option, value): + self.sim.addExtraCfg(option, value) + + +tdDnodes = TDDnodes() diff --git a/tests/pytest/util/dnodes-random-fail.py b/tests/pytest/util/dnodes-random-fail.py index db3a5fea93..df4af0c58b 100644 --- a/tests/pytest/util/dnodes-random-fail.py +++ b/tests/pytest/util/dnodes-random-fail.py @@ -175,7 +175,8 @@ class TDDnode: self.cfg("logDir", self.logDir) self.cfg("numOfLogLines", "100000000") self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("walLevel", "1") + self.cfg("walLevel", "2") + self.cfg("fsync", "1000") self.cfg("statusInterval", "1") self.cfg("numOfTotalVnodes", "64") self.cfg("numOfMnodes", "3") @@ -235,12 +236,12 @@ class TDDnode: tdLog.exit("dnode:%d is not deployed" % (self.index)) if self.valgrind == 0: - cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( + cmd = "nohup %s -c %s --alloc-random-fail --random-file-fail-factor 5 > /dev/null 2>&1 & " % ( binPath, self.cfgDir) else: valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" - cmd = "nohup %s %s -c %s --random-file-fail-factor 5 2>&1 & " % ( + cmd = "nohup %s %s -c %s 2>&1 & " % ( valgrindCmdline, binPath, self.cfgDir) print(cmd) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index b67f509cc6..56e63bae11 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -177,7 +177,8 @@ class TDDnode: self.cfg("logDir", self.logDir) self.cfg("numOfLogLines", "100000000") self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("walLevel", "1") + self.cfg("walLevel", "2") + self.cfg("fsync", "1000") self.cfg("statusInterval", "1") self.cfg("numOfTotalVnodes", "64") self.cfg("numOfMnodes", "3") diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 55d66ad872..445baa9e45 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -110,24 +110,24 @@ echo "second ${HOSTNAME}:7200" >> $TAOS_CFG echo "serverPort ${NODE}" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG -echo "debugFlag 131" >> $TAOS_CFG -echo "mDebugFlag 131" >> $TAOS_CFG -echo "sdbDebugFlag 131" >> $TAOS_CFG -echo "dDebugFlag 131" >> $TAOS_CFG -echo "vDebugFlag 131" >> $TAOS_CFG -echo "tsdbDebugFlag 131" >> $TAOS_CFG +echo "debugFlag 0" >> $TAOS_CFG +echo "mDebugFlag 135" >> $TAOS_CFG +echo "sdbDebugFlag 135" >> $TAOS_CFG +echo "dDebugFlag 135" >> $TAOS_CFG +echo "vDebugFlag 135" >> $TAOS_CFG +echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG -echo "jnidebugFlag 131" >> $TAOS_CFG -echo "odbcdebugFlag 131" >> $TAOS_CFG +echo "jnidebugFlag 135" >> $TAOS_CFG +echo "odbcdebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 143" >> $TAOS_CFG -echo "monitorDebugFlag 131" >> $TAOS_CFG -echo "mqttDebugFlag 131" >> $TAOS_CFG -echo "qdebugFlag 131" >> $TAOS_CFG -echo "rpcDebugFlag 131" >> $TAOS_CFG +echo "monitorDebugFlag 135" >> $TAOS_CFG +echo "mqttDebugFlag 135" >> $TAOS_CFG +echo "qdebugFlag 135" >> $TAOS_CFG +echo "rpcDebugFlag 135" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG -echo "udebugFlag 131" >> $TAOS_CFG -echo "sdebugFlag 131" >> $TAOS_CFG -echo "wdebugFlag 131" >> $TAOS_CFG +echo "udebugFlag 143" >> $TAOS_CFG +echo "sdebugFlag 135" >> $TAOS_CFG +echo "wdebugFlag 135" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG diff --git a/tests/script/sh/exec-no-random-fail.sh b/tests/script/sh/exec-no-random-fail.sh new file mode 100755 index 0000000000..04a663bc5a --- /dev/null +++ b/tests/script/sh/exec-no-random-fail.sh @@ -0,0 +1,113 @@ +#!/bin/bash + +# if [ $# != 4 || $# != 5 ]; then + # echo "argument list need input : " + # echo " -n nodeName" + # echo " -s start/stop" + # echo " -c clear" + # exit 1 +# fi + +NODE_NAME= +EXEC_OPTON= +CLEAR_OPTION="false" +while getopts "n:s:u:x:ct" arg +do + case $arg in + n) + NODE_NAME=$OPTARG + ;; + s) + EXEC_OPTON=$OPTARG + ;; + c) + CLEAR_OPTION="clear" + ;; + t) + SHELL_OPTION="true" + ;; + u) + USERS=$OPTARG + ;; + x) + SIGNAL=$OPTARG + ;; + ?) + echo "unkown argument" + ;; + esac +done + +SCRIPT_DIR=`dirname $0` +cd $SCRIPT_DIR/../ +SCRIPT_DIR=`pwd` + +IN_TDINTERNAL="community" +if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then + cd ../../.. +else + cd ../../ +fi + +TAOS_DIR=`pwd` +TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1` + +if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then + BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3` +else + BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2` +fi + +BUILD_DIR=$TAOS_DIR/$BIN_DIR/build + +SIM_DIR=$TAOS_DIR/sim +NODE_DIR=$SIM_DIR/$NODE_NAME +EXE_DIR=$BUILD_DIR/bin +CFG_DIR=$NODE_DIR/cfg +LOG_DIR=$NODE_DIR/log +DATA_DIR=$NODE_DIR/data +MGMT_DIR=$NODE_DIR/data/mgmt +TSDB_DIR=$NODE_DIR/data/tsdb + +TAOS_CFG=$NODE_DIR/cfg/taos.cfg + +echo ------------ $EXEC_OPTON $NODE_NAME + +TAOS_FLAG=$SIM_DIR/tsim/flag +if [ -f "$TAOS_FLAG" ]; then + EXE_DIR=/usr/local/bin/taos +fi + +if [ "$CLEAR_OPTION" = "clear" ]; then + echo rm -rf $MGMT_DIR $TSDB_DIR + rm -rf $TSDB_DIR + rm -rf $MGMT_DIR +fi + +if [ "$EXEC_OPTON" = "start" ]; then + echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR + + if [ "$SHELL_OPTION" = "true" ]; then + nohup valgrind --log-file=${LOG_DIR}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & + else + nohup $EXE_DIR/taosd -c $CFG_DIR --random-file-fail-factor 0 > /dev/null 2>&1 & + fi + +else + #relative path + RCFG_DIR=sim/$NODE_NAME/cfg + PID=`ps -ef|grep taosd | grep $RCFG_DIR | grep -v grep | awk '{print $2}'` + while [ -n "$PID" ] + do + if [ "$SIGNAL" = "SIGINT" ]; then + echo try to kill by signal SIGINT + kill -SIGINT $PID + else + echo try to kill by signal SIGKILL + kill -9 $PID + fi + sleep 1 + PID=`ps -ef|grep taosd | grep $RCFG_DIR | grep -v grep | awk '{print $2}'` + done +fi + diff --git a/tests/script/sh/exec-random-fail.sh b/tests/script/sh/exec-random-fail.sh index 7ba301617c..a354021684 100755 --- a/tests/script/sh/exec-random-fail.sh +++ b/tests/script/sh/exec-random-fail.sh @@ -90,7 +90,7 @@ if [ "$EXEC_OPTON" = "start" ]; then if [ "$SHELL_OPTION" = "true" ]; then nohup valgrind --log-file=${LOG_DIR}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & else - nohup $EXE_DIR/taosd -c $CFG_DIR --random-file-fail-factor 5 > /dev/null 2>&1 & + nohup $EXE_DIR/taosd -c $CFG_DIR --alloc-random-fail --random-file-fail-factor 5 > /dev/null 2>&1 & fi else diff --git a/tests/script/tmp/mnodes.sim b/tests/script/tmp/mnodes.sim index afc068b3f1..b4e176f221 100644 --- a/tests/script/tmp/mnodes.sim +++ b/tests/script/tmp/mnodes.sim @@ -52,9 +52,9 @@ system sh/cfg.sh -n dnode1 -c qdebugFlag -v 131 system sh/cfg.sh -n dnode2 -c qdebugFlag -v 131 system sh/cfg.sh -n dnode3 -c qdebugFlag -v 131 -system sh/cfg.sh -n dnode1 -c cDebugFlag -v 131 -system sh/cfg.sh -n dnode2 -c cDebugFlag -v 131 -system sh/cfg.sh -n dnode3 -c cDebugFlag -v 131 +system sh/cfg.sh -n dnode1 -c cDebugFlag -v 135 +system sh/cfg.sh -n dnode2 -c cDebugFlag -v 135 +system sh/cfg.sh -n dnode3 -c cDebugFlag -v 135 system sh/cfg.sh -n dnode1 -c udebugFlag -v 131 system sh/cfg.sh -n dnode2 -c udebugFlag -v 131 @@ -64,6 +64,10 @@ system sh/cfg.sh -n dnode1 -c wdebugFlag -v 131 system sh/cfg.sh -n dnode2 -c wdebugFlag -v 131 system sh/cfg.sh -n dnode3 -c wdebugFlag -v 131 +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1000000 +system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 1000000 +system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 1000000 + print ============== deploy system sh/exec.sh -n dnode1 -s start diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index 459f6e7665..0a0c512b26 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -15,21 +15,13 @@ system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1 system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode1 -c activeCode -v eglxDLzRpslJWl7OxrPZ2K3sQ5631AP9SVpezsaz2dhJWl7OxrPZ2ElaXs7Gs9nYSVpezsaz2djGIj5StnQ3ZvLHcsE8cwcN + system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4 -system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 20 -system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 20 -system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 20 -system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 20 - -system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 2 -system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 2 -system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 2 -system sh/cfg.sh -n dnode4 -c maxVgroupsPerDb -v 2 - system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000 system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 100000 system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 100000 diff --git a/tests/test/c/createTablePerformance.c b/tests/test/c/createTablePerformance.c index 3edffd2a5e..2e334fa9b2 100644 --- a/tests/test/c/createTablePerformance.c +++ b/tests/test/c/createTablePerformance.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "taoserror.h" #include "taos.h" #include "tulog.h" #include "ttime.h" @@ -154,7 +155,7 @@ void *threadFunc(void *param) { TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { - pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); + pError("failed to create table %s%d, reason:%s", stableName, t, tstrerror(code)); } taos_free_result(pSql); } diff --git a/tests/tsim/inc/sim.h b/tests/tsim/inc/sim.h index 293427d240..18af21a506 100644 --- a/tests/tsim/inc/sim.h +++ b/tests/tsim/inc/sim.h @@ -53,10 +53,10 @@ #define simFatal(...) { if (simDebugFlag & DEBUG_FATAL) { taosPrintLog("SIM FATAL ", 255, __VA_ARGS__); }} #define simError(...) { if (simDebugFlag & DEBUG_ERROR) { taosPrintLog("SIM ERROR ", 255, __VA_ARGS__); }} -#define simWarn(...) { if (simDebugFlag & DEBUG_WARN) { taosPrintLog("SIM WARN ", 255, __VA_ARGS__); }} -#define simInfo(...) { if (simDebugFlag & DEBUG_INFO) { taosPrintLog("SIM INFO ", 255, __VA_ARGS__); }} -#define simDebug(...) { if (simDebugFlag & DEBUG_DEBUG) { taosPrintLog("SIM DEBUG ", simDebugFlag, __VA_ARGS__); }} -#define simTrace(...) { if (simDebugFlag & DEBUG_TRACE) { taosPrintLog("SIM TRACE ", simDebugFlag, __VA_ARGS__); }} +#define simWarn(...) { if (simDebugFlag & DEBUG_WARN) { taosPrintLog("SIM WARN ", 255, __VA_ARGS__); }} +#define simInfo(...) { if (simDebugFlag & DEBUG_INFO) { taosPrintLog("SIM ", 255, __VA_ARGS__); }} +#define simDebug(...) { if (simDebugFlag & DEBUG_DEBUG) { taosPrintLog("SIM ", simDebugFlag, __VA_ARGS__); }} +#define simTrace(...) { if (simDebugFlag & DEBUG_TRACE) { taosPrintLog("SIM ", simDebugFlag, __VA_ARGS__); }} enum { SIM_SCRIPT_TYPE_MAIN, SIM_SCRIPT_TYPE_BACKGROUND };