[td-4385] add java api for dynamic create table during insert data.
This commit is contained in:
parent
5789abb8be
commit
9bfe74aa8c
|
@ -218,11 +218,19 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
|
|||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||
* Method: executeBatchImp
|
||||
* Method: closeStmt
|
||||
* Signature: (JJ)I
|
||||
*/
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
|
||||
|
||||
/**
|
||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||
* Method: setTableNameTagsImp
|
||||
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
|
||||
*/
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
|
||||
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -749,7 +749,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
|
|||
}
|
||||
|
||||
jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
|
||||
|
||||
(*env)->ReleaseStringUTFChars(env, jname, name);
|
||||
return JNI_SUCCESS;
|
||||
}
|
||||
|
@ -762,7 +761,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
|
|||
return JNI_CONNECTION_NULL;
|
||||
}
|
||||
|
||||
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
|
||||
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
|
||||
if (pStmt == NULL) {
|
||||
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
|
||||
return JNI_SQL_NULL;
|
||||
|
@ -777,14 +776,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
|
|||
}
|
||||
|
||||
len = (*env)->GetArrayLength(env, lengthList);
|
||||
char *lengthArray = (char*) calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
|
||||
char *lengthArray = (char *)calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
|
||||
if ((*env)->ExceptionCheck(env)) {
|
||||
}
|
||||
|
||||
len = (*env)->GetArrayLength(env, nullList);
|
||||
char *nullArray = (char*) calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
|
||||
char *nullArray = (char *)calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
|
||||
if ((*env)->ExceptionCheck(env)) {
|
||||
}
|
||||
|
||||
|
@ -799,22 +798,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
|
|||
b->length = (int32_t*)lengthArray;
|
||||
|
||||
// set the length and is_null array
|
||||
switch(dataType) {
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int32_t bytes = tDataTypes[dataType].bytes;
|
||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||
b->length[i] = bytes;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_BINARY: {
|
||||
// do nothing
|
||||
if (!IS_VAR_DATA_TYPE(dataType)) {
|
||||
int32_t bytes = tDataTypes[dataType].bytes;
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
b->length[i] = bytes;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -878,3 +865,73 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
|
|||
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
|
||||
return JNI_SUCCESS;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj,
|
||||
jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) {
|
||||
TAOS *tsconn = (TAOS *)conn;
|
||||
if (tsconn == NULL) {
|
||||
jniError("jobj:%p, connection already closed", jobj);
|
||||
return JNI_CONNECTION_NULL;
|
||||
}
|
||||
|
||||
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
|
||||
if (pStmt == NULL) {
|
||||
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
|
||||
return JNI_SQL_NULL;
|
||||
}
|
||||
|
||||
jsize len = (*env)->GetArrayLength(env, tags);
|
||||
char *tagsData = (char *)calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
|
||||
if ((*env)->ExceptionCheck(env)) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
len = (*env)->GetArrayLength(env, lengthList);
|
||||
int64_t *lengthArray = (int64_t*) calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
|
||||
if ((*env)->ExceptionCheck(env)) {
|
||||
}
|
||||
|
||||
len = (*env)->GetArrayLength(env, typeList);
|
||||
char *typeArray = (char*) calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray);
|
||||
if ((*env)->ExceptionCheck(env)) {
|
||||
}
|
||||
|
||||
len = (*env)->GetArrayLength(env, nullList);
|
||||
int32_t *nullArray = (int32_t*) calloc(1, len);
|
||||
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
|
||||
if ((*env)->ExceptionCheck(env)) {
|
||||
}
|
||||
|
||||
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
|
||||
char* curTags = tagsData;
|
||||
|
||||
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
|
||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||
tagsBind[i].buffer_type = typeArray[i];
|
||||
tagsBind[i].buffer = curTags;
|
||||
tagsBind[i].is_null = &nullArray[i];
|
||||
tagsBind[i].length = (uintptr_t*) &lengthArray[i];
|
||||
|
||||
curTags += lengthArray[i];
|
||||
}
|
||||
|
||||
int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind);
|
||||
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name,
|
||||
numOfTags);
|
||||
|
||||
tfree(tagsData);
|
||||
tfree(lengthArray);
|
||||
tfree(typeArray);
|
||||
tfree(nullArray);
|
||||
(*env)->ReleaseStringUTFChars(env, tableName, name);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
|
||||
return JNI_TDENGINE_ERROR;
|
||||
}
|
||||
|
||||
return JNI_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -310,6 +310,16 @@ public class TSDBJNIConnector {
|
|||
|
||||
private native int setBindTableNameImp(long stmt, String name, long conn);
|
||||
|
||||
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
|
||||
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
|
||||
nullList.array(), this.taos);
|
||||
if (code != TSDBConstants.JNI_SUCCESS) {
|
||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
|
||||
}
|
||||
}
|
||||
|
||||
private native int setTableNameTagsImp(long stmt, String name, int numOfTags, byte[] tags, byte[] typeList, byte[] lengthList, byte[] nullList, long conn);
|
||||
|
||||
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) throws SQLException {
|
||||
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
|
||||
if (code != TSDBConstants.JNI_SUCCESS) {
|
||||
|
|
|
@ -41,6 +41,9 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
|||
private boolean isPrepared;
|
||||
|
||||
private ArrayList<ColumnInfo> colData;
|
||||
private ArrayList<TableTagInfo> tableTags;
|
||||
private int tagValueLength;
|
||||
|
||||
private String tableName;
|
||||
private long nativeStmtHandle = 0;
|
||||
|
||||
|
@ -63,8 +66,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
|||
|
||||
if (parameterCnt > 1) {
|
||||
// the table name is also a parameter, so ignore it.
|
||||
this.colData = new ArrayList<ColumnInfo>(parameterCnt - 1);
|
||||
this.colData.addAll(Collections.nCopies(parameterCnt - 1, null));
|
||||
this.colData = new ArrayList<ColumnInfo>();
|
||||
this.tableTags = new ArrayList<TableTagInfo>();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -562,11 +565,109 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
|||
}
|
||||
};
|
||||
|
||||
private static class TableTagInfo {
|
||||
private boolean isNull;
|
||||
private Object value;
|
||||
private int type;
|
||||
public TableTagInfo(Object value, int type) {
|
||||
this.value = value;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public static TableTagInfo createNullTag(int type) {
|
||||
TableTagInfo info = new TableTagInfo(null, type);
|
||||
info.isNull = true;
|
||||
return info;
|
||||
}
|
||||
};
|
||||
|
||||
public void setTableName(String name) {
|
||||
this.tableName = name;
|
||||
}
|
||||
|
||||
private void ensureTagCapacity(int index) {
|
||||
if (this.tableTags.size() < index + 1) {
|
||||
int delta = index + 1 - this.tableTags.size();
|
||||
this.tableTags.addAll(Collections.nCopies(delta, null));
|
||||
}
|
||||
}
|
||||
|
||||
public void setTagNull(int index, int type) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, TableTagInfo.createNullTag(type));
|
||||
}
|
||||
|
||||
public void setTagBoolean(int index, boolean value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BOOL));
|
||||
this.tagValueLength += Byte.BYTES;
|
||||
}
|
||||
|
||||
public void setTagInt(int index, int value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_INT));
|
||||
this.tagValueLength += Integer.BYTES;
|
||||
}
|
||||
|
||||
public void setTagByte(int index, byte value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_TINYINT));
|
||||
this.tagValueLength += Byte.BYTES;
|
||||
}
|
||||
|
||||
public void setTagShort(int index, short value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_SMALLINT));
|
||||
this.tagValueLength += Short.BYTES;
|
||||
}
|
||||
|
||||
public void setTagLong(int index, long value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BIGINT));
|
||||
this.tagValueLength += Long.BYTES;
|
||||
}
|
||||
|
||||
public void setTagTimestamp(int index, long value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP));
|
||||
this.tagValueLength += Long.BYTES;
|
||||
}
|
||||
|
||||
public void setTagFloat(int index, float value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_FLOAT));
|
||||
this.tagValueLength += Float.BYTES;
|
||||
}
|
||||
|
||||
public void setTagDouble(int index, double value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_DOUBLE));
|
||||
this.tagValueLength += Double.BYTES;
|
||||
}
|
||||
|
||||
public void setTagString(int index, String value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BINARY));
|
||||
this.tagValueLength += value.getBytes().length;
|
||||
}
|
||||
|
||||
public void setTagNString(int index, String value) {
|
||||
ensureTagCapacity(index);
|
||||
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_NCHAR));
|
||||
|
||||
String charset = TaosGlobalConfig.getCharset();
|
||||
try {
|
||||
this.tagValueLength += value.getBytes(charset).length;
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type, int bytes) throws SQLException {
|
||||
if (this.colData.size() == 0) {
|
||||
this.colData.addAll(Collections.nCopies(this.parameters.length - 1 - this.tableTags.size(), null));
|
||||
|
||||
}
|
||||
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex);
|
||||
if (col == null) {
|
||||
ColumnInfo p = new ColumnInfo();
|
||||
|
@ -641,7 +742,122 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
|||
|
||||
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
|
||||
this.nativeStmtHandle = connector.prepareStmt(rawSql);
|
||||
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
|
||||
|
||||
if (this.tableTags == null) {
|
||||
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
|
||||
} else {
|
||||
int num = this.tableTags.size();
|
||||
ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength);
|
||||
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
ByteBuffer typeList = ByteBuffer.allocate(num);
|
||||
typeList.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES);
|
||||
lengthList.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES);
|
||||
isNullList.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
for (int i = 0; i < num; ++i) {
|
||||
TableTagInfo tag = this.tableTags.get(i);
|
||||
if (tag.isNull) {
|
||||
typeList.put((byte) tag.type);
|
||||
isNullList.putInt(1);
|
||||
lengthList.putLong(0);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (tag.type) {
|
||||
case TSDBConstants.TSDB_DATA_TYPE_INT: {
|
||||
Integer val = (Integer) tag.value;
|
||||
tagDataList.putInt(val);
|
||||
lengthList.putLong(Integer.BYTES);
|
||||
break;
|
||||
}
|
||||
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
|
||||
Byte val = (Byte) tag.value;
|
||||
tagDataList.put(val);
|
||||
lengthList.putLong(Byte.BYTES);
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
|
||||
Boolean val = (Boolean) tag.value;
|
||||
tagDataList.put((byte) (val ? 1 : 0));
|
||||
lengthList.putLong(Byte.BYTES);
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
|
||||
Short val = (Short) tag.value;
|
||||
tagDataList.putShort(val);
|
||||
lengthList.putLong(Short.BYTES);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
|
||||
Long val = (Long) tag.value;
|
||||
tagDataList.putLong(val == null ? 0 : val);
|
||||
lengthList.putLong(Long.BYTES);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
|
||||
Float val = (Float) tag.value;
|
||||
tagDataList.putFloat(val == null ? 0 : val);
|
||||
lengthList.putLong(Float.BYTES);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
|
||||
Double val = (Double) tag.value;
|
||||
tagDataList.putDouble(val == null ? 0 : val);
|
||||
lengthList.putLong(Double.BYTES);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
|
||||
String charset = TaosGlobalConfig.getCharset();
|
||||
String val = (String) tag.value;
|
||||
|
||||
byte[] b = null;
|
||||
try {
|
||||
if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
|
||||
b = val.getBytes();
|
||||
} else {
|
||||
b = val.getBytes(charset);
|
||||
}
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
tagDataList.put(b);
|
||||
lengthList.putLong(b.length);
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDBConstants.TSDB_DATA_TYPE_UINT:
|
||||
case TSDBConstants.TSDB_DATA_TYPE_UBIGINT: {
|
||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
|
||||
}
|
||||
}
|
||||
|
||||
typeList.put((byte) tag.type);
|
||||
isNullList.putInt(tag.isNull? 1 : 0);
|
||||
}
|
||||
|
||||
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList,
|
||||
typeList, lengthList, isNullList);
|
||||
}
|
||||
|
||||
ColumnInfo colInfo = (ColumnInfo) this.colData.get(0);
|
||||
if (colInfo == null) {
|
||||
|
|
Loading…
Reference in New Issue