diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
index 8dbe63d75a..713ab2111d 100644
--- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
+++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
@@ -143,7 +143,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
* Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData;
*/
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
- (JNIEnv *, jobject, jlong, jint);
+ (JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c
index 6ab1b73d1e..98a19184ed 100644
--- a/src/client/src/TSDBJNIConnector.c
+++ b/src/client/src/TSDBJNIConnector.c
@@ -552,107 +552,20 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
return sub;
}
-static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, int num_fields) {
- jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields);
- jniTrace("created a rowdata object, rowobj:%p", rowobj);
-
- for (int i = 0; i < num_fields; i++) {
- if (row[i] == NULL) {
- continue;
- }
-
- switch (fields[i].type) {
- case TSDB_DATA_TYPE_BOOL:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetBooleanFp, i, (jboolean)(*((char *)row[i]) == 1));
- break;
- case TSDB_DATA_TYPE_TINYINT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((char *)row[i]));
- break;
- case TSDB_DATA_TYPE_SMALLINT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((short *)row[i]));
- break;
- case TSDB_DATA_TYPE_INT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int *)row[i]);
- break;
- case TSDB_DATA_TYPE_BIGINT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetLongFp, i, (jlong) * ((int64_t *)row[i]));
- break;
- case TSDB_DATA_TYPE_FLOAT: {
- float fv = 0;
- fv = GET_FLOAT_VAL(row[i]);
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv);
- }
- break;
- case TSDB_DATA_TYPE_DOUBLE:{
- double dv = 0;
- dv = GET_DOUBLE_VAL(row[i]);
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv);
- }
- break;
- case TSDB_DATA_TYPE_BINARY: {
- char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
- strncpy(tmp, row[i], (size_t) fields[i].bytes); // handle the case that terminated does not exist
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp));
-
- memset(tmp, 0, (size_t) fields[i].bytes);
- break;
- }
- case TSDB_DATA_TYPE_NCHAR:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
- jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes));
- break;
- case TSDB_DATA_TYPE_TIMESTAMP:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]));
- break;
- default:
- break;
- }
- }
- return rowobj;
-}
-
-JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) {
+JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
- jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp);
- int64_t start = taosGetTimestampMs();
- int count = 0;
+ TAOS_RES *res = taos_consume(tsub);
- while (true) {
- TAOS_RES * res = taos_consume(tsub);
- if (res == NULL) {
- jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
- return NULL;
- }
-
- TAOS_FIELD *fields = taos_fetch_fields(res);
- int num_fields = taos_num_fields(res);
- while (true) {
- TAOS_ROW row = taos_fetch_row(res);
- if (row == NULL) {
- break;
- }
- jobject rowobj = convert_one_row(env, row, fields, num_fields);
- (*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj);
- count++;
- }
-
- if (count > 0) {
- break;
- }
- if (timeout == -1) {
- continue;
- }
- if (((int)(taosGetTimestampMs() - start)) >= timeout) {
- jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub);
- break;
- }
+ if (res == NULL) {
+ jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
+ return 0l;
}
- return rows;
+ return res;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) {
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java
index 027d2197a3..9ee66472a3 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java
@@ -102,41 +102,49 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public byte getByte(int columnIndex) throws SQLException {
+ columnIndex--;
return (byte) rowCursor.getInt(columnIndex, columnMetaDataList.get(columnIndex).getColType());
}
@Override
public short getShort(int columnIndex) throws SQLException {
+ columnIndex--;
return (short) rowCursor.getInt(columnIndex, columnMetaDataList.get(columnIndex).getColType());
}
@Override
public int getInt(int columnIndex) throws SQLException {
+ columnIndex--;
return rowCursor.getInt(columnIndex, columnMetaDataList.get(columnIndex).getColType());
}
@Override
public long getLong(int columnIndex) throws SQLException {
+ columnIndex--;
return rowCursor.getLong(columnIndex, columnMetaDataList.get(columnIndex).getColType());
}
@Override
public float getFloat(int columnIndex) throws SQLException {
+ columnIndex--;
return rowCursor.getFloat(columnIndex, columnMetaDataList.get(columnIndex).getColType());
}
@Override
public double getDouble(int columnIndex) throws SQLException {
+ columnIndex--;
return rowCursor.getDouble(columnIndex, columnMetaDataList.get(columnIndex).getColType());
}
@Override
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+ columnIndex--;
return new BigDecimal(rowCursor.getDouble(columnIndex, columnMetaDataList.get(columnIndex).getColType()));
}
@Override
public byte[] getBytes(int columnIndex) throws SQLException {
+ columnIndex--;
return (rowCursor.getString(columnIndex, columnMetaDataList.get(columnIndex).getColType())).getBytes();
}
@@ -152,6 +160,7 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public Timestamp getTimestamp(int columnIndex) throws SQLException {
+ columnIndex--;
return rowCursor.getTimestamp(columnIndex);
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
index 4640f6b446..062cb63cfd 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
@@ -84,6 +84,14 @@ public class TSDBConnection implements Connection {
}
}
+ public TSDBSubscribe createSubscribe() throws SQLException {
+ if (!this.connector.isClosed()) {
+ return new TSDBSubscribe(this.connector);
+ } else {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ }
+
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (!this.connector.isClosed()) {
return new TSDBPreparedStatement(this.connector, sql);
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
index 3adb601822..d7f73a3fca 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
@@ -261,31 +261,31 @@ public class TSDBJNIConnector {
/**
* Subscribe to a table in TSDB
*/
- public long subscribe(String host, String user, String password, String database, String table, long time, int period) {
- return subscribeImp(host, user, password, database, table, time, period);
+ public long subscribe(String topic, String sql, boolean restart, int period) {
+ return subscribeImp(this.taos, restart, topic, sql, period);
}
- private native long subscribeImp(String host, String user, String password, String database, String table, long time, int period);
+ public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
/**
* Consume a subscribed table
*/
- public TSDBResultSetRowData consume(long subscription) {
+ public long consume(long subscription) {
return this.consumeImp(subscription);
}
- private native TSDBResultSetRowData consumeImp(long subscription);
+ private native long consumeImp(long subscription);
/**
* Unsubscribe a table
*
* @param subscription
*/
- public void unsubscribe(long subscription) {
- unsubscribeImp(subscription);
+ public void unsubscribe(long subscription, boolean isKeep) {
+ unsubscribeImp(subscription, isKeep);
}
- private native void unsubscribeImp(long subscription);
+ private native void unsubscribeImp(long subscription, boolean isKeep);
/**
* Validate if a create table sql statement is correct without actually creating that table
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
index 8acf779756..961633b8ae 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
@@ -51,47 +51,47 @@ public class TSDBResultSet implements ResultSet {
private boolean lastWasNull = false;
private final int COLUMN_INDEX_START_VALUE = 1;
- public TSDBJNIConnector getJniConnector() {
- return jniConnector;
- }
+ public TSDBJNIConnector getJniConnector() {
+ return jniConnector;
+ }
- public void setJniConnector(TSDBJNIConnector jniConnector) {
- this.jniConnector = jniConnector;
- }
+ public void setJniConnector(TSDBJNIConnector jniConnector) {
+ this.jniConnector = jniConnector;
+ }
- public long getResultSetPointer() {
- return resultSetPointer;
- }
+ public long getResultSetPointer() {
+ return resultSetPointer;
+ }
- public void setResultSetPointer(long resultSetPointer) {
- this.resultSetPointer = resultSetPointer;
- }
+ public void setResultSetPointer(long resultSetPointer) {
+ this.resultSetPointer = resultSetPointer;
+ }
- public List getColumnMetaDataList() {
- return columnMetaDataList;
- }
+ public List getColumnMetaDataList() {
+ return columnMetaDataList;
+ }
- public void setColumnMetaDataList(List columnMetaDataList) {
- this.columnMetaDataList = columnMetaDataList;
- }
+ public void setColumnMetaDataList(List columnMetaDataList) {
+ this.columnMetaDataList = columnMetaDataList;
+ }
- public TSDBResultSetRowData getRowData() {
- return rowData;
- }
+ public TSDBResultSetRowData getRowData() {
+ return rowData;
+ }
- public void setRowData(TSDBResultSetRowData rowData) {
- this.rowData = rowData;
- }
+ public void setRowData(TSDBResultSetRowData rowData) {
+ this.rowData = rowData;
+ }
- public boolean isLastWasNull() {
- return lastWasNull;
- }
+ public boolean isLastWasNull() {
+ return lastWasNull;
+ }
- public void setLastWasNull(boolean lastWasNull) {
- this.lastWasNull = lastWasNull;
- }
+ public void setLastWasNull(boolean lastWasNull) {
+ this.lastWasNull = lastWasNull;
+ }
- public TSDBResultSet() {
+ public TSDBResultSet() {
}
public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException {
@@ -119,7 +119,7 @@ public class TSDBResultSet implements ResultSet {
public boolean next() throws SQLException {
if (rowData != null) {
- this.rowData.clear();
+ this.rowData.clear();
}
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
@@ -154,119 +154,119 @@ public class TSDBResultSet implements ResultSet {
public String getString(int columnIndex) throws SQLException {
String res = null;
int colIndex = getTrueColumnIndex(columnIndex);
-
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public boolean getBoolean(int columnIndex) throws SQLException {
- boolean res = false;
+ boolean res = false;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
- res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public byte getByte(int columnIndex) throws SQLException {
- byte res = 0;
+ byte res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
- res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public short getShort(int columnIndex) throws SQLException {
- short res = 0;
+ short res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
- res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public int getInt(int columnIndex) throws SQLException {
- int res = 0;
+ int res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public long getLong(int columnIndex) throws SQLException {
- long res = 0l;
+ long res = 0l;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public float getFloat(int columnIndex) throws SQLException {
- float res = 0;
- int colIndex = getTrueColumnIndex(columnIndex);
+ float res = 0;
+ int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public double getDouble(int columnIndex) throws SQLException {
- double res = 0;
+ double res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
/*
* (non-Javadoc)
- *
+ *
* @see java.sql.ResultSet#getBigDecimal(int, int)
- *
+ *
* @deprecated Use {@code getBigDecimal(int columnIndex)} or {@code
* getBigDecimal(String columnLabel)}
*/
@Deprecated
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
- BigDecimal res = null;
+ BigDecimal res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
- }
+ if (!lastWasNull) {
+ res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
+ }
return res;
}
public byte[] getBytes(int columnIndex) throws SQLException {
- byte[] res = null;
+ byte[] res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
- }
+ if (!lastWasNull) {
+ res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
+ }
return res;
}
@@ -281,13 +281,13 @@ public class TSDBResultSet implements ResultSet {
}
public Timestamp getTimestamp(int columnIndex) throws SQLException {
- Timestamp res = null;
+ Timestamp res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getTimestamp(colIndex);
- }
+ if (!lastWasNull) {
+ res = this.rowData.getTimestamp(colIndex);
+ }
return res;
}
@@ -297,9 +297,9 @@ public class TSDBResultSet implements ResultSet {
/*
* (non-Javadoc)
- *
+ *
* @see java.sql.ResultSet#getUnicodeStream(int)
- *
+ *
* * @deprecated use getCharacterStream
in place of
* getUnicodeStream
*/
@@ -409,13 +409,13 @@ public class TSDBResultSet implements ResultSet {
}
public int findColumn(String columnLabel) throws SQLException {
- Iterator colMetaDataIt = this.columnMetaDataList.iterator();
- while (colMetaDataIt.hasNext()) {
- ColumnMetaData colMetaData = colMetaDataIt.next();
- if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) {
- return colMetaData.getColIndex() + 1;
- }
- }
+ Iterator colMetaDataIt = this.columnMetaDataList.iterator();
+ while (colMetaDataIt.hasNext()) {
+ ColumnMetaData colMetaData = colMetaDataIt.next();
+ if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) {
+ return colMetaData.getColIndex() + 1;
+ }
+ }
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
}
@@ -882,7 +882,7 @@ public class TSDBResultSet implements ResultSet {
}
public String getNString(int columnIndex) throws SQLException {
- int colIndex = getTrueColumnIndex(columnIndex);
+ int colIndex = getTrueColumnIndex(columnIndex);
return (String) rowData.get(colIndex);
}
@@ -1017,17 +1017,17 @@ public class TSDBResultSet implements ResultSet {
public T getObject(String columnLabel, Class type) throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
-
+
private int getTrueColumnIndex(int columnIndex) throws SQLException {
if (columnIndex < this.COLUMN_INDEX_START_VALUE) {
throw new SQLException("Column Index out of range, " + columnIndex + " < " + this.COLUMN_INDEX_START_VALUE);
}
-
+
int numOfCols = this.columnMetaDataList.size();
if (columnIndex > numOfCols) {
throw new SQLException("Column Index out of range, " + columnIndex + " > " + numOfCols);
}
-
+
return columnIndex - 1;
}
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java
new file mode 100644
index 0000000000..3b479aafc3
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java
@@ -0,0 +1,185 @@
+/***************************************************************************
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *****************************************************************************/
+package com.taosdata.jdbc;
+
+import javax.management.OperationsException;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.*;
+
+public class TSDBSubscribe {
+ private TSDBJNIConnector connecter = null;
+ private static ScheduledExecutorService pool;
+ private static Map timerTaskMap = new ConcurrentHashMap<>();
+ private static Map scheduledMap = new ConcurrentHashMap();
+
+ private static class TimerInstance {
+ private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1);
+ }
+
+ public static ScheduledExecutorService getTimerInstance() {
+ return TimerInstance.instance;
+ }
+
+ public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException {
+ if (null != connecter) {
+ this.connecter = connecter;
+ } else {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ }
+
+ /**
+ * sync subscribe
+ *
+ * @param topic
+ * @param sql
+ * @param restart
+ * @param period
+ * @throws SQLException
+ */
+ public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ if (period < 1000) {
+ throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES));
+ }
+ return this.connecter.subscribe(topic, sql, restart, period);
+ }
+
+ /**
+ * async subscribe
+ *
+ * @param topic
+ * @param sql
+ * @param restart
+ * @param period
+ * @param callBack
+ * @throws SQLException
+ */
+ public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ final long subscription = this.connecter.subscribe(topic, sql, restart, period);
+ if (null != callBack) {
+ pool = getTimerInstance();
+
+ TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack);
+
+ timerTaskMap.put(subscription, timerTask);
+
+ ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS);
+ scheduledMap.put(subscription, scheduledFuture);
+ }
+ return subscription;
+ }
+
+ public TSDBResultSet consume(long subscription) throws OperationsException, SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ if (0 == subscription) {
+ throw new OperationsException("Invalid use of consume");
+ }
+ long resultSetPointer = this.connecter.consume(subscription);
+
+ if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ } else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
+ return null;
+ } else {
+ return new TSDBResultSet(this.connecter, resultSetPointer);
+ }
+ }
+
+ /**
+ * cancel subscribe
+ *
+ * @param subscription
+ * @param isKeep
+ * @throws SQLException
+ */
+ public void unsubscribe(long subscription, boolean isKeep) throws SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+
+ if (null != timerTaskMap.get(subscription)) {
+ synchronized (timerTaskMap.get(subscription)) {
+ while (1 == timerTaskMap.get(subscription).getState()) {
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ timerTaskMap.get(subscription).setState(2);
+ if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) {
+ timerTaskMap.get(subscription).cancel();
+ timerTaskMap.remove(subscription);
+ scheduledMap.get(subscription).cancel(false);
+ scheduledMap.remove(subscription);
+ }
+ this.connecter.unsubscribe(subscription, isKeep);
+ }
+ } else {
+ this.connecter.unsubscribe(subscription, isKeep);
+ }
+ }
+
+ class TSDBTimerTask extends TimerTask {
+ private long subscription;
+ private TSDBSubscribeCallBack callBack;
+ // 0: not running 1: running 2: cancel
+ private int state = 0;
+
+ public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) {
+ this.subscription = subscription;
+ this.callBack = callBack;
+ }
+
+ public int getState() {
+ return this.state;
+ }
+
+ public void setState(int state) {
+ this.state = state;
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (2 == state) {
+ return;
+ }
+
+ state = 1;
+
+ try {
+ TSDBResultSet resultSet = consume(subscription);
+ callBack.invoke(resultSet);
+ } catch (Exception e) {
+ this.cancel();
+ throw new RuntimeException(e);
+ }
+ state = 0;
+ }
+ }
+ }
+}
+
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java
new file mode 100644
index 0000000000..c1b9b02fa8
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java
@@ -0,0 +1,19 @@
+/***************************************************************************
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *****************************************************************************/
+package com.taosdata.jdbc;
+
+public interface TSDBSubscribeCallBack {
+ void invoke(TSDBResultSet resultSet);
+}
diff --git a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java
new file mode 100644
index 0000000000..5b2b6367ec
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java
@@ -0,0 +1,83 @@
+import com.taosdata.jdbc.*;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+public class TestAsyncTSDBSubscribe {
+ public static void main(String[] args) {
+ String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " +
+ "-tname tableName -h host";
+ if (args.length < 2) {
+ System.err.println(usage);
+ return;
+ }
+
+ String dbName = "";
+ String tName = "";
+ String host = "localhost";
+ String topic = "";
+ for (int i = 0; i < args.length; i++) {
+ if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ dbName = args[++i];
+ }
+ if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ tName = args[++i];
+ }
+ if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ host = args[++i];
+ }
+ if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ topic = args[++i];
+ }
+ }
+ if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
+ System.err.println(usage);
+ return;
+ }
+
+ Connection connection = null;
+ TSDBSubscribe subscribe = null;
+ long subscribId = 0;
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties);
+ String rawSql = "select * from " + tName + ";";
+ subscribe = ((TSDBConnection) connection).createSubscribe();
+ subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
+ long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second"));
+ int a = 0;
+ Thread.sleep(2000);
+ subscribe.unsubscribe(subscribId, true);
+ System.err.println("cancel subscribe");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static class CallBack implements TSDBSubscribeCallBack {
+ private String name = "";
+
+ public CallBack(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void invoke(TSDBResultSet resultSet) {
+ try {
+ while (null !=resultSet && resultSet.next()) {
+ System.out.print("callback_" + name + ": ");
+ for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
+ System.out.printf(i + ": " + resultSet.getString(i) + "\t");
+ }
+ System.out.println();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/src/connector/jdbc/src/test/java/TestPreparedStatement.java b/src/connector/jdbc/src/test/java/TestPreparedStatement.java
index 2e2cc0ede6..3b84645b5b 100644
--- a/src/connector/jdbc/src/test/java/TestPreparedStatement.java
+++ b/src/connector/jdbc/src/test/java/TestPreparedStatement.java
@@ -10,9 +10,9 @@ public class TestPreparedStatement {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
- properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "192.168.1.117");
- Connection connection = DriverManager.getConnection("jdbc:TAOS://192.168.1.117:0/?user=root&password=taosdata", properties);
- String rawSql = "SELECT ts, c1 FROM (select c1, ts from db.tb1) SUB_QRY";
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
+ Connection connection = DriverManager.getConnection("jdbc:TAOS://localhost:0/?user=root&password=taosdata", properties);
+ String rawSql = "select * from test.log0601";
// String[] params = new String[]{"ts", "c1"};
PreparedStatement pstmt = (TSDBPreparedStatement) connection.prepareStatement(rawSql);
ResultSet resSet = pstmt.executeQuery();
diff --git a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java
new file mode 100644
index 0000000000..f12924c8a6
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java
@@ -0,0 +1,83 @@
+import com.taosdata.jdbc.TSDBConnection;
+import com.taosdata.jdbc.TSDBDriver;
+import com.taosdata.jdbc.TSDBResultSet;
+import com.taosdata.jdbc.TSDBSubscribe;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+public class TestTSDBSubscribe {
+ public static void main(String[] args) throws Exception {
+ String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " +
+ "-topic topicName -tname tableName -h host";
+ if (args.length < 2) {
+ System.err.println(usage);
+ return;
+ }
+
+ String dbName = "";
+ String tName = "";
+ String host = "localhost";
+ String topic = "";
+ for (int i = 0; i < args.length; i++) {
+ if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ dbName = args[++i];
+ }
+ if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ tName = args[++i];
+ }
+ if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ host = args[++i];
+ }
+ if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ topic = args[++i];
+ }
+ }
+ if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
+ System.err.println(usage);
+ return;
+ }
+
+ Connection connection = null;
+ TSDBSubscribe subscribe = null;
+ long subscribId = 0;
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata"
+ , properties);
+ String rawSql = "select * from " + tName + ";";
+ subscribe = ((TSDBConnection) connection).createSubscribe();
+ subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
+ int a = 0;
+ while (true) {
+ Thread.sleep(900);
+ TSDBResultSet resSet = subscribe.consume(subscribId);
+
+ while (resSet.next()) {
+ for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
+ System.out.printf(i + ": " + resSet.getString(i) + "\t");
+ }
+ System.out.println("\n======" + a + "==========");
+ }
+
+ a++;
+ if (a >= 10) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (null != subscribe && 0 != subscribId) {
+ subscribe.unsubscribe(subscribId, true);
+ }
+ if (null != connection) {
+ connection.close();
+ }
+ }
+ }
+}
diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c
new file mode 100644
index 0000000000..545ff79d80
--- /dev/null
+++ b/src/kit/taosnetwork/client.c
@@ -0,0 +1,128 @@
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define BUFFER_SIZE 200
+
+typedef struct {
+ int port;
+ char *host[15];
+} info;
+
+void *checkPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ char *host = *pinfo->host;
+ int clientSocket;
+
+ struct sockaddr_in serverAddr;
+ char sendbuf[BUFFER_SIZE];
+ char recvbuf[BUFFER_SIZE];
+ int iDataNum;
+ if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+ serverAddr.sin_family = AF_INET;
+ serverAddr.sin_port = htons(port);
+
+ serverAddr.sin_addr.s_addr = inet_addr(host);
+
+ printf("=================================\n");
+ if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
+ perror("connect");
+ return NULL;
+ }
+ printf("Connect to: %s:%d...success\n", host, port);
+
+ sprintf(sendbuf, "send port_%d", port);
+ send(clientSocket, sendbuf, strlen(sendbuf), 0);
+ printf("Send msg_%d: %s\n", port, sendbuf);
+
+ recvbuf[0] = '\0';
+ iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0);
+ recvbuf[iDataNum] = '\0';
+ printf("Read ack msg_%d: %s\n", port, recvbuf);
+
+ printf("=================================\n");
+ close(clientSocket);
+ return NULL;
+}
+
+void *checkUPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ char *host = *pinfo->host;
+ int clientSocket;
+
+ struct sockaddr_in serverAddr;
+ char sendbuf[BUFFER_SIZE];
+ char recvbuf[BUFFER_SIZE];
+ int iDataNum;
+ if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+ serverAddr.sin_family = AF_INET;
+ serverAddr.sin_port = htons(port);
+
+ serverAddr.sin_addr.s_addr = inet_addr(host);
+
+ printf("=================================\n");
+ // if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
+ // perror("connect");
+ // return NULL;
+ // }
+
+ // printf("Connect to: %s:%d...success\n", host, port);
+
+ sprintf(sendbuf, "send msg port_%d by udp", port);
+
+ socklen_t sin_size = sizeof(*(struct sockaddr*)&serverAddr);
+
+ sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size);
+
+ printf("Send msg_%d by udp: %s\n", port, sendbuf);
+
+ recvbuf[0] = '\0';
+ iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
+ recvbuf[iDataNum] = '\0';
+ printf("Read ack msg_%d from udp: %s\n", port, recvbuf);
+
+ printf("=================================\n");
+ close(clientSocket);
+ return NULL;
+}
+
+int main() {
+ int port = 6020;
+ char *host = "127.0.0.1";
+ info *infos = malloc(10 * sizeof(info));
+ info *uinfos = malloc(10 * sizeof(info));
+
+ for (size_t i = 0; i < 10; i++) {
+ port++;
+ printf("For test: %s:%d\n", host, port);
+
+ info *pinfo = infos++;
+ *pinfo->host = host;
+ pinfo->port = port;
+ checkPort(pinfo);
+
+ info *uinfo = uinfos++;
+ *uinfo->host = host;
+ uinfo->port = port;
+ checkUPort(uinfo);
+ }
+}
\ No newline at end of file
diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c
new file mode 100644
index 0000000000..2533c53ca1
--- /dev/null
+++ b/src/kit/taosnetwork/server.c
@@ -0,0 +1,190 @@
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define BUFFER_SIZE 200
+
+typedef struct {
+ int port;
+ int type; // 0: tcp, 1: udo, default: 0
+} info;
+
+static void *bindPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ int type = pinfo->type;
+ int serverSocket;
+
+ struct sockaddr_in server_addr;
+ struct sockaddr_in clientAddr;
+ int addr_len = sizeof(clientAddr);
+ int client;
+ char buffer[BUFFER_SIZE];
+ int iDataNum;
+
+ if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+
+ bzero(&server_addr, sizeof(server_addr));
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_port = htons(port);
+ server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
+ perror("connect");
+ return NULL;
+ }
+
+ if (listen(serverSocket, 5) < 0) {
+ perror("listen");
+ return NULL;
+ }
+
+ printf("Bind port: %d success\n", port);
+ while (1) {
+ client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
+ if (client < 0) {
+ perror("accept");
+ continue;
+ }
+ printf("=================================\n");
+
+ printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
+ while (1) {
+ buffer[0] = '\0';
+ iDataNum = recv(client, buffer, BUFFER_SIZE, 0);
+
+ if (iDataNum < 0) {
+ perror("recv null");
+ continue;
+ }
+ if (iDataNum > 0) {
+ buffer[iDataNum] = '\0';
+ printf("read msg:%s\n", buffer);
+ if (strcmp(buffer, "quit") == 0) break;
+ buffer[0] = '\0';
+
+ sprintf(buffer, "ack port_%d", port);
+ printf("send ack msg:%s\n", buffer);
+
+ send(client, buffer, strlen(buffer), 0);
+ break;
+ }
+ }
+ printf("=================================\n");
+ }
+ close(serverSocket);
+ return NULL;
+}
+
+static void *bindUPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ int type = pinfo->type;
+ int serverSocket;
+
+ struct sockaddr_in server_addr;
+ struct sockaddr_in clientAddr;
+ int addr_len = sizeof(clientAddr);
+ int client;
+ char buffer[BUFFER_SIZE];
+ int iDataNum;
+
+ if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+
+ bzero(&server_addr, sizeof(server_addr));
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_port = htons(port);
+ server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
+ perror("connect");
+ return NULL;
+ }
+
+ socklen_t sin_size;
+ printf("Bind port: %d success\n", port);
+
+ while (1) {
+ buffer[0] = '\0';
+
+ sin_size = sizeof(*(struct sockaddr *)&server_addr);
+
+ iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);
+
+ if (iDataNum < 0) {
+ perror("recvfrom null");
+ continue;
+ }
+ if (iDataNum > 0) {
+ printf("=================================\n");
+
+ printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
+ buffer[iDataNum] = '\0';
+ printf("Read msg from udp:%s\n", buffer);
+ if (strcmp(buffer, "quit") == 0) break;
+ buffer[0] = '\0';
+
+ sprintf(buffer, "ack port_%d by udp", port);
+ printf("Send ack msg by udp:%s\n", buffer);
+
+ sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size);
+
+ send(client, buffer, strlen(buffer), 0);
+ printf("=================================\n");
+ }
+ }
+
+ close(serverSocket);
+ return NULL;
+}
+
+
+int main() {
+ int port = 6020;
+ pthread_t *pids = malloc(20 * sizeof(pthread_t));
+ info * infos = malloc(10 * sizeof(info));
+ info * uinfos = malloc(10 * sizeof(info));
+
+ for (size_t i = 0; i < 10; i++) {
+ port++;
+
+ info *pinfo = infos++;
+ pinfo->port = port;
+
+ if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程
+ { //创建线程失败
+ printf("创建线程失败: %d.\n", port);
+ exit(0);
+ }
+
+ info *uinfo = uinfos++;
+ uinfo->port = port;
+ uinfo->type = 1;
+ if (pthread_create(pids + 10 + i, NULL, bindUPort, uinfo) != 0) //创建线程
+ { //创建线程失败
+ printf("创建线程失败: %d.\n", port);
+ exit(0);
+ }
+ }
+ for (int i = 0; i < 10; i++) {
+ pthread_join(pids[i], NULL);
+ pthread_join(pids[(10 + i)], NULL);
+ }
+}