Merge pull request #2132 from taosdata/hotfix/taos-tools
Hotfix/taos tools
This commit is contained in:
commit
3c7b4637ff
|
@ -142,8 +142,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
|
|||
* Method: consumeImp
|
||||
* Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData;
|
||||
*/
|
||||
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
|
||||
(JNIEnv *, jobject, jlong, jint);
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
|
||||
(JNIEnv *, jobject, jlong);
|
||||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||
|
|
|
@ -551,8 +551,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
|
|||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con,
|
||||
jboolean restart, jstring jtopic,
|
||||
jstring jsql, jint jinterval) {
|
||||
jboolean restart, jstring jtopic, jstring jsql, jint jinterval) {
|
||||
jlong sub = 0;
|
||||
TAOS *taos = (TAOS *)con;
|
||||
char *topic = NULL;
|
||||
|
@ -640,49 +639,23 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD *fields, in
|
|||
return rowobj;
|
||||
}
|
||||
|
||||
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub,
|
||||
jint timeout) {
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
|
||||
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
|
||||
jniGetGlobalMethod(env);
|
||||
|
||||
TAOS_SUB *tsub = (TAOS_SUB *)sub;
|
||||
jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp);
|
||||
|
||||
int64_t start = taosGetTimestampMs();
|
||||
int count = 0;
|
||||
int count = 0;
|
||||
|
||||
while (true) {
|
||||
TAOS_RES *res = taos_consume(tsub);
|
||||
if (res == NULL) {
|
||||
jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
|
||||
return NULL;
|
||||
}
|
||||
TAOS_RES *res = taos_consume(tsub);
|
||||
|
||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||
int num_fields = taos_num_fields(res);
|
||||
while (true) {
|
||||
TAOS_ROW row = taos_fetch_row(res);
|
||||
if (row == NULL) {
|
||||
break;
|
||||
}
|
||||
jobject rowobj = convert_one_row(env, row, fields, num_fields);
|
||||
(*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj);
|
||||
count++;
|
||||
}
|
||||
|
||||
if (count > 0) {
|
||||
break;
|
||||
}
|
||||
if (timeout == -1) {
|
||||
continue;
|
||||
}
|
||||
if (((int)(taosGetTimestampMs() - start)) >= timeout) {
|
||||
jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub);
|
||||
break;
|
||||
}
|
||||
if (res == NULL) {
|
||||
jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return rows;
|
||||
return res;
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub,
|
||||
|
|
|
@ -160,6 +160,7 @@ public class DatabaseMetaDataResultSet implements ResultSet {
|
|||
|
||||
@Override
|
||||
public Timestamp getTimestamp(int columnIndex) throws SQLException {
|
||||
columnIndex--;
|
||||
return rowCursor.getTimestamp(columnIndex);
|
||||
}
|
||||
|
||||
|
|
|
@ -84,6 +84,14 @@ public class TSDBConnection implements Connection {
|
|||
}
|
||||
}
|
||||
|
||||
public TSDBSubscribe createSubscribe() throws SQLException {
|
||||
if (!this.connector.isClosed()) {
|
||||
return new TSDBSubscribe(this.connector);
|
||||
} else {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
}
|
||||
}
|
||||
|
||||
public PreparedStatement prepareStatement(String sql) throws SQLException {
|
||||
if (!this.connector.isClosed()) {
|
||||
return new TSDBPreparedStatement(this.connector, sql);
|
||||
|
|
|
@ -266,31 +266,31 @@ public class TSDBJNIConnector {
|
|||
/**
|
||||
* Subscribe to a table in TSDB
|
||||
*/
|
||||
public long subscribe(String host, String user, String password, String database, String table, long time, int period) {
|
||||
return subscribeImp(host, user, password, database, table, time, period);
|
||||
public long subscribe(String topic, String sql, boolean restart, int period) {
|
||||
return subscribeImp(this.taos, restart, topic, sql, period);
|
||||
}
|
||||
|
||||
private native long subscribeImp(String host, String user, String password, String database, String table, long time, int period);
|
||||
public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
|
||||
|
||||
/**
|
||||
* Consume a subscribed table
|
||||
*/
|
||||
public TSDBResultSetRowData consume(long subscription) {
|
||||
public long consume(long subscription) {
|
||||
return this.consumeImp(subscription);
|
||||
}
|
||||
|
||||
private native TSDBResultSetRowData consumeImp(long subscription);
|
||||
private native long consumeImp(long subscription);
|
||||
|
||||
/**
|
||||
* Unsubscribe a table
|
||||
*
|
||||
* @param subscription
|
||||
*/
|
||||
public void unsubscribe(long subscription) {
|
||||
unsubscribeImp(subscription);
|
||||
public void unsubscribe(long subscription, boolean isKeep) {
|
||||
unsubscribeImp(subscription, isKeep);
|
||||
}
|
||||
|
||||
private native void unsubscribeImp(long subscription);
|
||||
private native void unsubscribeImp(long subscription, boolean isKeep);
|
||||
|
||||
/**
|
||||
* Validate if a <I>create table</I> sql statement is correct without actually creating that table
|
||||
|
|
|
@ -51,47 +51,47 @@ public class TSDBResultSet implements ResultSet {
|
|||
private boolean lastWasNull = false;
|
||||
private final int COLUMN_INDEX_START_VALUE = 1;
|
||||
|
||||
public TSDBJNIConnector getJniConnector() {
|
||||
return jniConnector;
|
||||
}
|
||||
public TSDBJNIConnector getJniConnector() {
|
||||
return jniConnector;
|
||||
}
|
||||
|
||||
public void setJniConnector(TSDBJNIConnector jniConnector) {
|
||||
this.jniConnector = jniConnector;
|
||||
}
|
||||
public void setJniConnector(TSDBJNIConnector jniConnector) {
|
||||
this.jniConnector = jniConnector;
|
||||
}
|
||||
|
||||
public long getResultSetPointer() {
|
||||
return resultSetPointer;
|
||||
}
|
||||
public long getResultSetPointer() {
|
||||
return resultSetPointer;
|
||||
}
|
||||
|
||||
public void setResultSetPointer(long resultSetPointer) {
|
||||
this.resultSetPointer = resultSetPointer;
|
||||
}
|
||||
public void setResultSetPointer(long resultSetPointer) {
|
||||
this.resultSetPointer = resultSetPointer;
|
||||
}
|
||||
|
||||
public List<ColumnMetaData> getColumnMetaDataList() {
|
||||
return columnMetaDataList;
|
||||
}
|
||||
public List<ColumnMetaData> getColumnMetaDataList() {
|
||||
return columnMetaDataList;
|
||||
}
|
||||
|
||||
public void setColumnMetaDataList(List<ColumnMetaData> columnMetaDataList) {
|
||||
this.columnMetaDataList = columnMetaDataList;
|
||||
}
|
||||
public void setColumnMetaDataList(List<ColumnMetaData> columnMetaDataList) {
|
||||
this.columnMetaDataList = columnMetaDataList;
|
||||
}
|
||||
|
||||
public TSDBResultSetRowData getRowData() {
|
||||
return rowData;
|
||||
}
|
||||
public TSDBResultSetRowData getRowData() {
|
||||
return rowData;
|
||||
}
|
||||
|
||||
public void setRowData(TSDBResultSetRowData rowData) {
|
||||
this.rowData = rowData;
|
||||
}
|
||||
public void setRowData(TSDBResultSetRowData rowData) {
|
||||
this.rowData = rowData;
|
||||
}
|
||||
|
||||
public boolean isLastWasNull() {
|
||||
return lastWasNull;
|
||||
}
|
||||
public boolean isLastWasNull() {
|
||||
return lastWasNull;
|
||||
}
|
||||
|
||||
public void setLastWasNull(boolean lastWasNull) {
|
||||
this.lastWasNull = lastWasNull;
|
||||
}
|
||||
public void setLastWasNull(boolean lastWasNull) {
|
||||
this.lastWasNull = lastWasNull;
|
||||
}
|
||||
|
||||
public TSDBResultSet() {
|
||||
public TSDBResultSet() {
|
||||
}
|
||||
|
||||
public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException {
|
||||
|
@ -119,7 +119,7 @@ public class TSDBResultSet implements ResultSet {
|
|||
|
||||
public boolean next() throws SQLException {
|
||||
if (rowData != null) {
|
||||
this.rowData.clear();
|
||||
this.rowData.clear();
|
||||
}
|
||||
|
||||
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
|
||||
|
@ -154,119 +154,119 @@ public class TSDBResultSet implements ResultSet {
|
|||
public String getString(int columnIndex) throws SQLException {
|
||||
String res = null;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public boolean getBoolean(int columnIndex) throws SQLException {
|
||||
boolean res = false;
|
||||
boolean res = false;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public byte getByte(int columnIndex) throws SQLException {
|
||||
byte res = 0;
|
||||
byte res = 0;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public short getShort(int columnIndex) throws SQLException {
|
||||
short res = 0;
|
||||
short res = 0;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public int getInt(int columnIndex) throws SQLException {
|
||||
int res = 0;
|
||||
int res = 0;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public long getLong(int columnIndex) throws SQLException {
|
||||
long res = 0l;
|
||||
long res = 0l;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public float getFloat(int columnIndex) throws SQLException {
|
||||
float res = 0;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
float res = 0;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public double getDouble(int columnIndex) throws SQLException {
|
||||
double res = 0;
|
||||
double res = 0;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
*
|
||||
* @see java.sql.ResultSet#getBigDecimal(int, int)
|
||||
*
|
||||
*
|
||||
* @deprecated Use {@code getBigDecimal(int columnIndex)} or {@code
|
||||
* getBigDecimal(String columnLabel)}
|
||||
*/
|
||||
@Deprecated
|
||||
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
|
||||
BigDecimal res = null;
|
||||
BigDecimal res = null;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public byte[] getBytes(int columnIndex) throws SQLException {
|
||||
byte[] res = null;
|
||||
byte[] res = null;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -281,13 +281,13 @@ public class TSDBResultSet implements ResultSet {
|
|||
}
|
||||
|
||||
public Timestamp getTimestamp(int columnIndex) throws SQLException {
|
||||
Timestamp res = null;
|
||||
Timestamp res = null;
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
|
||||
this.lastWasNull = this.rowData.wasNull(colIndex);
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getTimestamp(colIndex);
|
||||
}
|
||||
if (!lastWasNull) {
|
||||
res = this.rowData.getTimestamp(colIndex);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -297,9 +297,9 @@ public class TSDBResultSet implements ResultSet {
|
|||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
*
|
||||
* @see java.sql.ResultSet#getUnicodeStream(int)
|
||||
*
|
||||
*
|
||||
* * @deprecated use <code>getCharacterStream</code> in place of
|
||||
* <code>getUnicodeStream</code>
|
||||
*/
|
||||
|
@ -409,13 +409,13 @@ public class TSDBResultSet implements ResultSet {
|
|||
}
|
||||
|
||||
public int findColumn(String columnLabel) throws SQLException {
|
||||
Iterator<ColumnMetaData> colMetaDataIt = this.columnMetaDataList.iterator();
|
||||
while (colMetaDataIt.hasNext()) {
|
||||
ColumnMetaData colMetaData = colMetaDataIt.next();
|
||||
if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) {
|
||||
return colMetaData.getColIndex() + 1;
|
||||
}
|
||||
}
|
||||
Iterator<ColumnMetaData> colMetaDataIt = this.columnMetaDataList.iterator();
|
||||
while (colMetaDataIt.hasNext()) {
|
||||
ColumnMetaData colMetaData = colMetaDataIt.next();
|
||||
if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) {
|
||||
return colMetaData.getColIndex() + 1;
|
||||
}
|
||||
}
|
||||
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
|
||||
}
|
||||
|
||||
|
@ -882,7 +882,7 @@ public class TSDBResultSet implements ResultSet {
|
|||
}
|
||||
|
||||
public String getNString(int columnIndex) throws SQLException {
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
int colIndex = getTrueColumnIndex(columnIndex);
|
||||
return (String) rowData.get(colIndex);
|
||||
}
|
||||
|
||||
|
@ -1017,17 +1017,17 @@ public class TSDBResultSet implements ResultSet {
|
|||
public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
|
||||
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
|
||||
}
|
||||
|
||||
|
||||
private int getTrueColumnIndex(int columnIndex) throws SQLException {
|
||||
if (columnIndex < this.COLUMN_INDEX_START_VALUE) {
|
||||
throw new SQLException("Column Index out of range, " + columnIndex + " < " + this.COLUMN_INDEX_START_VALUE);
|
||||
}
|
||||
|
||||
|
||||
int numOfCols = this.columnMetaDataList.size();
|
||||
if (columnIndex > numOfCols) {
|
||||
throw new SQLException("Column Index out of range, " + columnIndex + " > " + numOfCols);
|
||||
}
|
||||
|
||||
|
||||
return columnIndex - 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
/***************************************************************************
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*****************************************************************************/
|
||||
package com.taosdata.jdbc;
|
||||
|
||||
import javax.management.OperationsException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Map;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public class TSDBSubscribe {
|
||||
private TSDBJNIConnector connecter = null;
|
||||
private static ScheduledExecutorService pool;
|
||||
private static Map<Long, TSDBTimerTask> timerTaskMap = new ConcurrentHashMap<>();
|
||||
private static Map<Long, ScheduledFuture> scheduledMap = new ConcurrentHashMap();
|
||||
|
||||
private static class TimerInstance {
|
||||
private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1);
|
||||
}
|
||||
|
||||
public static ScheduledExecutorService getTimerInstance() {
|
||||
return TimerInstance.instance;
|
||||
}
|
||||
|
||||
public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException {
|
||||
if (null != connecter) {
|
||||
this.connecter = connecter;
|
||||
} else {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* sync subscribe
|
||||
*
|
||||
* @param topic
|
||||
* @param sql
|
||||
* @param restart
|
||||
* @param period
|
||||
* @throws SQLException
|
||||
*/
|
||||
public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException {
|
||||
if (this.connecter.isClosed()) {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
}
|
||||
if (period < 1000) {
|
||||
throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES));
|
||||
}
|
||||
return this.connecter.subscribe(topic, sql, restart, period);
|
||||
}
|
||||
|
||||
/**
|
||||
* async subscribe
|
||||
*
|
||||
* @param topic
|
||||
* @param sql
|
||||
* @param restart
|
||||
* @param period
|
||||
* @param callBack
|
||||
* @throws SQLException
|
||||
*/
|
||||
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException {
|
||||
if (this.connecter.isClosed()) {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
}
|
||||
final long subscription = this.connecter.subscribe(topic, sql, restart, period);
|
||||
if (null != callBack) {
|
||||
pool = getTimerInstance();
|
||||
|
||||
TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack);
|
||||
|
||||
timerTaskMap.put(subscription, timerTask);
|
||||
|
||||
ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS);
|
||||
scheduledMap.put(subscription, scheduledFuture);
|
||||
}
|
||||
return subscription;
|
||||
}
|
||||
|
||||
public TSDBResultSet consume(long subscription) throws OperationsException, SQLException {
|
||||
if (this.connecter.isClosed()) {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
}
|
||||
if (0 == subscription) {
|
||||
throw new OperationsException("Invalid use of consume");
|
||||
}
|
||||
long resultSetPointer = this.connecter.consume(subscription);
|
||||
|
||||
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
|
||||
return null;
|
||||
} else {
|
||||
return new TSDBResultSet(this.connecter, resultSetPointer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* cancel subscribe
|
||||
*
|
||||
* @param subscription
|
||||
* @param isKeep
|
||||
* @throws SQLException
|
||||
*/
|
||||
public void unsubscribe(long subscription, boolean isKeep) throws SQLException {
|
||||
if (this.connecter.isClosed()) {
|
||||
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
|
||||
}
|
||||
|
||||
if (null != timerTaskMap.get(subscription)) {
|
||||
synchronized (timerTaskMap.get(subscription)) {
|
||||
while (1 == timerTaskMap.get(subscription).getState()) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
timerTaskMap.get(subscription).setState(2);
|
||||
if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) {
|
||||
timerTaskMap.get(subscription).cancel();
|
||||
timerTaskMap.remove(subscription);
|
||||
scheduledMap.get(subscription).cancel(false);
|
||||
scheduledMap.remove(subscription);
|
||||
}
|
||||
this.connecter.unsubscribe(subscription, isKeep);
|
||||
}
|
||||
} else {
|
||||
this.connecter.unsubscribe(subscription, isKeep);
|
||||
}
|
||||
}
|
||||
|
||||
class TSDBTimerTask extends TimerTask {
|
||||
private long subscription;
|
||||
private TSDBSubscribeCallBack callBack;
|
||||
// 0: not running 1: running 2: cancel
|
||||
private int state = 0;
|
||||
|
||||
public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) {
|
||||
this.subscription = subscription;
|
||||
this.callBack = callBack;
|
||||
}
|
||||
|
||||
public int getState() {
|
||||
return this.state;
|
||||
}
|
||||
|
||||
public void setState(int state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (this) {
|
||||
if (2 == state) {
|
||||
return;
|
||||
}
|
||||
|
||||
state = 1;
|
||||
|
||||
try {
|
||||
TSDBResultSet resultSet = consume(subscription);
|
||||
callBack.invoke(resultSet);
|
||||
} catch (Exception e) {
|
||||
this.cancel();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
state = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
/***************************************************************************
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*****************************************************************************/
|
||||
package com.taosdata.jdbc;
|
||||
|
||||
public interface TSDBSubscribeCallBack {
|
||||
void invoke(TSDBResultSet resultSet);
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
import com.taosdata.jdbc.*;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.util.Properties;
|
||||
|
||||
public class TestAsyncTSDBSubscribe {
|
||||
public static void main(String[] args) {
|
||||
String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " +
|
||||
"-tname tableName -h host";
|
||||
if (args.length < 2) {
|
||||
System.err.println(usage);
|
||||
return;
|
||||
}
|
||||
|
||||
String dbName = "";
|
||||
String tName = "";
|
||||
String host = "localhost";
|
||||
String topic = "";
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
dbName = args[++i];
|
||||
}
|
||||
if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
tName = args[++i];
|
||||
}
|
||||
if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
host = args[++i];
|
||||
}
|
||||
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
topic = args[++i];
|
||||
}
|
||||
}
|
||||
if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
|
||||
System.err.println(usage);
|
||||
return;
|
||||
}
|
||||
|
||||
Connection connection = null;
|
||||
TSDBSubscribe subscribe = null;
|
||||
long subscribId = 0;
|
||||
try {
|
||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
|
||||
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties);
|
||||
String rawSql = "select * from " + tName + ";";
|
||||
subscribe = ((TSDBConnection) connection).createSubscribe();
|
||||
subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
|
||||
long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second"));
|
||||
int a = 0;
|
||||
Thread.sleep(2000);
|
||||
subscribe.unsubscribe(subscribId, true);
|
||||
System.err.println("cancel subscribe");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private static class CallBack implements TSDBSubscribeCallBack {
|
||||
private String name = "";
|
||||
|
||||
public CallBack(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invoke(TSDBResultSet resultSet) {
|
||||
try {
|
||||
while (null !=resultSet && resultSet.next()) {
|
||||
System.out.print("callback_" + name + ": ");
|
||||
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
|
||||
System.out.printf(i + ": " + resultSet.getString(i) + "\t");
|
||||
}
|
||||
System.out.println();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,9 +10,9 @@ public class TestPreparedStatement {
|
|||
try {
|
||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "192.168.1.117");
|
||||
Connection connection = DriverManager.getConnection("jdbc:TAOS://192.168.1.117:0/?user=root&password=taosdata", properties);
|
||||
String rawSql = "SELECT ts, c1 FROM (select c1, ts from db.tb1) SUB_QRY";
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
|
||||
Connection connection = DriverManager.getConnection("jdbc:TAOS://localhost:0/?user=root&password=taosdata", properties);
|
||||
String rawSql = "select * from test.log0601";
|
||||
// String[] params = new String[]{"ts", "c1"};
|
||||
PreparedStatement pstmt = (TSDBPreparedStatement) connection.prepareStatement(rawSql);
|
||||
ResultSet resSet = pstmt.executeQuery();
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
import com.taosdata.jdbc.TSDBConnection;
|
||||
import com.taosdata.jdbc.TSDBDriver;
|
||||
import com.taosdata.jdbc.TSDBResultSet;
|
||||
import com.taosdata.jdbc.TSDBSubscribe;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.util.Properties;
|
||||
|
||||
public class TestTSDBSubscribe {
|
||||
public static void main(String[] args) throws Exception {
|
||||
String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " +
|
||||
"-topic topicName -tname tableName -h host";
|
||||
if (args.length < 2) {
|
||||
System.err.println(usage);
|
||||
return;
|
||||
}
|
||||
|
||||
String dbName = "";
|
||||
String tName = "";
|
||||
String host = "localhost";
|
||||
String topic = "";
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
dbName = args[++i];
|
||||
}
|
||||
if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
tName = args[++i];
|
||||
}
|
||||
if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
host = args[++i];
|
||||
}
|
||||
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
topic = args[++i];
|
||||
}
|
||||
}
|
||||
if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
|
||||
System.err.println(usage);
|
||||
return;
|
||||
}
|
||||
|
||||
Connection connection = null;
|
||||
TSDBSubscribe subscribe = null;
|
||||
long subscribId = 0;
|
||||
try {
|
||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
|
||||
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata"
|
||||
, properties);
|
||||
String rawSql = "select * from " + tName + ";";
|
||||
subscribe = ((TSDBConnection) connection).createSubscribe();
|
||||
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
|
||||
int a = 0;
|
||||
while (true) {
|
||||
Thread.sleep(900);
|
||||
TSDBResultSet resSet = subscribe.consume(subscribId);
|
||||
|
||||
while (resSet.next()) {
|
||||
for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
|
||||
System.out.printf(i + ": " + resSet.getString(i) + "\t");
|
||||
}
|
||||
System.out.println("\n======" + a + "==========");
|
||||
}
|
||||
|
||||
a++;
|
||||
if (a >= 10) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
if (null != subscribe && 0 != subscribId) {
|
||||
subscribe.unsubscribe(subscribId, true);
|
||||
}
|
||||
if (null != connection) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
#include <netinet/in.h>
|
||||
#include <pthread.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define BUFFER_SIZE 200
|
||||
|
||||
typedef struct {
|
||||
int port;
|
||||
char *host[15];
|
||||
} info;
|
||||
|
||||
void *checkPort(void *sarg) {
|
||||
info *pinfo = (info *)sarg;
|
||||
int port = pinfo->port;
|
||||
char *host = *pinfo->host;
|
||||
int clientSocket;
|
||||
|
||||
struct sockaddr_in serverAddr;
|
||||
char sendbuf[BUFFER_SIZE];
|
||||
char recvbuf[BUFFER_SIZE];
|
||||
int iDataNum;
|
||||
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||
perror("socket");
|
||||
return NULL;
|
||||
}
|
||||
serverAddr.sin_family = AF_INET;
|
||||
serverAddr.sin_port = htons(port);
|
||||
|
||||
serverAddr.sin_addr.s_addr = inet_addr(host);
|
||||
|
||||
printf("=================================\n");
|
||||
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
|
||||
perror("connect");
|
||||
return NULL;
|
||||
}
|
||||
printf("Connect to: %s:%d...success\n", host, port);
|
||||
|
||||
sprintf(sendbuf, "send port_%d", port);
|
||||
send(clientSocket, sendbuf, strlen(sendbuf), 0);
|
||||
printf("Send msg_%d: %s\n", port, sendbuf);
|
||||
|
||||
recvbuf[0] = '\0';
|
||||
iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0);
|
||||
recvbuf[iDataNum] = '\0';
|
||||
printf("Read ack msg_%d: %s\n", port, recvbuf);
|
||||
|
||||
printf("=================================\n");
|
||||
close(clientSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *checkUPort(void *sarg) {
|
||||
info *pinfo = (info *)sarg;
|
||||
int port = pinfo->port;
|
||||
char *host = *pinfo->host;
|
||||
int clientSocket;
|
||||
|
||||
struct sockaddr_in serverAddr;
|
||||
char sendbuf[BUFFER_SIZE];
|
||||
char recvbuf[BUFFER_SIZE];
|
||||
int iDataNum;
|
||||
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
|
||||
perror("socket");
|
||||
return NULL;
|
||||
}
|
||||
serverAddr.sin_family = AF_INET;
|
||||
serverAddr.sin_port = htons(port);
|
||||
|
||||
serverAddr.sin_addr.s_addr = inet_addr(host);
|
||||
|
||||
printf("=================================\n");
|
||||
|
||||
sprintf(sendbuf, "send msg port_%d by udp", port);
|
||||
|
||||
socklen_t sin_size = sizeof(*(struct sockaddr*)&serverAddr);
|
||||
|
||||
sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size);
|
||||
|
||||
printf("Send msg_%d by udp: %s\n", port, sendbuf);
|
||||
|
||||
recvbuf[0] = '\0';
|
||||
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
|
||||
recvbuf[iDataNum] = '\0';
|
||||
printf("Read ack msg_%d from udp: %s\n", port, recvbuf);
|
||||
|
||||
printf("=================================\n");
|
||||
close(clientSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main() {
|
||||
int port = 6020;
|
||||
char *host = "127.0.0.1";
|
||||
info *tinfo = malloc(sizeof(info));
|
||||
info *uinfo = malloc(sizeof(info));
|
||||
|
||||
for (size_t i = 0; i < 30; i++) {
|
||||
port++;
|
||||
printf("For test: %s:%d\n", host, port);
|
||||
|
||||
*tinfo->host = host;
|
||||
tinfo->port = port;
|
||||
checkPort(tinfo);
|
||||
|
||||
*uinfo->host = host;
|
||||
uinfo->port = port;
|
||||
checkUPort(uinfo);
|
||||
}
|
||||
free(tinfo);
|
||||
free(uinfo);
|
||||
}
|
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
#include <netinet/in.h>
|
||||
#include <pthread.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define BUFFER_SIZE 200
|
||||
|
||||
typedef struct {
|
||||
int port;
|
||||
int type; // 0: tcp, 1: udo, default: 0
|
||||
} info;
|
||||
|
||||
static void *bindPort(void *sarg) {
|
||||
info *pinfo = (info *)sarg;
|
||||
int port = pinfo->port;
|
||||
int type = pinfo->type;
|
||||
int serverSocket;
|
||||
|
||||
struct sockaddr_in server_addr;
|
||||
struct sockaddr_in clientAddr;
|
||||
int addr_len = sizeof(clientAddr);
|
||||
int client;
|
||||
char buffer[BUFFER_SIZE];
|
||||
int iDataNum;
|
||||
|
||||
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
|
||||
perror("socket");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bzero(&server_addr, sizeof(server_addr));
|
||||
server_addr.sin_family = AF_INET;
|
||||
server_addr.sin_port = htons(port);
|
||||
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
|
||||
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
|
||||
perror("connect");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (listen(serverSocket, 5) < 0) {
|
||||
perror("listen");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
printf("Bind port: %d success\n", port);
|
||||
while (1) {
|
||||
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
|
||||
if (client < 0) {
|
||||
perror("accept");
|
||||
continue;
|
||||
}
|
||||
printf("=================================\n");
|
||||
|
||||
printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
|
||||
while (1) {
|
||||
buffer[0] = '\0';
|
||||
iDataNum = recv(client, buffer, BUFFER_SIZE, 0);
|
||||
|
||||
if (iDataNum < 0) {
|
||||
perror("recv null");
|
||||
continue;
|
||||
}
|
||||
if (iDataNum > 0) {
|
||||
buffer[iDataNum] = '\0';
|
||||
printf("read msg:%s\n", buffer);
|
||||
if (strcmp(buffer, "quit") == 0) break;
|
||||
buffer[0] = '\0';
|
||||
|
||||
sprintf(buffer, "ack port_%d", port);
|
||||
printf("send ack msg:%s\n", buffer);
|
||||
|
||||
send(client, buffer, strlen(buffer), 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
printf("=================================\n");
|
||||
}
|
||||
close(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *bindUPort(void *sarg) {
|
||||
info *pinfo = (info *)sarg;
|
||||
int port = pinfo->port;
|
||||
int type = pinfo->type;
|
||||
int serverSocket;
|
||||
|
||||
struct sockaddr_in server_addr;
|
||||
struct sockaddr_in clientAddr;
|
||||
int addr_len = sizeof(clientAddr);
|
||||
int client;
|
||||
char buffer[BUFFER_SIZE];
|
||||
int iDataNum;
|
||||
|
||||
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
|
||||
perror("socket");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bzero(&server_addr, sizeof(server_addr));
|
||||
server_addr.sin_family = AF_INET;
|
||||
server_addr.sin_port = htons(port);
|
||||
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
|
||||
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
|
||||
perror("connect");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
socklen_t sin_size;
|
||||
printf("Bind port: %d success\n", port);
|
||||
|
||||
while (1) {
|
||||
buffer[0] = '\0';
|
||||
|
||||
sin_size = sizeof(*(struct sockaddr *)&server_addr);
|
||||
|
||||
iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);
|
||||
|
||||
if (iDataNum < 0) {
|
||||
perror("recvfrom null");
|
||||
continue;
|
||||
}
|
||||
if (iDataNum > 0) {
|
||||
printf("=================================\n");
|
||||
|
||||
printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
|
||||
buffer[iDataNum] = '\0';
|
||||
printf("Read msg from udp:%s\n", buffer);
|
||||
if (strcmp(buffer, "quit") == 0) break;
|
||||
buffer[0] = '\0';
|
||||
|
||||
sprintf(buffer, "ack port_%d by udp", port);
|
||||
printf("Send ack msg by udp:%s\n", buffer);
|
||||
|
||||
sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size);
|
||||
|
||||
send(client, buffer, strlen(buffer), 0);
|
||||
printf("=================================\n");
|
||||
}
|
||||
}
|
||||
|
||||
close(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int main() {
|
||||
int port = 6020;
|
||||
pthread_t *pids = malloc(60 * sizeof(pthread_t));
|
||||
info * infos = malloc(30 * sizeof(info));
|
||||
info * uinfos = malloc(30 * sizeof(info));
|
||||
|
||||
for (size_t i = 0; i < 30; i++) {
|
||||
port++;
|
||||
|
||||
info *pinfo = infos++;
|
||||
pinfo->port = port;
|
||||
|
||||
if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程
|
||||
{ //创建线程失败
|
||||
printf("创建线程失败: %d.\n", port);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
info *uinfo = uinfos++;
|
||||
uinfo->port = port;
|
||||
uinfo->type = 1;
|
||||
if (pthread_create(pids + 30 + i, NULL, bindUPort, uinfo) != 0) //创建线程
|
||||
{ //创建线程失败
|
||||
printf("创建线程失败: %d.\n", port);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < 30; i++) {
|
||||
pthread_join(pids[i], NULL);
|
||||
pthread_join(pids[(10 + i)], NULL);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <netinet/in.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
#define SERVER_PORT 8000
|
||||
#define SIZE 200
|
||||
|
||||
int main() {
|
||||
struct sockaddr_in servaddr, cliaddr;
|
||||
socklen_t cliaddr_len;
|
||||
int client_sockfd;
|
||||
char buf[SIZE];
|
||||
char recvbuf[SIZE];
|
||||
|
||||
int i, n, flag = 0;
|
||||
|
||||
int len, iDataNum;
|
||||
|
||||
client_sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
bzero(&servaddr, sizeof(servaddr));
|
||||
servaddr.sin_family = AF_INET;
|
||||
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
servaddr.sin_port = htons(SERVER_PORT);
|
||||
|
||||
if (connect(client_sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
|
||||
printf("Connected error..\n");
|
||||
return 0;
|
||||
}
|
||||
printf("Connected to server..\n");
|
||||
|
||||
/*循环的发送接收信息并打印接收信息(可以按需发送)--recv返回接收到的字节数,send返回发送的字节数*/
|
||||
while (1) {
|
||||
printf("Enter string to send:");
|
||||
scanf("%s", buf);
|
||||
if (!strcmp(buf, "quit")) {
|
||||
break;
|
||||
}
|
||||
len = (sizeof buf);
|
||||
|
||||
recvbuf[0] = '\0';
|
||||
|
||||
iDataNum = recv(client_sockfd, recvbuf, SIZE, 0);
|
||||
|
||||
recvbuf[iDataNum] = '\0';
|
||||
|
||||
printf("%s\n", recvbuf);
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <netinet/in.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
#define SERVER_PORT 8000
|
||||
#define SIZE 200
|
||||
|
||||
int main() {
|
||||
struct sockaddr_in servaddr, cliaddr;
|
||||
socklen_t cliaddr_len;
|
||||
int listenfd, connfd;
|
||||
char buf[BUFSIZ];
|
||||
int i, n, flag = 0;
|
||||
|
||||
listenfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
bzero(&servaddr, sizeof(servaddr));
|
||||
servaddr.sin_family = AF_INET;
|
||||
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
servaddr.sin_port = htons(SERVER_PORT);
|
||||
bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
|
||||
listen(listenfd, 20);
|
||||
|
||||
printf("Accepting connections..\n");
|
||||
while (1) {
|
||||
cliaddr_len = sizeof(cliaddr);
|
||||
connfd = accept(listenfd, (struct sockaddr *)&cliaddr,
|
||||
&cliaddr_len); //如果得不到客户端发来的消息,将会被阻塞,一直等到消息到来
|
||||
n = read(connfd, buf, SIZE); //如果n<=0,表示客户端已断开
|
||||
while (1) {
|
||||
if (n != 0) {
|
||||
for (i = 0; i < n; i++) printf("%c", buf[i]); //输出客户端发来的信息
|
||||
} else {
|
||||
printf("Client say close the connection..\n");
|
||||
break;
|
||||
}
|
||||
n = read(connfd, buf, SIZE);
|
||||
}
|
||||
close(connfd);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
#include <netinet/in.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#define SERVER_PORT 8888
|
||||
#define BUFF_LEN 512
|
||||
#define SERVER_IP "172.0.5.182"
|
||||
|
||||
void udp_msg_sender(int fd, struct sockaddr* dst) {}
|
||||
|
||||
/*
|
||||
client:
|
||||
socket-->sendto-->revcfrom-->close
|
||||
*/
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int client_fd;
|
||||
struct sockaddr_in ser_addr;
|
||||
|
||||
client_fd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (client_fd < 0) {
|
||||
printf("create socket fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(&ser_addr, 0, sizeof(ser_addr));
|
||||
ser_addr.sin_family = AF_INET;
|
||||
// ser_addr.sin_addr.s_addr = inet_addr(SERVER_IP);
|
||||
ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); //注意网络序转换
|
||||
ser_addr.sin_port = htons(SERVER_PORT); //注意网络序转换
|
||||
|
||||
socklen_t len;
|
||||
struct sockaddr_in src;
|
||||
while (1) {
|
||||
char buf[BUFF_LEN] = "TEST UDP MSG!\n";
|
||||
len = sizeof(*(struct sockaddr*)&ser_addr);
|
||||
printf("client:%s\n", buf); //打印自己发送的信息
|
||||
sendto(client_fd, buf, BUFF_LEN, 0, (struct sockaddr*)&ser_addr, len);
|
||||
memset(buf, 0, BUFF_LEN);
|
||||
recvfrom(client_fd, buf, BUFF_LEN, 0, (struct sockaddr*)&src, &len); //接收来自server的信息
|
||||
printf("server:%s\n", buf);
|
||||
sleep(1); //一秒发送一次消息
|
||||
}
|
||||
|
||||
close(client_fd);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
#include <netinet/in.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#define SERVER_PORT 8888
|
||||
#define BUFF_LEN 1024
|
||||
|
||||
void handle_udp_msg(int fd) {
|
||||
char buf[BUFF_LEN]; //接收缓冲区,1024字节
|
||||
socklen_t len;
|
||||
int count;
|
||||
struct sockaddr_in clent_addr; // clent_addr用于记录发送方的地址信息
|
||||
while (1) {
|
||||
memset(buf, 0, BUFF_LEN);
|
||||
len = sizeof(clent_addr);
|
||||
count =
|
||||
recvfrom(fd, buf, BUFF_LEN, 0, (struct sockaddr*)&clent_addr, &len); // recvfrom是拥塞函数,没有数据就一直拥塞
|
||||
if (count == -1) {
|
||||
printf("recieve data fail!\n");
|
||||
return;
|
||||
}
|
||||
printf("client:%s\n", buf); //打印client发过来的信息
|
||||
memset(buf, 0, BUFF_LEN);
|
||||
sprintf(buf, "I have recieved %d bytes data!\n", count); //回复client
|
||||
printf("server:%s\n", buf); //打印自己发送的信息给
|
||||
sendto(fd, buf, BUFF_LEN, 0, (struct sockaddr*)&clent_addr,
|
||||
len); //发送信息给client,注意使用了clent_addr结构体指针
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
server:
|
||||
socket-->bind-->recvfrom-->sendto-->close
|
||||
*/
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int server_fd, ret;
|
||||
struct sockaddr_in ser_addr;
|
||||
|
||||
server_fd = socket(AF_INET, SOCK_DGRAM, 0); // AF_INET:IPV4;SOCK_DGRAM:UDP
|
||||
if (server_fd < 0) {
|
||||
printf("create socket fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(&ser_addr, 0, sizeof(ser_addr));
|
||||
ser_addr.sin_family = AF_INET;
|
||||
ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); // IP地址,需要进行网络序转换,INADDR_ANY:本地地址
|
||||
ser_addr.sin_port = htons(SERVER_PORT); //端口号,需要网络序转换
|
||||
|
||||
ret = bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
|
||||
if (ret < 0) {
|
||||
printf("socket bind fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
handle_udp_msg(server_fd); //处理接收到的数据
|
||||
|
||||
close(server_fd);
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue