Merge pull request #6080 from taosdata/feature/TD-4038
[TD-4038]support bind multiple tables
This commit is contained in:
commit
bb7e1b224c
|
@ -48,6 +48,8 @@ void tscLockByThread(int64_t *lockedBy);
|
||||||
|
|
||||||
void tscUnlockByThread(int64_t *lockedBy);
|
void tscUnlockByThread(int64_t *lockedBy);
|
||||||
|
|
||||||
|
int tsInsertInitialCheck(SSqlObj *pSql);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -154,13 +154,12 @@ typedef struct STagCond {
|
||||||
|
|
||||||
typedef struct SParamInfo {
|
typedef struct SParamInfo {
|
||||||
int32_t idx;
|
int32_t idx;
|
||||||
char type;
|
uint8_t type;
|
||||||
uint8_t timePrec;
|
uint8_t timePrec;
|
||||||
int16_t bytes;
|
int16_t bytes;
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
} SParamInfo;
|
} SParamInfo;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SBoundColumn {
|
typedef struct SBoundColumn {
|
||||||
bool hasVal; // denote if current column has bound or not
|
bool hasVal; // denote if current column has bound or not
|
||||||
int32_t offset; // all column offset value
|
int32_t offset; // all column offset value
|
||||||
|
@ -372,7 +371,8 @@ typedef struct SSqlObj {
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
SSqlCmd cmd;
|
SSqlCmd cmd;
|
||||||
SSqlRes res;
|
SSqlRes res;
|
||||||
|
bool isBind;
|
||||||
|
|
||||||
SSubqueryState subState;
|
SSubqueryState subState;
|
||||||
struct SSqlObj **pSubs;
|
struct SSqlObj **pSubs;
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp
|
||||||
/*
|
/*
|
||||||
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||||
* Method: isUpdateQueryImp
|
* Method: isUpdateQueryImp
|
||||||
* Signature: (J)J
|
* Signature: (JJ)I
|
||||||
*/
|
*/
|
||||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp
|
||||||
(JNIEnv *env, jobject jobj, jlong con, jlong tres);
|
(JNIEnv *env, jobject jobj, jlong con, jlong tres);
|
||||||
|
@ -185,6 +185,44 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp
|
||||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTableSqlImp
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTableSqlImp
|
||||||
(JNIEnv *, jobject, jlong, jbyteArray);
|
(JNIEnv *, jobject, jlong, jbyteArray);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||||
|
* Method: prepareStmtImp
|
||||||
|
* Signature: ([BJ)I
|
||||||
|
*/
|
||||||
|
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp
|
||||||
|
(JNIEnv *, jobject, jbyteArray, jlong);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||||
|
* Method: setBindTableNameImp
|
||||||
|
* Signature: (JLjava/lang/String;J)I
|
||||||
|
*/
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp
|
||||||
|
(JNIEnv *, jobject, jlong, jstring, jlong);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||||
|
* Method: bindColDataImp
|
||||||
|
* Signature: (J[B[B[BIIIIJ)J
|
||||||
|
*/
|
||||||
|
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
|
||||||
|
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||||
|
* Method: executeBatchImp
|
||||||
|
* Signature: (JJ)I
|
||||||
|
*/
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: com_taosdata_jdbc_TSDBJNIConnector
|
||||||
|
* Method: executeBatchImp
|
||||||
|
* Signature: (JJ)I
|
||||||
|
*/
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -687,4 +687,194 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TDDBJNIConnector_getResultTimePrec
|
||||||
}
|
}
|
||||||
|
|
||||||
return taos_result_precision(result);
|
return taos_result_precision(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(JNIEnv *env, jobject jobj, jbyteArray jsql, jlong con) {
|
||||||
|
TAOS *tscon = (TAOS *)con;
|
||||||
|
if (tscon == NULL) {
|
||||||
|
jniError("jobj:%p, connection already closed", jobj);
|
||||||
|
return JNI_CONNECTION_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jsql == NULL) {
|
||||||
|
jniError("jobj:%p, conn:%p, empty sql string", jobj, tscon);
|
||||||
|
return JNI_SQL_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
jsize len = (*env)->GetArrayLength(env, jsql);
|
||||||
|
|
||||||
|
char *str = (char *) calloc(1, sizeof(char) * (len + 1));
|
||||||
|
if (str == NULL) {
|
||||||
|
jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon);
|
||||||
|
return JNI_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
|
||||||
|
if ((*env)->ExceptionCheck(env)) {
|
||||||
|
// todo handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT* pStmt = taos_stmt_init(tscon);
|
||||||
|
int32_t code = taos_stmt_prepare(pStmt, str, len);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
|
return JNI_TDENGINE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(str);
|
||||||
|
return (jlong) pStmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp(JNIEnv *env, jobject jobj, jlong stmt, jstring jname, jlong conn) {
|
||||||
|
TAOS *tsconn = (TAOS *)conn;
|
||||||
|
if (tsconn == NULL) {
|
||||||
|
jniError("jobj:%p, connection already closed", jobj);
|
||||||
|
return JNI_CONNECTION_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
|
||||||
|
if (pStmt == NULL) {
|
||||||
|
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
|
||||||
|
return JNI_SQL_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *name = (*env)->GetStringUTFChars(env, jname, NULL);
|
||||||
|
|
||||||
|
int32_t code = taos_stmt_set_tbname((void*)stmt, name);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jname, name);
|
||||||
|
|
||||||
|
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
|
||||||
|
return JNI_TDENGINE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
|
||||||
|
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jname, name);
|
||||||
|
return JNI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(JNIEnv *env, jobject jobj, jlong stmt,
|
||||||
|
jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
|
||||||
|
TAOS *tscon = (TAOS *)con;
|
||||||
|
if (tscon == NULL) {
|
||||||
|
jniError("jobj:%p, connection already closed", jobj);
|
||||||
|
return JNI_CONNECTION_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
|
||||||
|
if (pStmt == NULL) {
|
||||||
|
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
|
||||||
|
return JNI_SQL_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo refactor
|
||||||
|
jsize len = (*env)->GetArrayLength(env, colDataList);
|
||||||
|
char *colBuf = (char *)calloc(1, len);
|
||||||
|
(*env)->GetByteArrayRegion(env, colDataList, 0, len, (jbyte *)colBuf);
|
||||||
|
if ((*env)->ExceptionCheck(env)) {
|
||||||
|
// todo handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
len = (*env)->GetArrayLength(env, lengthList);
|
||||||
|
char *lengthArray = (char*) calloc(1, len);
|
||||||
|
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
|
||||||
|
if ((*env)->ExceptionCheck(env)) {
|
||||||
|
}
|
||||||
|
|
||||||
|
len = (*env)->GetArrayLength(env, nullList);
|
||||||
|
char *nullArray = (char*) calloc(1, len);
|
||||||
|
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
|
||||||
|
if ((*env)->ExceptionCheck(env)) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// bind multi-rows with only one invoke.
|
||||||
|
TAOS_MULTI_BIND* b = calloc(1, sizeof(TAOS_MULTI_BIND));
|
||||||
|
|
||||||
|
b->num = numOfRows;
|
||||||
|
b->buffer_type = dataType; // todo check data type
|
||||||
|
b->buffer_length = IS_VAR_DATA_TYPE(dataType)? dataBytes:tDataTypes[dataType].bytes;
|
||||||
|
b->is_null = nullArray;
|
||||||
|
b->buffer = colBuf;
|
||||||
|
b->length = (int32_t*)lengthArray;
|
||||||
|
|
||||||
|
// set the length and is_null array
|
||||||
|
switch(dataType) {
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
int32_t bytes = tDataTypes[dataType].bytes;
|
||||||
|
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||||
|
b->length[i] = bytes;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
case TSDB_DATA_TYPE_BINARY: {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = taos_stmt_bind_single_param_batch(pStmt, b, colIndex);
|
||||||
|
tfree(b->length);
|
||||||
|
tfree(b->buffer);
|
||||||
|
tfree(b->is_null);
|
||||||
|
tfree(b);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
|
return JNI_TDENGINE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return JNI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con) {
|
||||||
|
TAOS *tscon = (TAOS *)con;
|
||||||
|
if (tscon == NULL) {
|
||||||
|
jniError("jobj:%p, connection already closed", jobj);
|
||||||
|
return JNI_CONNECTION_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT *pStmt = (TAOS_STMT*) stmt;
|
||||||
|
if (pStmt == NULL) {
|
||||||
|
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
|
||||||
|
return JNI_SQL_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_stmt_add_batch(pStmt);
|
||||||
|
int32_t code = taos_stmt_execute(pStmt);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
|
return JNI_TDENGINE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon);
|
||||||
|
return JNI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con) {
|
||||||
|
TAOS *tscon = (TAOS *)con;
|
||||||
|
if (tscon == NULL) {
|
||||||
|
jniError("jobj:%p, connection already closed", jobj);
|
||||||
|
return JNI_CONNECTION_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT *pStmt = (TAOS_STMT*) stmt;
|
||||||
|
if (pStmt == NULL) {
|
||||||
|
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
|
||||||
|
return JNI_SQL_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = taos_stmt_close(pStmt);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
|
return JNI_TDENGINE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
|
||||||
|
return JNI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -386,7 +386,7 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
|
||||||
* The server time/client time should not be mixed up in one sql string
|
* The server time/client time should not be mixed up in one sql string
|
||||||
* Do not employ sort operation is not involved if server time is used.
|
* Do not employ sort operation is not involved if server time is used.
|
||||||
*/
|
*/
|
||||||
static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
|
int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
|
||||||
// once the data block is disordered, we do NOT keep previous timestamp any more
|
// once the data block is disordered, we do NOT keep previous timestamp any more
|
||||||
if (!pDataBlocks->ordered) {
|
if (!pDataBlocks->ordered) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -411,6 +411,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
|
||||||
|
|
||||||
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
|
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
|
||||||
pDataBlocks->ordered = false;
|
pDataBlocks->ordered = false;
|
||||||
|
tscWarn("NOT ordered input timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
pDataBlocks->prevTS = k;
|
pDataBlocks->prevTS = k;
|
||||||
|
@ -693,6 +694,8 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
|
||||||
pBlocks->numOfRows = i + 1;
|
pBlocks->numOfRows = i + 1;
|
||||||
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
|
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dataBuf->prevTS = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
|
static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
|
||||||
|
@ -1262,7 +1265,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
|
if ((pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
|
||||||
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "tscSubquery.h"
|
#include "tscSubquery.h"
|
||||||
|
|
||||||
int tsParseInsertSql(SSqlObj *pSql);
|
int tsParseInsertSql(SSqlObj *pSql);
|
||||||
|
int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// functions for normal statement preparation
|
// functions for normal statement preparation
|
||||||
|
@ -43,10 +44,32 @@ typedef struct SNormalStmt {
|
||||||
tVariant* params;
|
tVariant* params;
|
||||||
} SNormalStmt;
|
} SNormalStmt;
|
||||||
|
|
||||||
|
typedef struct SMultiTbStmt {
|
||||||
|
bool nameSet;
|
||||||
|
uint64_t currentUid;
|
||||||
|
uint32_t tbNum;
|
||||||
|
SStrToken tbname;
|
||||||
|
SHashObj *pTableHash;
|
||||||
|
SHashObj *pTableBlockHashList; // data block for each table
|
||||||
|
} SMultiTbStmt;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
STMT_INIT = 1,
|
||||||
|
STMT_PREPARE,
|
||||||
|
STMT_SETTBNAME,
|
||||||
|
STMT_BIND,
|
||||||
|
STMT_BIND_COL,
|
||||||
|
STMT_ADD_BATCH,
|
||||||
|
STMT_EXECUTE
|
||||||
|
} STMT_ST;
|
||||||
|
|
||||||
typedef struct STscStmt {
|
typedef struct STscStmt {
|
||||||
bool isInsert;
|
bool isInsert;
|
||||||
|
bool multiTbInsert;
|
||||||
|
int16_t last;
|
||||||
STscObj* taos;
|
STscObj* taos;
|
||||||
SSqlObj* pSql;
|
SSqlObj* pSql;
|
||||||
|
SMultiTbStmt mtb;
|
||||||
SNormalStmt normal;
|
SNormalStmt normal;
|
||||||
} STscStmt;
|
} STscStmt;
|
||||||
|
|
||||||
|
@ -135,7 +158,7 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
tscDebug("param %d: type mismatch or invalid", i);
|
tscDebug("0x%"PRIx64" bind column%d: type mismatch or invalid", stmt->pSql->self, i);
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -255,12 +278,13 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// functions for insertion statement preparation
|
// functions for insertion statement preparation
|
||||||
static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
|
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
|
||||||
if (bind->is_null != NULL && *(bind->is_null)) {
|
if (bind->is_null != NULL && *(bind->is_null)) {
|
||||||
setNull(data + param->offset, param->type, param->bytes);
|
setNull(data + param->offset, param->type, param->bytes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (0) {
|
if (0) {
|
||||||
// allow user bind param data with different type
|
// allow user bind param data with different type
|
||||||
union {
|
union {
|
||||||
|
@ -641,6 +665,7 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (bind->buffer_type != param->type) {
|
if (bind->buffer_type != param->type) {
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
@ -690,29 +715,106 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(data + param->offset, bind->buffer, size);
|
memcpy(data + param->offset, bind->buffer, size);
|
||||||
|
if (param->offset == 0) {
|
||||||
|
if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("invalid timestamp");
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) {
|
||||||
|
if (bind->buffer_type != param->type || !isValidDataType(param->type)) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(param->type) && bind->length == NULL) {
|
||||||
|
tscError("BINARY/NCHAR no length");
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < bind->num; ++i) {
|
||||||
|
char* data = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * (rowNum + i);
|
||||||
|
|
||||||
|
if (bind->is_null != NULL && bind->is_null[i]) {
|
||||||
|
setNull(data + param->offset, param->type, param->bytes);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!IS_VAR_DATA_TYPE(param->type)) {
|
||||||
|
memcpy(data + param->offset, (char *)bind->buffer + bind->buffer_length * i, tDataTypes[param->type].bytes);
|
||||||
|
|
||||||
|
if (param->offset == 0) {
|
||||||
|
if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("invalid timestamp");
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (param->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
if (bind->length[i] > (uintptr_t)param->bytes) {
|
||||||
|
tscError("binary length too long, ignore it, max:%d, actual:%d", param->bytes, (int32_t)bind->length[i]);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
int16_t bsize = (short)bind->length[i];
|
||||||
|
STR_WITH_SIZE_TO_VARSTR(data + param->offset, (char *)bind->buffer + bind->buffer_length * i, bsize);
|
||||||
|
} else if (param->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
if (bind->length[i] > (uintptr_t)param->bytes) {
|
||||||
|
tscError("nchar string length too long, ignore it, max:%d, actual:%d", param->bytes, (int32_t)bind->length[i]);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t output = 0;
|
||||||
|
if (!taosMbsToUcs4((char *)bind->buffer + bind->buffer_length * i, bind->length[i], varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||||
|
tscError("convert nchar string to UCS4_LE failed:%s", (char*)((char *)bind->buffer + bind->buffer_length * i));
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
varDataSetLen(data + param->offset, output);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
||||||
SSqlCmd* pCmd = &stmt->pSql->cmd;
|
SSqlCmd* pCmd = &stmt->pSql->cmd;
|
||||||
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
|
|
||||||
|
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
|
||||||
if (pCmd->pTableBlockHashList == NULL) {
|
|
||||||
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
STableDataBlocks* pBlock = NULL;
|
STableDataBlocks* pBlock = NULL;
|
||||||
|
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
if (pCmd->pTableBlockHashList == NULL) {
|
||||||
|
tscError("0x%"PRIx64" Table block hash list is empty", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid));
|
||||||
|
if (t1 == NULL) {
|
||||||
|
tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pStmt->pSql->self, pStmt->mtb.currentUid);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ret =
|
pBlock = *t1;
|
||||||
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
} else {
|
||||||
pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
|
||||||
if (ret != 0) {
|
|
||||||
// todo handle error
|
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
|
if (pCmd->pTableBlockHashList == NULL) {
|
||||||
|
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret =
|
||||||
|
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||||
|
pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
|
||||||
|
if (ret != 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t totalDataSize = sizeof(SSubmitBlk) + pCmd->batchSize * pBlock->rowSize;
|
uint32_t totalDataSize = sizeof(SSubmitBlk) + (pCmd->batchSize + 1) * pBlock->rowSize;
|
||||||
if (totalDataSize > pBlock->nAllocSize) {
|
if (totalDataSize > pBlock->nAllocSize) {
|
||||||
const double factor = 1.5;
|
const double factor = 1.5;
|
||||||
|
|
||||||
|
@ -729,9 +831,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
||||||
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
|
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
|
||||||
SParamInfo* param = &pBlock->params[j];
|
SParamInfo* param = &pBlock->params[j];
|
||||||
|
|
||||||
int code = doBindParam(data, param, &bind[param->idx]);
|
int code = doBindParam(pBlock, data, param, &bind[param->idx], 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tscDebug("param %d: type mismatch or invalid", param->idx);
|
tscDebug("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -739,9 +841,135 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int colIdx) {
|
||||||
|
SSqlCmd* pCmd = &stmt->pSql->cmd;
|
||||||
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
int rowNum = bind->num;
|
||||||
|
|
||||||
|
STableDataBlocks* pBlock = NULL;
|
||||||
|
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
if (pCmd->pTableBlockHashList == NULL) {
|
||||||
|
tscError("0x%"PRIx64" Table block hash list is empty", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid));
|
||||||
|
if (t1 == NULL) {
|
||||||
|
tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pStmt->pSql->self, pStmt->mtb.currentUid);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock = *t1;
|
||||||
|
} else {
|
||||||
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
|
||||||
|
|
||||||
|
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
|
if (pCmd->pTableBlockHashList == NULL) {
|
||||||
|
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret =
|
||||||
|
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||||
|
pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
|
||||||
|
if (ret != 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(colIdx == -1 || (colIdx >= 0 && colIdx < pBlock->numOfParams));
|
||||||
|
|
||||||
|
uint32_t totalDataSize = sizeof(SSubmitBlk) + (pCmd->batchSize + rowNum) * pBlock->rowSize;
|
||||||
|
if (totalDataSize > pBlock->nAllocSize) {
|
||||||
|
const double factor = 1.5;
|
||||||
|
|
||||||
|
void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor));
|
||||||
|
if (tmp == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->pData = (char*)tmp;
|
||||||
|
pBlock->nAllocSize = (uint32_t)(totalDataSize * factor);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (colIdx == -1) {
|
||||||
|
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
|
||||||
|
SParamInfo* param = &pBlock->params[j];
|
||||||
|
if (bind[param->idx].num != rowNum) {
|
||||||
|
tscError("0x%"PRIx64" param %d: num[%d:%d] not match", pStmt->pSql->self, param->idx, rowNum, bind[param->idx].num);
|
||||||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
int code = doBindBatchParam(pBlock, param, &bind[param->idx], pCmd->batchSize);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pCmd->batchSize += rowNum - 1;
|
||||||
|
} else {
|
||||||
|
SParamInfo* param = &pBlock->params[colIdx];
|
||||||
|
|
||||||
|
int code = doBindBatchParam(pBlock, param, bind, pCmd->batchSize);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (colIdx == (pBlock->numOfParams - 1)) {
|
||||||
|
pCmd->batchSize += rowNum - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int insertStmtUpdateBatch(STscStmt* stmt) {
|
||||||
|
SSqlObj* pSql = stmt->pSql;
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
STableDataBlocks* pBlock = NULL;
|
||||||
|
|
||||||
|
if (pCmd->batchSize > INT16_MAX) {
|
||||||
|
tscError("too many record:%d", pCmd->batchSize);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(pCmd->numOfClause == 1);
|
||||||
|
if (taosHashGetSize(pCmd->pTableBlockHashList) == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->pTableBlockHashList, (const char*)&stmt->mtb.currentUid, sizeof(stmt->mtb.currentUid));
|
||||||
|
if (t1 == NULL) {
|
||||||
|
tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pSql->self, stmt->mtb.currentUid);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock = *t1;
|
||||||
|
|
||||||
|
STableMeta* pTableMeta = pBlock->pTableMeta;
|
||||||
|
|
||||||
|
pBlock->size = sizeof(SSubmitBlk) + pCmd->batchSize * pBlock->rowSize;
|
||||||
|
SSubmitBlk* pBlk = (SSubmitBlk*) pBlock->pData;
|
||||||
|
pBlk->numOfRows = pCmd->batchSize;
|
||||||
|
pBlk->dataLen = 0;
|
||||||
|
pBlk->uid = pTableMeta->id.uid;
|
||||||
|
pBlk->tid = pTableMeta->id.tid;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int insertStmtAddBatch(STscStmt* stmt) {
|
static int insertStmtAddBatch(STscStmt* stmt) {
|
||||||
SSqlCmd* pCmd = &stmt->pSql->cmd;
|
SSqlCmd* pCmd = &stmt->pSql->cmd;
|
||||||
++pCmd->batchSize;
|
++pCmd->batchSize;
|
||||||
|
|
||||||
|
if (stmt->multiTbInsert) {
|
||||||
|
return insertStmtUpdateBatch(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -835,6 +1063,83 @@ static int insertStmtExecute(STscStmt* stmt) {
|
||||||
return pSql->res.code;
|
return pSql->res.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void insertBatchClean(STscStmt* pStmt) {
|
||||||
|
SSqlCmd *pCmd = &pStmt->pSql->cmd;
|
||||||
|
SSqlObj *pSql = pStmt->pSql;
|
||||||
|
int32_t size = taosHashGetSize(pCmd->pTableBlockHashList);
|
||||||
|
|
||||||
|
// data block reset
|
||||||
|
pCmd->batchSize = 0;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
|
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) {
|
||||||
|
tfree(pCmd->pTableNameList[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pCmd->pTableNameList);
|
||||||
|
|
||||||
|
/*
|
||||||
|
STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL);
|
||||||
|
|
||||||
|
STableDataBlocks* pOneTableBlock = *p;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||||
|
|
||||||
|
pOneTableBlock->size = sizeof(SSubmitBlk);
|
||||||
|
|
||||||
|
pBlocks->numOfRows = 0;
|
||||||
|
|
||||||
|
p = taosHashIterate(pCmd->pTableBlockHashList, p);
|
||||||
|
if (p == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOneTableBlock = *p;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
|
pCmd->numOfTables = 0;
|
||||||
|
|
||||||
|
taosHashEmpty(pCmd->pTableBlockHashList);
|
||||||
|
tscFreeSqlResult(pSql);
|
||||||
|
tscFreeSubobj(pSql);
|
||||||
|
tfree(pSql->pSubs);
|
||||||
|
pSql->subState.numOfSub = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int insertBatchStmtExecute(STscStmt* pStmt) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if(pStmt->mtb.nameSet == false) {
|
||||||
|
tscError("0x%"PRIx64" no table name set", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry
|
||||||
|
|
||||||
|
if (taosHashGetSize(pStmt->pSql->cmd.pTableBlockHashList) > 0) { // merge according to vgId
|
||||||
|
if ((code = tscMergeTableDataBlocks(pStmt->pSql, false)) != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tscHandleMultivnodeInsert(pStmt->pSql);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the callback function to post the semaphore
|
||||||
|
tsem_wait(&pStmt->pSql->rspSem);
|
||||||
|
|
||||||
|
insertBatchClean(pStmt);
|
||||||
|
|
||||||
|
return pStmt->pSql->res.code;
|
||||||
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// interface functions
|
// interface functions
|
||||||
|
|
||||||
|
@ -866,7 +1171,9 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
|
||||||
pSql->signature = pSql;
|
pSql->signature = pSql;
|
||||||
pSql->pTscObj = pObj;
|
pSql->pTscObj = pObj;
|
||||||
pSql->maxRetry = TSDB_MAX_REPLICA;
|
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||||
|
pSql->isBind = true;
|
||||||
pStmt->pSql = pSql;
|
pStmt->pSql = pSql;
|
||||||
|
pStmt->last = STMT_INIT;
|
||||||
|
|
||||||
return pStmt;
|
return pStmt;
|
||||||
}
|
}
|
||||||
|
@ -879,6 +1186,13 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
||||||
return TSDB_CODE_TSC_DISCONNECTED;
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pStmt->last != STMT_INIT) {
|
||||||
|
tscError("prepare status error, last:%d", pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_PREPARE;
|
||||||
|
|
||||||
SSqlObj* pSql = pStmt->pSql;
|
SSqlObj* pSql = pStmt->pSql;
|
||||||
size_t sqlLen = strlen(sql);
|
size_t sqlLen = strlen(sql);
|
||||||
|
|
||||||
|
@ -917,6 +1231,36 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
||||||
|
|
||||||
registerSqlObj(pSql);
|
registerSqlObj(pSql);
|
||||||
|
|
||||||
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t index = 0;
|
||||||
|
SStrToken sToken = tStrGetToken(pCmd->curSql, &index, false);
|
||||||
|
|
||||||
|
if (sToken.n == 0) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sToken.n == 1 && sToken.type == TK_QUESTION) {
|
||||||
|
pStmt->multiTbInsert = true;
|
||||||
|
pStmt->mtb.tbname = sToken;
|
||||||
|
pStmt->mtb.nameSet = false;
|
||||||
|
if (pStmt->mtb.pTableHash == NULL) {
|
||||||
|
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||||
|
}
|
||||||
|
if (pStmt->mtb.pTableBlockHashList == NULL) {
|
||||||
|
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->multiTbInsert = false;
|
||||||
|
memset(&pStmt->mtb, 0, sizeof(pStmt->mtb));
|
||||||
|
|
||||||
int32_t code = tsParseSql(pSql, true);
|
int32_t code = tsParseSql(pSql, true);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
// wait for the callback function to post the semaphore
|
// wait for the callback function to post the semaphore
|
||||||
|
@ -931,6 +1275,105 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
||||||
return normalStmtPrepare(pStmt);
|
return normalStmtPrepare(pStmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
|
||||||
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
SSqlObj* pSql = pStmt->pSql;
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
|
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (name == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
tscError("0x%"PRIx64" name is NULL", pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->multiTbInsert == false || !tscIsInsertData(pSql->sqlstr)) {
|
||||||
|
terrno = TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
tscError("0x%"PRIx64" not multi table insert", pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->last == STMT_INIT || pStmt->last == STMT_BIND || pStmt->last == STMT_BIND_COL) {
|
||||||
|
tscError("0x%"PRIx64" settbname status error, last:%d", pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_SETTBNAME;
|
||||||
|
|
||||||
|
uint64_t* uid = (uint64_t*)taosHashGet(pStmt->mtb.pTableHash, name, strlen(name));
|
||||||
|
if (uid != NULL) {
|
||||||
|
pStmt->mtb.currentUid = *uid;
|
||||||
|
|
||||||
|
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pStmt->mtb.pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid));
|
||||||
|
if (t1 == NULL) {
|
||||||
|
tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pSql->self, pStmt->mtb.currentUid);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
|
||||||
|
pCmd->batchSize = pBlk->numOfRows;
|
||||||
|
|
||||||
|
taosHashPut(pCmd->pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES);
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" table:%s is already prepared, uid:%" PRIu64, pSql->self, name, pStmt->mtb.currentUid);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
|
||||||
|
pStmt->mtb.nameSet = true;
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
|
||||||
|
|
||||||
|
pSql->cmd.parseFinished = 0;
|
||||||
|
pSql->cmd.numOfParams = 0;
|
||||||
|
pSql->cmd.batchSize = 0;
|
||||||
|
|
||||||
|
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) {
|
||||||
|
SHashObj* hashList = pCmd->pTableBlockHashList;
|
||||||
|
pCmd->pTableBlockHashList = NULL;
|
||||||
|
tscResetSqlCmd(pCmd, true);
|
||||||
|
pCmd->pTableBlockHashList = hashList;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsParseSql(pStmt->pSql, true);
|
||||||
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
// wait for the callback function to post the semaphore
|
||||||
|
tsem_wait(&pStmt->pSql->rspSem);
|
||||||
|
|
||||||
|
code = pStmt->pSql->res.code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
|
||||||
|
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
|
STableDataBlocks* pBlock = NULL;
|
||||||
|
code = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||||
|
pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitBlk* blk = (SSubmitBlk*)pBlock->pData;
|
||||||
|
blk->numOfRows = 0;
|
||||||
|
|
||||||
|
pStmt->mtb.currentUid = pTableMeta->id.uid;
|
||||||
|
pStmt->mtb.tbNum++;
|
||||||
|
|
||||||
|
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
|
||||||
|
|
||||||
|
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int taos_stmt_close(TAOS_STMT* stmt) {
|
int taos_stmt_close(TAOS_STMT* stmt) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
if (!pStmt->isInsert) {
|
if (!pStmt->isInsert) {
|
||||||
|
@ -943,6 +1386,13 @@ int taos_stmt_close(TAOS_STMT* stmt) {
|
||||||
}
|
}
|
||||||
free(normal->parts);
|
free(normal->parts);
|
||||||
free(normal->sql);
|
free(normal->sql);
|
||||||
|
} else {
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
taosHashCleanup(pStmt->mtb.pTableHash);
|
||||||
|
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true);
|
||||||
|
taosHashCleanup(pStmt->pSql->cmd.pTableBlockHashList);
|
||||||
|
pStmt->pSql->cmd.pTableBlockHashList = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(pStmt->pSql);
|
taos_free_result(pStmt->pSql);
|
||||||
|
@ -952,18 +1402,122 @@ int taos_stmt_close(TAOS_STMT* stmt) {
|
||||||
|
|
||||||
int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
|
int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
if (pStmt->isInsert) {
|
if (pStmt->isInsert) {
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
if (pStmt->last != STMT_SETTBNAME && pStmt->last != STMT_ADD_BATCH) {
|
||||||
|
tscError("0x%"PRIx64" bind param status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pStmt->last != STMT_PREPARE && pStmt->last != STMT_ADD_BATCH && pStmt->last != STMT_EXECUTE) {
|
||||||
|
tscError("0x%"PRIx64" bind param status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_BIND;
|
||||||
|
|
||||||
return insertStmtBindParam(pStmt, bind);
|
return insertStmtBindParam(pStmt, bind);
|
||||||
} else {
|
} else {
|
||||||
return normalStmtBindParam(pStmt, bind);
|
return normalStmtBindParam(pStmt, bind);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
|
||||||
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
|
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX) {
|
||||||
|
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pStmt->isInsert) {
|
||||||
|
tscError("0x%"PRIx64" not or invalid batch insert", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
if (pStmt->last != STMT_SETTBNAME && pStmt->last != STMT_ADD_BATCH) {
|
||||||
|
tscError("0x%"PRIx64" bind param status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pStmt->last != STMT_PREPARE && pStmt->last != STMT_ADD_BATCH && pStmt->last != STMT_EXECUTE) {
|
||||||
|
tscError("0x%"PRIx64" bind param status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_BIND;
|
||||||
|
|
||||||
|
return insertStmtBindParamBatch(pStmt, bind, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx) {
|
||||||
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX) {
|
||||||
|
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pStmt->isInsert) {
|
||||||
|
tscError("0x%"PRIx64" not or invalid batch insert", pStmt->pSql->self);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
if (pStmt->last != STMT_SETTBNAME && pStmt->last != STMT_ADD_BATCH && pStmt->last != STMT_BIND_COL) {
|
||||||
|
tscError("0x%"PRIx64" bind param status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pStmt->last != STMT_PREPARE && pStmt->last != STMT_ADD_BATCH && pStmt->last != STMT_BIND_COL && pStmt->last != STMT_EXECUTE) {
|
||||||
|
tscError("0x%"PRIx64" bind param status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_BIND_COL;
|
||||||
|
|
||||||
|
return insertStmtBindParamBatch(pStmt, bind, colIdx);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int taos_stmt_add_batch(TAOS_STMT* stmt) {
|
int taos_stmt_add_batch(TAOS_STMT* stmt) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
if (pStmt->isInsert) {
|
if (pStmt->isInsert) {
|
||||||
|
if (pStmt->last != STMT_BIND && pStmt->last != STMT_BIND_COL) {
|
||||||
|
tscError("0x%"PRIx64" add batch status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_ADD_BATCH;
|
||||||
|
|
||||||
return insertStmtAddBatch(pStmt);
|
return insertStmtAddBatch(pStmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_COM_OPS_NOT_SUPPORT;
|
return TSDB_CODE_COM_OPS_NOT_SUPPORT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -978,8 +1532,24 @@ int taos_stmt_reset(TAOS_STMT* stmt) {
|
||||||
int taos_stmt_execute(TAOS_STMT* stmt) {
|
int taos_stmt_execute(TAOS_STMT* stmt) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
if (pStmt->isInsert) {
|
if (pStmt->isInsert) {
|
||||||
ret = insertStmtExecute(pStmt);
|
if (pStmt->last != STMT_ADD_BATCH) {
|
||||||
|
tscError("0x%"PRIx64" exec status error, last:%d", pStmt->pSql->self, pStmt->last);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->last = STMT_EXECUTE;
|
||||||
|
|
||||||
|
if (pStmt->multiTbInsert) {
|
||||||
|
ret = insertBatchStmtExecute(pStmt);
|
||||||
|
} else {
|
||||||
|
ret = insertStmtExecute(pStmt);
|
||||||
|
}
|
||||||
} else { // normal stmt query
|
} else { // normal stmt query
|
||||||
char* sql = normalStmtBuildSql(pStmt);
|
char* sql = normalStmtBuildSql(pStmt);
|
||||||
if (sql == NULL) {
|
if (sql == NULL) {
|
||||||
|
@ -1074,7 +1644,7 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (idx<0 || idx>=pBlock->numOfParams) {
|
if (idx<0 || idx>=pBlock->numOfParams) {
|
||||||
tscError("param %d: out of range", idx);
|
tscError("0x%"PRIx64" param %d: out of range", pStmt->pSql->self, idx);
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1255,67 +1255,73 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
|
||||||
|
|
||||||
STableDataBlocks* pOneTableBlock = *p;
|
STableDataBlocks* pOneTableBlock = *p;
|
||||||
while(pOneTableBlock) {
|
while(pOneTableBlock) {
|
||||||
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
|
||||||
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
|
|
||||||
STableDataBlocks* dataBuf = NULL;
|
|
||||||
|
|
||||||
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
|
||||||
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pSql->self, ret);
|
|
||||||
taosHashCleanup(pVnodeDataBlockHashList);
|
|
||||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
if (pBlocks->numOfRows > 0) {
|
||||||
|
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||||
if (dataBuf->nAllocSize < destSize) {
|
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
|
||||||
while (dataBuf->nAllocSize < destSize) {
|
STableDataBlocks* dataBuf = NULL;
|
||||||
dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5);
|
|
||||||
}
|
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||||
|
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
||||||
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
if (tmp != NULL) {
|
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pSql->self, ret);
|
||||||
dataBuf->pData = tmp;
|
|
||||||
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
|
|
||||||
} else { // failed to allocate memory, free already allocated memory and return error code
|
|
||||||
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pSql->self, dataBuf->nAllocSize);
|
|
||||||
|
|
||||||
taosHashCleanup(pVnodeDataBlockHashList);
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||||
tfree(dataBuf->pData);
|
return ret;
|
||||||
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||||
|
|
||||||
|
if (dataBuf->nAllocSize < destSize) {
|
||||||
|
while (dataBuf->nAllocSize < destSize) {
|
||||||
|
dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
|
||||||
|
if (tmp != NULL) {
|
||||||
|
dataBuf->pData = tmp;
|
||||||
|
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
|
||||||
|
} else { // failed to allocate memory, free already allocated memory and return error code
|
||||||
|
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pSql->self, dataBuf->nAllocSize);
|
||||||
|
|
||||||
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||||
|
tfree(dataBuf->pData);
|
||||||
|
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
||||||
|
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql->self, tNameGetTableName(&pOneTableBlock->tableName),
|
||||||
|
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
||||||
|
|
||||||
|
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||||
|
|
||||||
|
pBlocks->tid = htonl(pBlocks->tid);
|
||||||
|
pBlocks->uid = htobe64(pBlocks->uid);
|
||||||
|
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||||
|
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||||
|
pBlocks->schemaLen = 0;
|
||||||
|
|
||||||
|
// erase the empty space reserved for binary data
|
||||||
|
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
|
||||||
|
assert(finalLen <= len);
|
||||||
|
|
||||||
|
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||||
|
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||||
|
|
||||||
|
// the length does not include the SSubmitBlk structure
|
||||||
|
pBlocks->dataLen = htonl(finalLen);
|
||||||
|
dataBuf->numOfTables += 1;
|
||||||
|
|
||||||
|
pBlocks->numOfRows = 0;
|
||||||
|
}else {
|
||||||
|
tscDebug("0x%"PRIx64" table %s data block is empty", pSql->self, pOneTableBlock->tableName.tname);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
|
||||||
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql->self, tNameGetTableName(&pOneTableBlock->tableName),
|
|
||||||
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
|
||||||
|
|
||||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
|
||||||
|
|
||||||
pBlocks->tid = htonl(pBlocks->tid);
|
|
||||||
pBlocks->uid = htobe64(pBlocks->uid);
|
|
||||||
pBlocks->sversion = htonl(pBlocks->sversion);
|
|
||||||
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
|
||||||
pBlocks->schemaLen = 0;
|
|
||||||
|
|
||||||
// erase the empty space reserved for binary data
|
|
||||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
|
|
||||||
assert(finalLen <= len);
|
|
||||||
|
|
||||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
|
||||||
assert(dataBuf->size <= dataBuf->nAllocSize);
|
|
||||||
|
|
||||||
// the length does not include the SSubmitBlk structure
|
|
||||||
pBlocks->dataLen = htonl(finalLen);
|
|
||||||
dataBuf->numOfTables += 1;
|
|
||||||
|
|
||||||
p = taosHashIterate(pCmd->pTableBlockHashList, p);
|
p = taosHashIterate(pCmd->pTableBlockHashList, p);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -84,10 +84,12 @@ public abstract class AbstractResultSet extends WrapperImpl implements ResultSet
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public InputStream getUnicodeStream(int columnIndex) throws SQLException {
|
public InputStream getUnicodeStream(int columnIndex) throws SQLException {
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
|
||||||
|
}
|
||||||
|
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,6 +173,7 @@ public abstract class AbstractResultSet extends WrapperImpl implements ResultSet
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Deprecated
|
||||||
public InputStream getUnicodeStream(String columnLabel) throws SQLException {
|
public InputStream getUnicodeStream(String columnLabel) throws SQLException {
|
||||||
return getUnicodeStream(findColumn(columnLabel));
|
return getUnicodeStream(findColumn(columnLabel));
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ public class TSDBConnection extends AbstractConnection {
|
||||||
this.databaseMetaData.setConnection(this);
|
this.databaseMetaData.setConnection(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TSDBJNIConnector getConnection() {
|
public TSDBJNIConnector getConnector() {
|
||||||
return this.connector;
|
return this.connector;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ public class TSDBConnection extends AbstractConnection {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new TSDBStatement(this, this.connector);
|
return new TSDBStatement(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TSDBSubscribe subscribe(String topic, String sql, boolean restart) throws SQLException {
|
public TSDBSubscribe subscribe(String topic, String sql, boolean restart) throws SQLException {
|
||||||
|
@ -74,14 +74,18 @@ public class TSDBConnection extends AbstractConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
public PreparedStatement prepareStatement(String sql) throws SQLException {
|
public PreparedStatement prepareStatement(String sql) throws SQLException {
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
|
||||||
return new TSDBPreparedStatement(this, this.connector, sql);
|
}
|
||||||
|
|
||||||
|
return new TSDBPreparedStatement(this, sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws SQLException {
|
public void close() throws SQLException {
|
||||||
if (isClosed)
|
if (isClosed) {
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.connector.closeConnection();
|
this.connector.closeConnection();
|
||||||
this.isClosed = true;
|
this.isClosed = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package com.taosdata.jdbc;
|
||||||
|
|
||||||
import com.taosdata.jdbc.utils.TaosInfo;
|
import com.taosdata.jdbc.utils.TaosInfo;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.SQLWarning;
|
import java.sql.SQLWarning;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -29,10 +30,13 @@ public class TSDBJNIConnector {
|
||||||
private static volatile Boolean isInitialized = false;
|
private static volatile Boolean isInitialized = false;
|
||||||
|
|
||||||
private TaosInfo taosInfo = TaosInfo.getInstance();
|
private TaosInfo taosInfo = TaosInfo.getInstance();
|
||||||
|
|
||||||
// Connection pointer used in C
|
// Connection pointer used in C
|
||||||
private long taos = TSDBConstants.JNI_NULL_POINTER;
|
private long taos = TSDBConstants.JNI_NULL_POINTER;
|
||||||
|
|
||||||
// result set status in current connection
|
// result set status in current connection
|
||||||
private boolean isResultsetClosed = true;
|
private boolean isResultsetClosed = true;
|
||||||
|
|
||||||
private int affectedRows = -1;
|
private int affectedRows = -1;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -75,7 +79,6 @@ public class TSDBJNIConnector {
|
||||||
|
|
||||||
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
|
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
|
||||||
if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
|
if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
|
||||||
// this.closeConnectionImp(this.taos);
|
|
||||||
closeConnection();
|
closeConnection();
|
||||||
this.taos = TSDBConstants.JNI_NULL_POINTER;
|
this.taos = TSDBConstants.JNI_NULL_POINTER;
|
||||||
}
|
}
|
||||||
|
@ -97,12 +100,6 @@ public class TSDBJNIConnector {
|
||||||
* @throws SQLException
|
* @throws SQLException
|
||||||
*/
|
*/
|
||||||
public long executeQuery(String sql) throws SQLException {
|
public long executeQuery(String sql) throws SQLException {
|
||||||
// close previous result set if the user forgets to invoke the
|
|
||||||
// free method to close previous result set.
|
|
||||||
// if (!this.isResultsetClosed) {
|
|
||||||
// freeResultSet(taosResultSetPointer);
|
|
||||||
// }
|
|
||||||
|
|
||||||
Long pSql = 0l;
|
Long pSql = 0l;
|
||||||
try {
|
try {
|
||||||
pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
|
pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
|
||||||
|
@ -169,37 +166,14 @@ public class TSDBJNIConnector {
|
||||||
private native long isUpdateQueryImp(long connection, long pSql);
|
private native long isUpdateQueryImp(long connection, long pSql);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free resultset operation from C to release resultset pointer by JNI
|
* Free result set operation from C to release result set pointer by JNI
|
||||||
*/
|
*/
|
||||||
public int freeResultSet(long pSql) {
|
public int freeResultSet(long pSql) {
|
||||||
int res = TSDBConstants.JNI_SUCCESS;
|
int res = this.freeResultSetImp(this.taos, pSql);
|
||||||
// if (result != taosResultSetPointer && taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
|
|
||||||
// throw new RuntimeException("Invalid result set pointer");
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
|
|
||||||
res = this.freeResultSetImp(this.taos, pSql);
|
|
||||||
// taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
|
|
||||||
// }
|
|
||||||
|
|
||||||
isResultsetClosed = true;
|
isResultsetClosed = true;
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the open result set which is associated to the current connection. If the result set is already
|
|
||||||
* closed, return 0 for success.
|
|
||||||
*/
|
|
||||||
// public int freeResultSet() {
|
|
||||||
// int resCode = TSDBConstants.JNI_SUCCESS;
|
|
||||||
// if (!isResultsetClosed) {
|
|
||||||
// resCode = this.freeResultSetImp(this.taos, this.taosResultSetPointer);
|
|
||||||
// taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
|
|
||||||
// isResultsetClosed = true;
|
|
||||||
// }
|
|
||||||
// return resCode;
|
|
||||||
// }
|
|
||||||
|
|
||||||
private native int freeResultSetImp(long connection, long result);
|
private native int freeResultSetImp(long connection, long result);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -246,6 +220,7 @@ public class TSDBJNIConnector {
|
||||||
*/
|
*/
|
||||||
public void closeConnection() throws SQLException {
|
public void closeConnection() throws SQLException {
|
||||||
int code = this.closeConnectionImp(this.taos);
|
int code = this.closeConnectionImp(this.taos);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
|
||||||
} else if (code == 0) {
|
} else if (code == 0) {
|
||||||
|
@ -253,6 +228,7 @@ public class TSDBJNIConnector {
|
||||||
} else {
|
} else {
|
||||||
throw new SQLException("Undefined error code returned by TDengine when closing a connection");
|
throw new SQLException("Undefined error code returned by TDengine when closing a connection");
|
||||||
}
|
}
|
||||||
|
|
||||||
// invoke closeConnectionImpl only here
|
// invoke closeConnectionImpl only here
|
||||||
taosInfo.connect_close_increment();
|
taosInfo.connect_close_increment();
|
||||||
}
|
}
|
||||||
|
@ -289,7 +265,7 @@ public class TSDBJNIConnector {
|
||||||
private native void unsubscribeImp(long subscription, boolean isKeep);
|
private native void unsubscribeImp(long subscription, boolean isKeep);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validate if a <I>create table</I> sql statement is correct without actually creating that table
|
* Validate if a <I>create table</I> SQL statement is correct without actually creating that table
|
||||||
*/
|
*/
|
||||||
public boolean validateCreateTableSql(String sql) {
|
public boolean validateCreateTableSql(String sql) {
|
||||||
int res = validateCreateTableSqlImp(taos, sql.getBytes());
|
int res = validateCreateTableSqlImp(taos, sql.getBytes());
|
||||||
|
@ -297,4 +273,66 @@ public class TSDBJNIConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
|
private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
|
||||||
|
|
||||||
|
public long prepareStmt(String sql) throws SQLException {
|
||||||
|
Long stmt = 0L;
|
||||||
|
try {
|
||||||
|
stmt = prepareStmtImp(sql.getBytes(), this.taos);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stmt == TSDBConstants.JNI_CONNECTION_NULL) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stmt == TSDBConstants.JNI_SQL_NULL) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_SQL_NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stmt == TSDBConstants.JNI_OUT_OF_MEMORY) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
return stmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
private native long prepareStmtImp(byte[] sql, long con);
|
||||||
|
|
||||||
|
public void setBindTableName(long stmt, String tableName) throws SQLException {
|
||||||
|
int code = setBindTableNameImp(stmt, tableName, this.taos);
|
||||||
|
if (code != TSDBConstants.JNI_SUCCESS) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to set table name");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private native int setBindTableNameImp(long stmt, String name, long conn);
|
||||||
|
|
||||||
|
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) throws SQLException {
|
||||||
|
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
|
||||||
|
if (code != TSDBConstants.JNI_SUCCESS) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind column data");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private native int bindColDataImp(long stmt, byte[] colDataList, byte[] lengthList, byte[] isNullList, int type, int bytes, int numOfRows, int columnIndex, long conn);
|
||||||
|
|
||||||
|
public void executeBatch(long stmt) throws SQLException {
|
||||||
|
int code = executeBatchImp(stmt, this.taos);
|
||||||
|
if (code != TSDBConstants.JNI_SUCCESS) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to execute batch bind");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private native int executeBatchImp(long stmt, long con);
|
||||||
|
|
||||||
|
public void closeBatch(long stmt) throws SQLException {
|
||||||
|
int code = closeStmt(stmt, this.taos);
|
||||||
|
if (code != TSDBConstants.JNI_SUCCESS) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to close batch bind");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private native int closeStmt(long stmt, long con);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,33 +18,40 @@ import com.taosdata.jdbc.utils.Utils;
|
||||||
|
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
import java.sql.*;
|
import java.sql.*;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TDengine only supports a subset of the standard SQL, thus this implemetation of the
|
* TDengine only supports a subset of the standard SQL, thus this implementation of the
|
||||||
* standard JDBC API contains more or less some adjustments customized for certain
|
* standard JDBC API contains more or less some adjustments customized for certain
|
||||||
* compatibility needs.
|
* compatibility needs.
|
||||||
*/
|
*/
|
||||||
public class TSDBPreparedStatement extends TSDBStatement implements PreparedStatement {
|
public class TSDBPreparedStatement extends TSDBStatement implements PreparedStatement {
|
||||||
|
|
||||||
private String rawSql;
|
private String rawSql;
|
||||||
private Object[] parameters;
|
private Object[] parameters;
|
||||||
private boolean isPrepared;
|
private boolean isPrepared;
|
||||||
|
|
||||||
|
private ArrayList<ColumnInfo> colData;
|
||||||
|
private String tableName;
|
||||||
|
private long nativeStmtHandle = 0;
|
||||||
|
|
||||||
private volatile TSDBParameterMetaData parameterMetaData;
|
private volatile TSDBParameterMetaData parameterMetaData;
|
||||||
|
|
||||||
TSDBPreparedStatement(TSDBConnection connection, TSDBJNIConnector connecter, String sql) {
|
TSDBPreparedStatement(TSDBConnection connection, String sql) {
|
||||||
super(connection, connecter);
|
super(connection);
|
||||||
init(sql);
|
init(sql);
|
||||||
|
|
||||||
|
int parameterCnt = 0;
|
||||||
if (sql.contains("?")) {
|
if (sql.contains("?")) {
|
||||||
int parameterCnt = 0;
|
|
||||||
for (int i = 0; i < sql.length(); i++) {
|
for (int i = 0; i < sql.length(); i++) {
|
||||||
if ('?' == sql.charAt(i)) {
|
if ('?' == sql.charAt(i)) {
|
||||||
parameterCnt++;
|
parameterCnt++;
|
||||||
|
@ -53,6 +60,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
||||||
parameters = new Object[parameterCnt];
|
parameters = new Object[parameterCnt];
|
||||||
this.isPrepared = true;
|
this.isPrepared = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (parameterCnt > 1) {
|
||||||
|
// the table name is also a parameter, so ignore it.
|
||||||
|
this.colData = new ArrayList<ColumnInfo>(parameterCnt - 1);
|
||||||
|
this.colData.addAll(Collections.nCopies(parameterCnt - 1, null));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void init(String sql) {
|
private void init(String sql) {
|
||||||
|
@ -260,10 +273,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setObject(int parameterIndex, Object x) throws SQLException {
|
public void setObject(int parameterIndex, Object x) throws SQLException {
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
if (parameterIndex < 1 && parameterIndex >= parameters.length)
|
}
|
||||||
|
|
||||||
|
if (parameterIndex < 1 && parameterIndex >= parameters.length) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PARAMETER_INDEX_OUT_RANGE);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PARAMETER_INDEX_OUT_RANGE);
|
||||||
|
}
|
||||||
|
|
||||||
parameters[parameterIndex - 1] = x;
|
parameters[parameterIndex - 1] = x;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,9 +317,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRef(int parameterIndex, Ref x) throws SQLException {
|
public void setRef(int parameterIndex, Ref x) throws SQLException {
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
|
}
|
||||||
|
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,4 +533,276 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
// NOTE: the following APIs are not JDBC compatible
|
||||||
|
// set the bind table name
|
||||||
|
private static class ColumnInfo {
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private ArrayList data;
|
||||||
|
private int type;
|
||||||
|
private int bytes;
|
||||||
|
private boolean typeIsSet;
|
||||||
|
|
||||||
|
public ColumnInfo() {
|
||||||
|
this.typeIsSet = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(int type) throws SQLException {
|
||||||
|
if (this.isTypeSet()) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data type has been set");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.typeIsSet = true;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTypeSet() {
|
||||||
|
return this.typeIsSet;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public void setTableName(String name) {
|
||||||
|
this.tableName = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type, int bytes) throws SQLException {
|
||||||
|
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex);
|
||||||
|
if (col == null) {
|
||||||
|
ColumnInfo p = new ColumnInfo();
|
||||||
|
p.setType(type);
|
||||||
|
p.bytes = bytes;
|
||||||
|
p.data = (ArrayList<?>) list.clone();
|
||||||
|
this.colData.set(columnIndex, p);
|
||||||
|
} else {
|
||||||
|
if (col.type != type) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data type mismatch");
|
||||||
|
}
|
||||||
|
col.data.addAll(list);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInt(int columnIndex, ArrayList<Integer> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_INT, Integer.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFloat(int columnIndex, ArrayList<Float> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_FLOAT, Float.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimestamp(int columnIndex, ArrayList<Long> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP, Long.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLong(int columnIndex, ArrayList<Long> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BIGINT, Long.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDouble(int columnIndex, ArrayList<Double> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_DOUBLE, Double.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBoolean(int columnIndex, ArrayList<Boolean> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BOOL, Byte.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TINYINT, Byte.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_SMALLINT, Short.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BINARY, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
// note: expand the required space for each NChar character
|
||||||
|
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException {
|
||||||
|
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_NCHAR, size * Integer.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void columnDataAddBatch() throws SQLException {
|
||||||
|
// pass the data block to native code
|
||||||
|
if (rawSql == null) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "sql statement not set yet");
|
||||||
|
}
|
||||||
|
|
||||||
|
// table name is not set yet, abort
|
||||||
|
if (this.tableName == null) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet");
|
||||||
|
}
|
||||||
|
|
||||||
|
int numOfCols = this.colData.size();
|
||||||
|
if (numOfCols == 0) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
|
||||||
|
}
|
||||||
|
|
||||||
|
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
|
||||||
|
this.nativeStmtHandle = connector.prepareStmt(rawSql);
|
||||||
|
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
|
||||||
|
|
||||||
|
ColumnInfo colInfo = (ColumnInfo) this.colData.get(0);
|
||||||
|
if (colInfo == null) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
|
||||||
|
}
|
||||||
|
|
||||||
|
int rows = colInfo.data.size();
|
||||||
|
for (int i = 0; i < numOfCols; ++i) {
|
||||||
|
ColumnInfo col1 = this.colData.get(i);
|
||||||
|
if (col1 == null || !col1.isTypeSet()) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rows != col1.data.size()) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical");
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuffer colDataList = ByteBuffer.allocate(rows * col1.bytes);
|
||||||
|
colDataList.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
|
||||||
|
ByteBuffer lengthList = ByteBuffer.allocate(rows * Integer.BYTES);
|
||||||
|
lengthList.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
|
||||||
|
ByteBuffer isNullList = ByteBuffer.allocate(rows * Byte.BYTES);
|
||||||
|
isNullList.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
|
||||||
|
switch (col1.type) {
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_INT: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Integer val = (Integer) col1.data.get(j);
|
||||||
|
colDataList.putInt(val == null? Integer.MIN_VALUE:val);
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Byte val = (Byte) col1.data.get(j);
|
||||||
|
colDataList.put(val == null? 0:val);
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Boolean val = (Boolean) col1.data.get(j);
|
||||||
|
if (val == null) {
|
||||||
|
colDataList.put((byte) 0);
|
||||||
|
} else {
|
||||||
|
colDataList.put((byte) (val? 1:0));
|
||||||
|
}
|
||||||
|
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Short val = (Short) col1.data.get(j);
|
||||||
|
colDataList.putShort(val == null? 0:val);
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Long val = (Long) col1.data.get(j);
|
||||||
|
colDataList.putLong(val == null? 0:val);
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Float val = (Float) col1.data.get(j);
|
||||||
|
colDataList.putFloat(val == null? 0:val);
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
Double val = (Double) col1.data.get(j);
|
||||||
|
colDataList.putDouble(val == null? 0:val);
|
||||||
|
isNullList.put((byte) (val == null? 1:0));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
|
||||||
|
String charset = TaosGlobalConfig.getCharset();
|
||||||
|
for (int j = 0; j < rows; ++j) {
|
||||||
|
String val = (String) col1.data.get(j);
|
||||||
|
|
||||||
|
colDataList.position(j * col1.bytes); // seek to the correct position
|
||||||
|
if (val != null) {
|
||||||
|
byte[] b = null;
|
||||||
|
try {
|
||||||
|
if (col1.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
|
||||||
|
b = val.getBytes();
|
||||||
|
} else {
|
||||||
|
b = val.getBytes(charset);
|
||||||
|
}
|
||||||
|
} catch (UnsupportedEncodingException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (val.length() > col1.bytes) {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "string data too long");
|
||||||
|
}
|
||||||
|
|
||||||
|
colDataList.put(b);
|
||||||
|
lengthList.putInt(b.length);
|
||||||
|
isNullList.put((byte) 0);
|
||||||
|
} else {
|
||||||
|
lengthList.putInt(0);
|
||||||
|
isNullList.put((byte) 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_UINT:
|
||||||
|
case TSDBConstants.TSDB_DATA_TYPE_UBIGINT: {
|
||||||
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void columnDataExecuteBatch() throws SQLException {
|
||||||
|
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
|
||||||
|
connector.executeBatch(this.nativeStmtHandle);
|
||||||
|
this.columnDataClearBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void columnDataClearBatch() {
|
||||||
|
int size = this.colData.size();
|
||||||
|
this.colData.clear();
|
||||||
|
|
||||||
|
this.colData.addAll(Collections.nCopies(size, null));
|
||||||
|
this.tableName = null; // clear the table name
|
||||||
|
}
|
||||||
|
|
||||||
|
public void columnDataCloseBatch() throws SQLException {
|
||||||
|
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
|
||||||
|
connector.closeBatch(this.nativeStmtHandle);
|
||||||
|
|
||||||
|
this.nativeStmtHandle = 0L;
|
||||||
|
this.tableName = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,8 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.taosdata.jdbc.utils.NullType;
|
||||||
|
|
||||||
public class TSDBResultSetBlockData {
|
public class TSDBResultSetBlockData {
|
||||||
private int numOfRows = 0;
|
private int numOfRows = 0;
|
||||||
private int rowIndex = 0;
|
private int rowIndex = 0;
|
||||||
|
@ -164,59 +166,7 @@ public class TSDBResultSetBlockData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class NullType {
|
|
||||||
private static final byte NULL_BOOL_VAL = 0x2;
|
|
||||||
private static final String NULL_STR = "null";
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return NullType.NULL_STR;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean isBooleanNull(byte val) {
|
|
||||||
return val == NullType.NULL_BOOL_VAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isTinyIntNull(byte val) {
|
|
||||||
return val == Byte.MIN_VALUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isSmallIntNull(short val) {
|
|
||||||
return val == Short.MIN_VALUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isIntNull(int val) {
|
|
||||||
return val == Integer.MIN_VALUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isBigIntNull(long val) {
|
|
||||||
return val == Long.MIN_VALUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isFloatNull(float val) {
|
|
||||||
return Float.isNaN(val);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isDoubleNull(double val) {
|
|
||||||
return Double.isNaN(val);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isBinaryNull(byte[] val, int length) {
|
|
||||||
if (length != Byte.BYTES) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return val[0] == 0xFF;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isNcharNull(byte[] val, int length) {
|
|
||||||
if (length != Integer.BYTES) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (val[0] & val[1] & val[2] & val[3]) == 0xFF;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The original type may not be a string type, but will be converted to by
|
* The original type may not be a string type, but will be converted to by
|
||||||
|
@ -488,8 +438,8 @@ public class TSDBResultSetBlockData {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String ss = TaosGlobalConfig.getCharset();
|
String charset = TaosGlobalConfig.getCharset();
|
||||||
return new String(dest, ss);
|
return new String(dest, charset);
|
||||||
} catch (UnsupportedEncodingException e) {
|
} catch (UnsupportedEncodingException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,8 @@ public class TSDBResultSetRowData {
|
||||||
data.set(col, value);
|
data.set(col, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getInt(int col, int srcType) throws SQLException {
|
@SuppressWarnings("deprecation")
|
||||||
|
public int getInt(int col, int srcType) throws SQLException {
|
||||||
Object obj = data.get(col);
|
Object obj = data.get(col);
|
||||||
|
|
||||||
switch (srcType) {
|
switch (srcType) {
|
||||||
|
@ -128,7 +129,7 @@ public class TSDBResultSetRowData {
|
||||||
long value = (long) obj;
|
long value = (long) obj;
|
||||||
if (value < 0)
|
if (value < 0)
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_NUMERIC_VALUE_OUT_OF_RANGE);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_NUMERIC_VALUE_OUT_OF_RANGE);
|
||||||
return new Long(value).intValue();
|
return Long.valueOf(value).intValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,6 @@ import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
|
||||||
public class TSDBStatement extends AbstractStatement {
|
public class TSDBStatement extends AbstractStatement {
|
||||||
|
|
||||||
private TSDBJNIConnector connector;
|
|
||||||
/**
|
/**
|
||||||
* Status of current statement
|
* Status of current statement
|
||||||
*/
|
*/
|
||||||
|
@ -29,29 +27,26 @@ public class TSDBStatement extends AbstractStatement {
|
||||||
private TSDBConnection connection;
|
private TSDBConnection connection;
|
||||||
private TSDBResultSet resultSet;
|
private TSDBResultSet resultSet;
|
||||||
|
|
||||||
public void setConnection(TSDBConnection connection) {
|
TSDBStatement(TSDBConnection connection) {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBStatement(TSDBConnection connection, TSDBJNIConnector connector) {
|
|
||||||
this.connection = connection;
|
|
||||||
this.connector = connector;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ResultSet executeQuery(String sql) throws SQLException {
|
public ResultSet executeQuery(String sql) throws SQLException {
|
||||||
// check if closed
|
// check if closed
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
|
}
|
||||||
|
|
||||||
//TODO: 如果在executeQuery方法中执行insert语句,那么先执行了SQL,再通过pSql来检查是否为一个insert语句,但这个insert SQL已经执行成功了
|
//TODO: 如果在executeQuery方法中执行insert语句,那么先执行了SQL,再通过pSql来检查是否为一个insert语句,但这个insert SQL已经执行成功了
|
||||||
|
|
||||||
// execute query
|
// execute query
|
||||||
long pSql = this.connector.executeQuery(sql);
|
long pSql = this.connection.getConnector().executeQuery(sql);
|
||||||
// if pSql is create/insert/update/delete/alter SQL
|
// if pSql is create/insert/update/delete/alter SQL
|
||||||
if (this.connector.isUpdateQuery(pSql)) {
|
if (this.connection.getConnector().isUpdateQuery(pSql)) {
|
||||||
this.connector.freeResultSet(pSql);
|
this.connection.getConnector().freeResultSet(pSql);
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY);
|
||||||
}
|
}
|
||||||
TSDBResultSet res = new TSDBResultSet(this, this.connector, pSql);
|
TSDBResultSet res = new TSDBResultSet(this, this.connection.getConnector(), pSql);
|
||||||
res.setBatchFetch(this.connection.getBatchFetch());
|
res.setBatchFetch(this.connection.getBatchFetch());
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -60,14 +55,14 @@ public class TSDBStatement extends AbstractStatement {
|
||||||
if (isClosed())
|
if (isClosed())
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
|
|
||||||
long pSql = this.connector.executeQuery(sql);
|
long pSql = this.connection.getConnector().executeQuery(sql);
|
||||||
// if pSql is create/insert/update/delete/alter SQL
|
// if pSql is create/insert/update/delete/alter SQL
|
||||||
if (!this.connector.isUpdateQuery(pSql)) {
|
if (!this.connection.getConnector().isUpdateQuery(pSql)) {
|
||||||
this.connector.freeResultSet(pSql);
|
this.connection.getConnector().freeResultSet(pSql);
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEUPDATE);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEUPDATE);
|
||||||
}
|
}
|
||||||
int affectedRows = this.connector.getAffectedRows(pSql);
|
int affectedRows = this.connection.getConnector().getAffectedRows(pSql);
|
||||||
this.connector.freeResultSet(pSql);
|
this.connection.getConnector().freeResultSet(pSql);
|
||||||
return affectedRows;
|
return affectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,30 +76,29 @@ public class TSDBStatement extends AbstractStatement {
|
||||||
|
|
||||||
public boolean execute(String sql) throws SQLException {
|
public boolean execute(String sql) throws SQLException {
|
||||||
// check if closed
|
// check if closed
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
|
}
|
||||||
|
|
||||||
// execute query
|
// execute query
|
||||||
long pSql = this.connector.executeQuery(sql);
|
long pSql = this.connection.getConnector().executeQuery(sql);
|
||||||
// if pSql is create/insert/update/delete/alter SQL
|
// if pSql is create/insert/update/delete/alter SQL
|
||||||
if (this.connector.isUpdateQuery(pSql)) {
|
if (this.connection.getConnector().isUpdateQuery(pSql)) {
|
||||||
this.affectedRows = this.connector.getAffectedRows(pSql);
|
this.affectedRows = this.connection.getConnector().getAffectedRows(pSql);
|
||||||
this.connector.freeResultSet(pSql);
|
this.connection.getConnector().freeResultSet(pSql);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.resultSet = new TSDBResultSet(this, this.connector, pSql);
|
this.resultSet = new TSDBResultSet(this, this.connection.getConnector(), pSql);
|
||||||
this.resultSet.setBatchFetch(this.connection.getBatchFetch());
|
this.resultSet.setBatchFetch(this.connection.getBatchFetch());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResultSet getResultSet() throws SQLException {
|
public ResultSet getResultSet() throws SQLException {
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
// long resultSetPointer = connector.getResultSet();
|
}
|
||||||
// TSDBResultSet resSet = null;
|
|
||||||
// if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
|
|
||||||
// resSet = new TSDBResultSet(connector, resultSetPointer);
|
|
||||||
// }
|
|
||||||
return this.resultSet;
|
return this.resultSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,12 +109,20 @@ public class TSDBStatement extends AbstractStatement {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Connection getConnection() throws SQLException {
|
public Connection getConnection() throws SQLException {
|
||||||
if (isClosed())
|
if (isClosed()) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
|
||||||
if (this.connector == null)
|
}
|
||||||
|
|
||||||
|
if (this.connection.getConnector() == null) {
|
||||||
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
|
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
|
||||||
|
}
|
||||||
|
|
||||||
return this.connection;
|
return this.connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setConnection(TSDBConnection connection) {
|
||||||
|
this.connection = connection;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isClosed() throws SQLException {
|
public boolean isClosed() throws SQLException {
|
||||||
return isClosed;
|
return isClosed;
|
||||||
|
|
|
@ -1,26 +1,15 @@
|
||||||
package com.taosdata.jdbc.rs;
|
package com.taosdata.jdbc.rs;
|
||||||
|
|
||||||
import com.google.common.collect.Range;
|
|
||||||
import com.google.common.collect.RangeSet;
|
|
||||||
import com.google.common.collect.TreeRangeSet;
|
|
||||||
import com.taosdata.jdbc.TSDBError;
|
import com.taosdata.jdbc.TSDBError;
|
||||||
import com.taosdata.jdbc.TSDBErrorNumbers;
|
import com.taosdata.jdbc.TSDBErrorNumbers;
|
||||||
import com.taosdata.jdbc.utils.SqlSyntaxValidator;
|
|
||||||
import com.taosdata.jdbc.utils.Utils;
|
import com.taosdata.jdbc.utils.Utils;
|
||||||
|
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
|
||||||
import java.sql.*;
|
import java.sql.*;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
public class RestfulPreparedStatement extends RestfulStatement implements PreparedStatement {
|
public class RestfulPreparedStatement extends RestfulStatement implements PreparedStatement {
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
package com.taosdata.jdbc.utils;
|
||||||
|
|
||||||
|
public class NullType {
|
||||||
|
private static final byte NULL_BOOL_VAL = 0x2;
|
||||||
|
private static final String NULL_STR = "null";
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return NullType.NULL_STR;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isBooleanNull(byte val) {
|
||||||
|
return val == NullType.NULL_BOOL_VAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isTinyIntNull(byte val) {
|
||||||
|
return val == Byte.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isSmallIntNull(short val) {
|
||||||
|
return val == Short.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isIntNull(int val) {
|
||||||
|
return val == Integer.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isBigIntNull(long val) {
|
||||||
|
return val == Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isFloatNull(float val) {
|
||||||
|
return Float.isNaN(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isDoubleNull(double val) {
|
||||||
|
return Double.isNaN(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isBinaryNull(byte[] val, int length) {
|
||||||
|
if (length != Byte.BYTES) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return val[0] == 0xFF;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isNcharNull(byte[] val, int length) {
|
||||||
|
if (length != Integer.BYTES) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (val[0] & val[1] & val[2] & val[3]) == 0xFF;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte getBooleanNull() {
|
||||||
|
return NullType.NULL_BOOL_VAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte getTinyintNull() {
|
||||||
|
return Byte.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int getIntNull() {
|
||||||
|
return Integer.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static short getSmallIntNull() {
|
||||||
|
return Short.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getBigIntNull() {
|
||||||
|
return Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int getFloatNull() {
|
||||||
|
return 0x7FF00000;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getDoubleNull() {
|
||||||
|
return 0x7FFFFF0000000000L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte getBinaryNull() {
|
||||||
|
return (byte) 0xFF;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte[] getNcharNull() {
|
||||||
|
return new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -3,7 +3,6 @@ package com.taosdata.jdbc;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.google.common.primitives.Shorts;
|
import com.google.common.primitives.Shorts;
|
||||||
import com.taosdata.jdbc.rs.RestfulResultSet;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -177,7 +176,8 @@ public class TSDBResultSetTest {
|
||||||
rs.getAsciiStream("f1");
|
rs.getAsciiStream("f1");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
@SuppressWarnings("deprecation")
|
||||||
|
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||||
public void getUnicodeStream() throws SQLException {
|
public void getUnicodeStream() throws SQLException {
|
||||||
rs.getUnicodeStream("f1");
|
rs.getUnicodeStream("f1");
|
||||||
}
|
}
|
||||||
|
@ -326,7 +326,7 @@ public class TSDBResultSetTest {
|
||||||
|
|
||||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||||
public void getRow() throws SQLException {
|
public void getRow() throws SQLException {
|
||||||
int row = rs.getRow();
|
rs.getRow();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||||
|
@ -405,12 +405,12 @@ public class TSDBResultSetTest {
|
||||||
|
|
||||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||||
public void updateByte() throws SQLException {
|
public void updateByte() throws SQLException {
|
||||||
rs.updateByte(1, new Byte("0"));
|
rs.updateByte(1, (byte) 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||||
public void updateShort() throws SQLException {
|
public void updateShort() throws SQLException {
|
||||||
rs.updateShort(1, new Short("0"));
|
rs.updateShort(1, (short) 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||||
|
|
|
@ -82,6 +82,7 @@ typedef struct TAOS_BIND {
|
||||||
uintptr_t buffer_length; // unused
|
uintptr_t buffer_length; // unused
|
||||||
uintptr_t *length;
|
uintptr_t *length;
|
||||||
int * is_null;
|
int * is_null;
|
||||||
|
|
||||||
int is_unsigned; // unused
|
int is_unsigned; // unused
|
||||||
int * error; // unused
|
int * error; // unused
|
||||||
union {
|
union {
|
||||||
|
@ -99,12 +100,25 @@ typedef struct TAOS_BIND {
|
||||||
unsigned int allocated;
|
unsigned int allocated;
|
||||||
} TAOS_BIND;
|
} TAOS_BIND;
|
||||||
|
|
||||||
|
typedef struct TAOS_MULTI_BIND {
|
||||||
|
int buffer_type;
|
||||||
|
void *buffer;
|
||||||
|
uintptr_t buffer_length;
|
||||||
|
int32_t *length;
|
||||||
|
char *is_null;
|
||||||
|
int num;
|
||||||
|
} TAOS_MULTI_BIND;
|
||||||
|
|
||||||
|
|
||||||
TAOS_STMT *taos_stmt_init(TAOS *taos);
|
TAOS_STMT *taos_stmt_init(TAOS *taos);
|
||||||
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
||||||
|
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
|
||||||
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
|
||||||
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
|
||||||
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
|
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
|
||||||
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
|
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
|
||||||
|
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind);
|
||||||
|
int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx);
|
||||||
int taos_stmt_add_batch(TAOS_STMT *stmt);
|
int taos_stmt_add_batch(TAOS_STMT *stmt);
|
||||||
int taos_stmt_execute(TAOS_STMT *stmt);
|
int taos_stmt_execute(TAOS_STMT *stmt);
|
||||||
TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
|
TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
|
||||||
|
|
|
@ -560,6 +560,28 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStrToken tscReplaceStrToken(char **str, SStrToken *token, const char* newToken) {
|
||||||
|
char *src = *str;
|
||||||
|
size_t nsize = strlen(newToken);
|
||||||
|
int32_t size = (int32_t)strlen(*str) - token->n + (int32_t)nsize + 1;
|
||||||
|
int32_t bsize = (int32_t)((uint64_t)token->z - (uint64_t)src);
|
||||||
|
SStrToken ntoken;
|
||||||
|
|
||||||
|
*str = calloc(1, size);
|
||||||
|
|
||||||
|
strncpy(*str, src, bsize);
|
||||||
|
strcat(*str, newToken);
|
||||||
|
strcat(*str, token->z + token->n);
|
||||||
|
|
||||||
|
ntoken.n = (uint32_t)nsize;
|
||||||
|
ntoken.z = *str + bsize;
|
||||||
|
|
||||||
|
tfree(src);
|
||||||
|
|
||||||
|
return ntoken;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
|
SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
|
||||||
SStrToken t0 = {0};
|
SStrToken t0 = {0};
|
||||||
|
|
||||||
|
|
|
@ -182,6 +182,9 @@ static FORCE_INLINE int32_t tGetNumericStringType(const SStrToken* pToken) {
|
||||||
|
|
||||||
void taosCleanupKeywordsTable();
|
void taosCleanupKeywordsTable();
|
||||||
|
|
||||||
|
SStrToken tscReplaceStrToken(char **str, SStrToken *token, const char* newToken);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -22,6 +22,7 @@ clean:
|
||||||
rm $(ROOT)asyncdemo
|
rm $(ROOT)asyncdemo
|
||||||
rm $(ROOT)demo
|
rm $(ROOT)demo
|
||||||
rm $(ROOT)prepare
|
rm $(ROOT)prepare
|
||||||
|
rm $(ROOT)batchprepare
|
||||||
rm $(ROOT)stream
|
rm $(ROOT)stream
|
||||||
rm $(ROOT)subscribe
|
rm $(ROOT)subscribe
|
||||||
rm $(ROOT)apitest
|
rm $(ROOT)apitest
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,17 @@
|
||||||
|
# Copyright (c) 2017 by TAOS Technologies, Inc.
|
||||||
|
# todo: library dependency, header file dependency
|
||||||
|
|
||||||
|
ROOT=./
|
||||||
|
TARGET=exe
|
||||||
|
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
|
||||||
|
CFLAGS = -O0 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
|
||||||
|
-Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX \
|
||||||
|
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99
|
||||||
|
|
||||||
|
all: $(TARGET)
|
||||||
|
|
||||||
|
exe:
|
||||||
|
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm $(ROOT)batchprepare
|
Loading…
Reference in New Issue