diff --git a/alert/README.md b/alert/README.md
index 547f3a0381..b8b8c92a27 100644
--- a/alert/README.md
+++ b/alert/README.md
@@ -61,7 +61,7 @@ The use of each configuration item is:
* **port**: This is the `http` service port which enables other application to manage rules by `restful API`.
* **database**: rules are stored in a `sqlite` database, this is the path of the database file (if the file does not exist, the alert application creates it automatically).
-* **tdengine**: connection string of `TDEngine` server, note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
+* **tdengine**: connection string of `TDEngine` server (please refer the documentation of GO connector for the detailed format of this string), note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
* **log > level**: log level, could be `production` or `debug`.
* **log > path**: log output file path.
* **receivers > alertManager**: the alert application pushes alerts to `AlertManager` at this URL.
diff --git a/alert/README_cn.md b/alert/README_cn.md
index 938b23a584..f659e997e3 100644
--- a/alert/README_cn.md
+++ b/alert/README_cn.md
@@ -58,7 +58,7 @@ $ go build
* **port**:报警监测程序支持使用 `restful API` 对规则进行管理,这个参数用于配置 `http` 服务的侦听端口。
* **database**:报警监测程序将规则保存到了一个 `sqlite` 数据库中,这个参数用于指定数据库文件的路径(不需要提前创建这个文件,如果它不存在,程序会自动创建它)。
-* **tdengine**:`TDEngine` 的连接字符串,一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名。
+* **tdengine**:`TDEngine` 的连接字符串(这个字符串的详细格式说明请见 GO 连接器的文档),一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名。
* **log > level**:日志的记录级别,可选 `production` 或 `debug`。
* **log > path**:日志文件的路径。
* **receivers > alertManager**:报警监测程序会将报警推送到 `AlertManager`,在这里指定 `AlertManager` 的接收地址。
diff --git a/alert/app/rule.go b/alert/app/rule.go
index 44596ca26d..236e5bd755 100644
--- a/alert/app/rule.go
+++ b/alert/app/rule.go
@@ -84,6 +84,7 @@ func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
case firing && (alert.State == AlertStateWaiting):
alert.StartsAt = time.Now()
+ alert.EndsAt = time.Time{}
if rule.For.Nanoseconds() > 0 {
alert.State = AlertStatePending
return false
@@ -95,6 +96,7 @@ func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
return false
}
alert.StartsAt = alert.StartsAt.Add(rule.For.Duration)
+ alert.EndsAt = time.Time{}
alert.State = AlertStateFiring
case firing && (alert.State == AlertStateFiring):
diff --git a/cmake/install.inc b/cmake/install.inc
index 9bbcc2cf40..746e493a17 100755
--- a/cmake/install.inc
+++ b/cmake/install.inc
@@ -9,7 +9,7 @@ ELSEIF (TD_WINDOWS)
ELSE ()
SET(CMAKE_INSTALL_PREFIX C:/TDengine)
ENDIF ()
-
+
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/go DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/nodejs DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/python DESTINATION connector)
@@ -20,12 +20,12 @@ ELSEIF (TD_WINDOWS)
INSTALL(FILES ${TD_COMMUNITY_DIR}/src/inc/taoserror.h DESTINATION include)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.exp DESTINATION driver)
- INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
-
+ INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
+
IF (TD_POWER)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/power.exe DESTINATION .)
- ELSE ()
- INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
+ ELSE ()
+ INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosdemo.exe DESTINATION .)
ENDIF ()
diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh
index aedfb0a683..ddf7114f08 100755
--- a/packaging/tools/install.sh
+++ b/packaging/tools/install.sh
@@ -312,7 +312,7 @@ function install_data() {
}
function install_connector() {
- ${csudo} cp -rf ${script_dir}/connector/* ${install_main_dir}/connector
+ ${csudo} cp -rf ${script_dir}/connector/ ${install_main_dir}/
}
function install_examples() {
diff --git a/packaging/tools/install_client.sh b/packaging/tools/install_client.sh
index 24586d3390..34a9bfaecb 100755
--- a/packaging/tools/install_client.sh
+++ b/packaging/tools/install_client.sh
@@ -163,7 +163,7 @@ function install_log() {
}
function install_connector() {
- ${csudo} cp -rf ${script_dir}/connector/* ${install_main_dir}/connector
+ ${csudo} cp -rf ${script_dir}/connector/ ${install_main_dir}/
}
function install_examples() {
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f619edd221..a2600785c3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -20,6 +20,6 @@ ADD_SUBDIRECTORY(tsdb)
ADD_SUBDIRECTORY(wal)
ADD_SUBDIRECTORY(cq)
ADD_SUBDIRECTORY(dnode)
-ADD_SUBDIRECTORY(connector/odbc)
+#ADD_SUBDIRECTORY(connector/odbc)
ADD_SUBDIRECTORY(connector/jdbc)
diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h
index ce67344b03..2c7c2f51d0 100644
--- a/src/client/inc/tscLocalMerge.h
+++ b/src/client/inc/tscLocalMerge.h
@@ -84,9 +84,9 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
- SColumnModel **pFinalModel, uint32_t nBufferSize);
+ SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
-void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
+void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h
index 3226f70528..f7832c9818 100644
--- a/src/client/inc/tscSubquery.h
+++ b/src/client/inc/tscSubquery.h
@@ -39,9 +39,9 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
-TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult);
+TAOS_ROW doSetResultRowData(SSqlObj *pSql);
-char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
+char *getArithmeticInputSrc(void *param, const char *name, int32_t colId);
#ifdef __cplusplus
}
diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h
index 223fb5d226..bde27d2932 100644
--- a/src/client/inc/tscUtil.h
+++ b/src/client/inc/tscUtil.h
@@ -282,6 +282,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
+int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index 6b3d97d6f9..ff36cf0f5a 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -293,32 +293,32 @@ typedef struct SResRec {
} SResRec;
typedef struct {
- int64_t numOfRows; // num of results in current retrieved
- int64_t numOfRowsGroup; // num of results of current group
- int64_t numOfTotal; // num of total results
- int64_t numOfClauseTotal; // num of total result in current subclause
- char * pRsp;
- int32_t rspType;
- int32_t rspLen;
- uint64_t qhandle;
- int64_t uid;
- int64_t useconds;
- int64_t offset; // offset value from vnode during projection query of stable
- int32_t row;
- int16_t numOfCols;
- int16_t precision;
- bool completed;
- int32_t code;
- int32_t numOfGroups;
- SResRec * pGroupRec;
- char * data;
- TAOS_ROW tsrow;
- int32_t* length; // length for each field for current row
- char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
- SColumnIndex * pColumnIndex;
+ int32_t numOfRows; // num of results in current retrieval
+ int64_t numOfRowsGroup; // num of results of current group
+ int64_t numOfTotal; // num of total results
+ int64_t numOfClauseTotal; // num of total result in current subclause
+ char * pRsp;
+ int32_t rspType;
+ int32_t rspLen;
+ uint64_t qhandle;
+ int64_t useconds;
+ int64_t offset; // offset value from vnode during projection query of stable
+ int32_t row;
+ int16_t numOfCols;
+ int16_t precision;
+ bool completed;
+ int32_t code;
+ int32_t numOfGroups;
+ SResRec * pGroupRec;
+ char * data;
+ TAOS_ROW tsrow;
+ TAOS_ROW urow;
+ int32_t* length; // length for each field for current row
+ char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
+ SColumnIndex* pColumnIndex;
+
SArithmeticSupport* pArithSup; // support the arithmetic expression calculation on agg functions
-
- struct SLocalReducer *pLocalReducer;
+ struct SLocalReducer* pLocalReducer;
} SSqlRes;
typedef struct STscObj {
@@ -425,6 +425,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
+void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
@@ -471,8 +472,9 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
int32_t bytes = pInfo->field.bytes;
char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row);
+ UNUSED(pData);
- // user defined constant value output columns
+// user defined constant value output columns
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz;
diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
index eaea91d1bf..582bd6bac0 100644
--- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
+++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
@@ -129,6 +129,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp
(JNIEnv *, jobject, jlong, jlong, jobject);
+/*
+ * Class: com_taosdata_jdbc_TSDBJNIConnector
+ * Method: fetchBlockImp
+ * Signature: (JJLcom/taosdata/jdbc/TSDBResultSetBlockData;)I
+ */
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp
+ (JNIEnv *, jobject, jlong, jlong, jobject);
+
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: closeConnectionImp
diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c
index 4e2272eb05..a8829499a3 100644
--- a/src/client/src/TSDBJNIConnector.c
+++ b/src/client/src/TSDBJNIConnector.c
@@ -17,7 +17,6 @@
#include "taos.h"
#include "tlog.h"
#include "tscUtil.h"
-#include "tsclient.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
@@ -57,6 +56,10 @@ jmethodID g_rowdataSetStringFp;
jmethodID g_rowdataSetTimestampFp;
jmethodID g_rowdataSetByteArrayFp;
+jmethodID g_blockdataSetByteArrayFp;
+jmethodID g_blockdataSetNumOfRowsFp;
+jmethodID g_blockdataSetNumOfColsFp;
+
#define JNI_SUCCESS 0
#define JNI_TDENGINE_ERROR -1
#define JNI_CONNECTION_NULL -2
@@ -66,7 +69,7 @@ jmethodID g_rowdataSetByteArrayFp;
#define JNI_FETCH_END -6
#define JNI_OUT_OF_MEMORY -7
-void jniGetGlobalMethod(JNIEnv *env) {
+static void jniGetGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init, 0, 1)) {
case 0:
@@ -114,10 +117,31 @@ void jniGetGlobalMethod(JNIEnv *env) {
g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V");
(*env)->DeleteLocalRef(env, rowdataClass);
+ jclass blockdataClass = (*env)->FindClass(env, "com/taosdata/jdbc/TSDBResultSetBlockData");
+ jclass g_blockdataClass = (*env)->NewGlobalRef(env, blockdataClass);
+ g_blockdataSetByteArrayFp = (*env)->GetMethodID(env, g_blockdataClass, "setByteArray", "(II[B)V");
+ g_blockdataSetNumOfRowsFp = (*env)->GetMethodID(env, g_blockdataClass, "setNumOfRows", "(I)V");
+ g_blockdataSetNumOfColsFp = (*env)->GetMethodID(env, g_blockdataClass, "setNumOfCols", "(I)V");
+ (*env)->DeleteLocalRef(env, blockdataClass);
+
atomic_store_32(&__init, 2);
jniDebug("native method register finished");
}
+static int32_t check_for_params(jobject jobj, jlong conn, jlong res) {
+ if ((TAOS*) conn == NULL) {
+ jniError("jobj:%p, connection is closed", jobj);
+ return JNI_CONNECTION_NULL;
+ }
+
+ if ((TAOS_RES *) res == NULL) {
+ jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS*) conn);
+ return JNI_RESULT_SET_NULL;
+ }
+
+ return JNI_SUCCESS;
+}
+
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode,
jstring jPath, jboolean jAutoDump) {
if (jPath != NULL) {
@@ -192,39 +216,37 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEnv *env, jobject jobj, jstring jhost,
jint jport, jstring jdbName, jstring juser,
jstring jpass) {
- jlong ret = 0;
+ jlong ret = 0;
const char *host = NULL;
- const char *dbname = NULL;
const char *user = NULL;
const char *pass = NULL;
+ const char *dbname = NULL;
if (jhost != NULL) {
host = (*env)->GetStringUTFChars(env, jhost, NULL);
}
+
if (jdbName != NULL) {
dbname = (*env)->GetStringUTFChars(env, jdbName, NULL);
}
+
if (juser != NULL) {
user = (*env)->GetStringUTFChars(env, juser, NULL);
}
+
if (jpass != NULL) {
pass = (*env)->GetStringUTFChars(env, jpass, NULL);
}
if (user == NULL) {
- jniDebug("jobj:%p, user is null, use default user %s", jobj, TSDB_DEFAULT_USER);
+ jniDebug("jobj:%p, user not specified, use default user %s", jobj, TSDB_DEFAULT_USER);
}
+
if (pass == NULL) {
- jniDebug("jobj:%p, pass is null, use default password", jobj);
+ jniDebug("jobj:%p, pass not specified, use default password", jobj);
}
- /*
- * set numOfThreadsPerCore = 0
- * means only one thread for client side scheduler
- */
- tsNumOfThreadsPerCore = 0.0;
-
- ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
+ ret = (jlong) taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
if (ret == 0) {
jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret,
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
@@ -233,10 +255,21 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
}
- if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host);
- if (dbname != NULL) (*env)->ReleaseStringUTFChars(env, jdbName, dbname);
- if (user != NULL) (*env)->ReleaseStringUTFChars(env, juser, user);
- if (pass != NULL) (*env)->ReleaseStringUTFChars(env, jpass, pass);
+ if (host != NULL) {
+ (*env)->ReleaseStringUTFChars(env, jhost, host);
+ }
+
+ if (dbname != NULL) {
+ (*env)->ReleaseStringUTFChars(env, jdbName, dbname);
+ }
+
+ if (user != NULL) {
+ (*env)->ReleaseStringUTFChars(env, juser, user);
+ }
+
+ if (pass != NULL) {
+ (*env)->ReleaseStringUTFChars(env, jpass, pass);
+ }
return ret;
}
@@ -245,64 +278,53 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
jbyteArray jsql, jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
- jniError("jobj:%p, connection is already closed", jobj);
+ jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
if (jsql == NULL) {
- jniError("jobj:%p, conn:%p, sql is null", jobj, tscon);
+ jniError("jobj:%p, conn:%p, empty sql string", jobj, tscon);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, jsql);
- char *dst = (char *)calloc(1, sizeof(char) * (len + 1));
- if (dst == NULL) {
- jniError("jobj:%p, conn:%p, can not alloc memory", jobj, tscon);
+ char *str = (char *) calloc(1, sizeof(char) * (len + 1));
+ if (str == NULL) {
+ jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon);
return JNI_OUT_OF_MEMORY;
}
- (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst);
+ (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
- jniDebug("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst);
-
- SSqlObj *pSql = taos_query(tscon, dst);
+ SSqlObj *pSql = taos_query(tscon, str);
int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(pSql));
} else {
- int32_t affectRows = 0;
if (pSql->cmd.command == TSDB_SQL_INSERT) {
- affectRows = taos_affected_rows(pSql);
+ int32_t affectRows = taos_affected_rows(pSql);
jniDebug("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows);
} else {
jniDebug("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
}
}
- free(dst);
- return (jlong)pSql;
+ free(str);
+ return (jlong) pSql;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) {
- TAOS *tscon = (TAOS *)con;
- if (tscon == NULL) {
- jniError("jobj:%p, connection is closed", jobj);
- return (jint)TSDB_CODE_TSC_INVALID_CONNECTION;
+ int32_t code = check_for_params(jobj, con, tres);
+ if (code != JNI_SUCCESS) {
+ return code;
}
- if ((void *)tres == NULL) {
- jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
- return JNI_RESULT_SET_NULL;
- }
-
- TAOS_RES *pSql = (TAOS_RES *)tres;
-
- return (jint)taos_errno(pSql);
+ return (jint)taos_errno((TAOS_RES*) tres);
}
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) {
@@ -313,23 +335,16 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
TAOS *tscon = (TAOS *)con;
- if (tscon == NULL) {
- jniError("jobj:%p, connection is closed", jobj);
- return JNI_CONNECTION_NULL;
- }
-
- if ((void *)tres == NULL) {
- jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
- return JNI_RESULT_SET_NULL;
+ int32_t code = check_for_params(jobj, con, tres);
+ if (code != JNI_SUCCESS) {
+ return code;
}
SSqlObj *pSql = (TAOS_RES *)tres;
- STscObj *pObj = pSql->pTscObj;
-
if (tscIsUpdateQuery(pSql)) {
- jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, pObj, (void *)tres);
+ jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, tscon, (void *)tres);
} else {
- jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, pObj, (void *)tres);
+ jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, tscon, (void *)tres);
}
return tres;
@@ -337,15 +352,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
- TAOS *tscon = (TAOS *)con;
- if (tscon == NULL) {
- jniError("jobj:%p, connection is closed", jobj);
- return JNI_CONNECTION_NULL;
- }
-
- if ((void *)tres == NULL) {
- jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
- return JNI_RESULT_SET_NULL;
+ int32_t code = check_for_params(jobj, con, tres);
+ if (code != JNI_SUCCESS) {
+ return code;
}
SSqlObj *pSql = (TAOS_RES *)tres;
@@ -355,37 +364,27 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) {
- TAOS *tscon = (TAOS *)con;
- if (tscon == NULL) {
- jniError("jobj:%p, connection is closed", jobj);
- return JNI_CONNECTION_NULL;
- }
-
- if ((void *)res == NULL) {
- jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
- return JNI_RESULT_SET_NULL;
+ int32_t code = check_for_params(jobj, con, res);
+ if (code != JNI_SUCCESS) {
+ return code;
}
taos_free_result((void *)res);
- jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, tscon, (void *)res);
+ jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS*) con, (void *)res);
+
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) {
TAOS *tscon = (TAOS *)con;
- if (tscon == NULL) {
- jniError("jobj:%p, connection is closed", jobj);
- return JNI_CONNECTION_NULL;
- }
-
- if ((void *)res == NULL) {
- jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
- return JNI_RESULT_SET_NULL;
+ int32_t code = check_for_params(jobj, con, res);
+ if (code != JNI_SUCCESS) {
+ return code;
}
jint ret = taos_affected_rows((SSqlObj *)res);
- jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (void *)con, (void *)res, (int32_t)ret);
+ jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res, (int32_t)ret);
return ret;
}
@@ -394,27 +393,20 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
jlong con, jlong res,
jobject arrayListObj) {
TAOS *tscon = (TAOS *)con;
- if (tscon == NULL) {
- jniError("jobj:%p, connection is closed", jobj);
- return JNI_CONNECTION_NULL;
+ int32_t code = check_for_params(jobj, con, res);
+ if (code != JNI_SUCCESS) {
+ return code;
}
- TAOS_RES *result = (TAOS_RES *)res;
- if (result == NULL) {
- jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
- return JNI_RESULT_SET_NULL;
- }
-
- TAOS_FIELD *fields = taos_fetch_fields(result);
- int num_fields = taos_num_fields(result);
-
- // jobject arrayListObj = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp, "");
+ TAOS_RES* tres = (TAOS_RES*) res;
+ TAOS_FIELD *fields = taos_fetch_fields(tres);
+ int32_t num_fields = taos_num_fields(tres);
if (num_fields == 0) {
- jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields);
+ jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, tres, num_fields);
return JNI_NUM_OF_FIELDS_0;
} else {
- jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields);
+ jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, tres, num_fields);
for (int i = 0; i < num_fields; ++i) {
jobject metadataObj = (*env)->NewObject(env, g_metadataClass, g_metadataConstructFp);
(*env)->SetIntField(env, metadataObj, g_metadataColtypeField, fields[i].type);
@@ -457,21 +449,21 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
}
TAOS_FIELD *fields = taos_fetch_fields(result);
- int num_fields = taos_num_fields(result);
- if (num_fields == 0) {
- jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void*)res, num_fields);
+ int32_t numOfFields = taos_num_fields(result);
+ if (numOfFields == 0) {
+ jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void*)res, numOfFields);
return JNI_NUM_OF_FIELDS_0;
}
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
- int tserrno = taos_errno(result);
- if (tserrno == 0) {
- jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, num_fields);
+ int code = taos_errno(result);
+ if (code == TSDB_CODE_SUCCESS) {
+ jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, numOfFields);
return JNI_FETCH_END;
} else {
- jniDebug("jobj:%p, conn:%p, interruptted query", jobj, tscon);
+ jniDebug("jobj:%p, conn:%p, interrupted query", jobj, tscon);
return JNI_RESULT_SET_NULL;
}
}
@@ -480,7 +472,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
- for (int i = 0; i < num_fields; i++) {
+ for (int i = 0; i < numOfFields; i++) {
if (row[i] == NULL) {
continue;
}
@@ -534,6 +526,45 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
return JNI_SUCCESS;
}
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con,
+ jlong res, jobject rowobj) {
+ TAOS * tscon = (TAOS *)con;
+ int32_t code = check_for_params(jobj, con, res);
+ if (code != JNI_SUCCESS) {
+ return code;
+ }
+
+ TAOS_RES * tres = (TAOS_RES *)res;
+ TAOS_FIELD *fields = taos_fetch_fields(tres);
+
+ int32_t numOfFields = taos_num_fields(tres);
+ assert(numOfFields > 0);
+
+ TAOS_ROW row = NULL;
+ int32_t numOfRows = taos_fetch_block(tres, &row);
+ if (numOfRows == 0) {
+ code = taos_errno(tres);
+ if (code == JNI_SUCCESS) {
+ jniDebug("jobj:%p, conn:%p, resultset:%p, numOfFields:%d, no data to retrieve", jobj, tscon, (void *)res,
+ numOfFields);
+ return JNI_FETCH_END;
+ } else {
+ jniDebug("jobj:%p, conn:%p, query interrupted", jobj, tscon);
+ return JNI_RESULT_SET_NULL;
+ }
+ }
+
+ (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows);
+ (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields);
+
+ for (int i = 0; i < numOfFields; i++) {
+ (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, fields[i].bytes * numOfRows,
+ jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes * numOfRows));
+ }
+
+ return JNI_SUCCESS;
+}
+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionImp(JNIEnv *env, jobject jobj,
jlong con) {
TAOS *tscon = (TAOS *)con;
@@ -589,7 +620,6 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEn
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
-
TAOS_RES *res = taos_consume(tsub);
if (res == NULL) {
@@ -621,16 +651,16 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTab
jsize len = (*env)->GetArrayLength(env, jsql);
- char *dst = (char *)calloc(1, sizeof(char) * (len + 1));
- (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst);
+ char *str = (char *)calloc(1, sizeof(char) * (len + 1));
+ (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
- int code = taos_validate_sql(tscon, dst);
+ int code = taos_validate_sql(tscon, str);
jniDebug("jobj:%p, conn:%p, code is %d", jobj, tscon, code);
- free(dst);
+ free(str);
return code;
}
diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c
index e9e8214c4c..3ff8a68d8f 100644
--- a/src/client/src/tscAsync.c
+++ b/src/client/src/tscAsync.c
@@ -91,8 +91,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
int32_t sqlLen = (int32_t)strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
- terrno = TSDB_CODE_TSC_INVALID_SQL;
- tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
+ terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
+ tscQueueAsyncError(fp, param, terrno);
return;
}
diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c
index 35dc94f37b..d39b833374 100644
--- a/src/client/src/tscFunctionImpl.c
+++ b/src/client/src/tscFunctionImpl.c
@@ -130,11 +130,11 @@ typedef struct STopBotInfo {
} STopBotInfo;
// leastsquares do not apply to super table
-typedef struct SLeastsquareInfo {
+typedef struct SLeastsquaresInfo {
double mat[2][3];
double startVal;
int64_t num;
-} SLeastsquareInfo;
+} SLeastsquaresInfo;
typedef struct SAPercentileInfo {
SHistogramInfo *pHisto;
@@ -316,7 +316,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = (int16_t)sizeof(SPercentileInfo);
} else if (functionId == TSDB_FUNC_LEASTSQR) {
*type = TSDB_DATA_TYPE_BINARY;
- *bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
+ *bytes = MAX(TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE, sizeof(SLeastsquaresInfo)); // string
*interBytes = *bytes;
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
*type = TSDB_DATA_TYPE_BINARY;
@@ -681,7 +681,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, i
}
// no result for first query, data block is required
- if (GET_RES_INFO(pCtx)->numOfRes <= 0) {
+ if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED;
} else {
return BLK_DATA_NO_NEEDED;
@@ -693,7 +693,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, in
return BLK_DATA_NO_NEEDED;
}
- if (GET_RES_INFO(pCtx)->numOfRes <= 0) {
+ if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED;
} else {
return BLK_DATA_NO_NEEDED;
@@ -2756,7 +2756,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
}
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
- SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+ SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
// 2*3 matrix
pInfo->startVal = pCtx->param[0].dKey;
@@ -2783,7 +2783,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
static void leastsquares_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
- SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+ SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
double(*param)[3] = pInfo->mat;
double x = pInfo->startVal;
@@ -2853,40 +2853,40 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) {
return;
}
- SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
- SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+ SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
+ SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
double(*param)[3] = pInfo->mat;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
int32_t *p = pData;
- LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
+ LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
break;
};
case TSDB_DATA_TYPE_TINYINT: {
int8_t *p = pData;
- LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
+ LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *p = pData;
- LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
+ LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *p = pData;
- LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
+ LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *p = pData;
- LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
+ LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *p = pData;
- LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
+ LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
break;
}
default:
@@ -2904,15 +2904,10 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) {
static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
// no data in query
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
- SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+ SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->num == 0) {
- if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
- setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
- } else {
- setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
- }
-
+ setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
return;
}
diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c
index 310c4c6657..538e652f3c 100644
--- a/src/client/src/tscLocal.c
+++ b/src/client/src/tscLocal.c
@@ -341,7 +341,7 @@ TAOS_ROW tscFetchRow(void *param) {
return NULL;
}
- void* data = doSetResultRowData(pSql, true);
+ void* data = doSetResultRowData(pSql);
tscClearSqlOwner(pSql);
return data;
diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c
index b07c7ca66d..3c7d46f914 100644
--- a/src/client/src/tscLocalMerge.c
+++ b/src/client/src/tscLocalMerge.c
@@ -30,8 +30,6 @@ typedef struct SCompareParam {
int32_t groupOrderType;
} SCompareParam;
-static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
-
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
@@ -174,14 +172,14 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SSqlRes* pRes = &pSql->res;
if (pMemBuffer == NULL) {
- tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
+ tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
}
if (pDesc->pColumnModel == NULL) {
- tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
+ tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
@@ -199,7 +197,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
}
if (numOfFlush == 0 || numOfBuffer == 0) {
- tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
+ tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscDebug("%p retrieved no data", pSql);
return;
}
@@ -208,7 +206,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize);
- tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
+ tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
}
@@ -219,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pReducer == NULL) {
tscError("%p failed to create local merge structure, out of memory", pSql);
- tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
+ tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
}
@@ -336,6 +334,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize;
+ pReducer->finalModel = pFFModel;
+
assert(pReducer->finalRowSize > 0);
if (pReducer->finalRowSize > 0) {
pReducer->resColModel->capacity /= pReducer->finalRowSize;
@@ -533,7 +533,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tfree(pLocalReducer->pFinalRes);
tfree(pLocalReducer->discardData);
- tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel,
+ tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, pLocalReducer->finalModel,
pLocalReducer->numOfVnode);
for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) {
tfree(pLocalReducer->pLocalDataSrc[i]);
@@ -657,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
- SColumnModel **pFinalModel, uint32_t nBufferSizes) {
+ SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
@@ -755,6 +755,18 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
+ memset(pSchema, 0, sizeof(SSchema) * size);
+ size = tscNumOfFields(pQueryInfo);
+
+ for(int32_t i = 0; i < size; ++i) {
+ SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
+ pSchema[i].bytes = pField->field.bytes;
+ pSchema[i].type = pField->field.type;
+ tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name));
+ }
+
+ *pFFModel = createColumnModel(pSchema, (int32_t) size, capacity);
+
tfree(pSchema);
return TSDB_CODE_SUCCESS;
}
@@ -765,9 +777,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
* @param pFinalModel
* @param numOfVnodes
*/
-void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
+void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel,
int32_t numOfVnodes) {
destroyColumnModel(pFinalModel);
+ destroyColumnModel(pFFModel);
+
tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) {
@@ -870,17 +884,17 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
tFilePage * pBeforeFillData = pLocalReducer->pResultBuf;
pRes->data = pLocalReducer->pFinalRes;
- pRes->numOfRows = pBeforeFillData->num;
+ pRes->numOfRows = (int32_t) pBeforeFillData->num;
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
- int32_t prevSize = (int32_t)pBeforeFillData->num;
- tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
+ int32_t prevSize = (int32_t) pBeforeFillData->num;
+ tColModelErase(pLocalReducer->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */
- tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
+ tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
- pRes->numOfRows -= pQueryInfo->limit.offset;
+ pRes->numOfRows -= (int32_t) pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
@@ -900,7 +914,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
- tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
+ tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
@@ -948,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
pRes->data = pLocalReducer->pFinalRes;
- pRes->numOfRows = newRows;
+ pRes->numOfRows = (int32_t) newRows;
pQueryInfo->limit.offset = 0;
break;
@@ -1242,7 +1256,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
- doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
+ pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
}
#ifdef _DEBUG_VIEW
@@ -1612,7 +1626,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->data = pRes->pLocalReducer->pResultBuf->data;
}
-void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
+int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize);
size_t size = tscNumOfFields(pQueryInfo);
@@ -1637,7 +1651,7 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t r
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
arithSup.pArithExpr = pSup->pArithExprInfo;
- tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc);
+ tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, pExpr->resBytes * pOutput->num);
@@ -1647,8 +1661,10 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t r
}
assert(finalRowSize <= rowSize);
- memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize);
+ memcpy(pOutput->data, pbuf, pOutput->num * offset);
tfree(pbuf);
tfree(arithSup.data);
+
+ return offset;
}
\ No newline at end of file
diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c
index 1739e4348c..68f2ecbf0e 100644
--- a/src/client/src/tscPrepare.c
+++ b/src/client/src/tscPrepare.c
@@ -268,7 +268,6 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
if (1) {
// allow user bind param data with different type
- short size = 0;
union {
int8_t v1;
int16_t v2;
@@ -600,7 +599,7 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
if ((*bind->length) > (uintptr_t)param->bytes) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
- size = (short)*bind->length;
+ short size = (short)*bind->length;
STR_WITH_SIZE_TO_VARSTR(data + param->offset, bind->buffer, size);
return TSDB_CODE_SUCCESS;
} break;
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index b55326bbd3..15d2647c51 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -1454,13 +1454,13 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
}
}
- SColumnIndex index = {0};
// set the constant column value always attached to first table.
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX);
// add the timestamp column into the output columns
+ SColumnIndex index = {0}; // primary timestamp column info
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
@@ -2432,6 +2432,8 @@ int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColum
if (pTableToken->n == 0) { // only one table and no table name prefix in column name
if (pQueryInfo->numOfTables == 1) {
pIndex->tableIndex = 0;
+ } else {
+ pIndex->tableIndex = COLUMN_INDEX_INITIAL_VAL;
}
return TSDB_CODE_SUCCESS;
@@ -3950,9 +3952,6 @@ static void doExtractExprForSTable(SSqlCmd* pCmd, tSQLExpr** pExpr, SQueryInfo*
return;
}
- SStrToken t = {0};
- extractTableNameFromToken(&pLeft->colInfo, &t);
-
*pOut = *pExpr;
(*pExpr) = NULL;
@@ -4187,7 +4186,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
- SColumnIndex index = {0};
+ SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pCondExpr->pJoinExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
tscError("%p: invalid column name (left)", pQueryInfo);
@@ -4604,7 +4603,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
}
SStrToken columnName = {pVar->nLen, pVar->nType, pVar->pz};
- SColumnIndex index = {0};
+ SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(pCmd, &columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
@@ -5509,7 +5508,9 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
tscAddSpecialColumnForSelect(pQueryInfo, (int32_t)size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL);
- SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, (int32_t)size);
+ int32_t numOfFields = tscNumOfFields(pQueryInfo);
+ SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfFields - 1);
+
doLimitOutputNormalColOfGroupby(pInfo->pSqlExpr);
pInfo->visible = false;
}
@@ -6412,7 +6413,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return code;
}
- tVariantListItem* p1 = taosArrayGet(pQuerySql->from, i);
+ tVariantListItem* p1 = taosArrayGet(pQuerySql->from, i + 1);
if (p1->pVar.nType != TSDB_DATA_TYPE_BINARY) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
@@ -6620,7 +6621,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
}
}
} else if (pSqlExpr->nSQLOptr == TK_ID) { // column name, normal column arithmetic expression
- SColumnIndex index = {0};
+ SColumnIndex index = COLUMN_INDEX_INITIALIZER;
int32_t ret = getColumnIndexByName(pCmd, &pSqlExpr->colInfo, pQueryInfo, &index);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index e8b6cb284e..cbc5604a27 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -547,7 +547,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
- int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
+ int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
@@ -787,8 +787,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
}
- if(tscIsSecondStageQuery(pQueryInfo)) {
- size_t output = tscNumOfFields(pQueryInfo);
+ size_t output = tscNumOfFields(pQueryInfo);
+
+ if (tscIsSecondStageQuery(pQueryInfo)) {
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
@@ -1437,19 +1438,6 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
-static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
- if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
- return pRes->code;
- }
-
- for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
- int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
- pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
- }
-
- return 0;
-}
-
/*
* this function can only be called once.
* by using pRes->rspType to denote its status
@@ -1460,15 +1448,18 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
- SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
-
pRes->code = TSDB_CODE_SUCCESS;
if (pRes->rspType == 0) {
pRes->numOfRows = numOfRes;
pRes->row = 0;
pRes->rspType = 1;
- tscSetResultPointer(pQueryInfo, pRes);
+ SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
+ if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
+ return pRes->code;
+ }
+
+ tscSetResRawPtr(pRes, pQueryInfo);
} else {
tscResetForNextRetrieve(pRes);
}
@@ -1512,10 +1503,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
}
pRes->code = tscDoLocalMerge(pSql);
- SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
+ SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscCreateResPointerInfo(pRes, pQueryInfo);
+ tscSetResRawPtr(pRes, pQueryInfo);
}
pRes->row = 0;
@@ -2195,7 +2187,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
-
+
+ STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+ if (pCmd->command == TSDB_SQL_RETRIEVE) {
+ tscSetResRawPtr(pRes, pQueryInfo);
+ } else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
+ tscSetResRawPtr(pRes, pQueryInfo);
+ } else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
+ tscSetResRawPtr(pRes, pQueryInfo);
+ }
+
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
@@ -2217,7 +2218,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
}
pRes->row = 0;
- tscDebug("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
+ tscDebug("%p numOfRows:%d, offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
return 0;
}
diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c
index a7b859b294..020305a0a8 100644
--- a/src/client/src/tscSql.c
+++ b/src/client/src/tscSql.c
@@ -321,7 +321,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES
if (sqlLen > (uint32_t)tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
- terrno = TSDB_CODE_TSC_INVALID_SQL;
+ terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
@@ -394,7 +394,7 @@ int taos_affected_rows(TAOS_RES *tres) {
SSqlObj* pSql = (SSqlObj*) tres;
if (pSql == NULL || pSql->signature != pSql) return 0;
- return (int)(pSql->res.numOfRows);
+ return pSql->res.numOfRows;
}
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
@@ -443,50 +443,30 @@ int taos_retrieve(TAOS_RES *res) {
if (pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
- tscProcessSql(pSql);
- return (int)pRes->numOfRows;
+ tscProcessSql(pSql);
+ return pRes->numOfRows;
}
-int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
- SSqlObj *pSql = (SSqlObj *)res;
- SSqlCmd *pCmd = &pSql->cmd;
+static bool needToFetchNewBlock(SSqlObj* pSql) {
SSqlRes *pRes = &pSql->res;
+ SSqlCmd *pCmd = &pSql->cmd;
- if (pRes->qhandle == 0 || pSql->signature != pSql) {
- *rows = NULL;
- return 0;
- }
-
- // Retrieve new block
- tscResetForNextRetrieve(pRes);
- if (pCmd->command < TSDB_SQL_LOCAL) {
- pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
- }
-
- tscProcessSql(pSql);
- if (pRes->numOfRows == 0) {
- *rows = NULL;
- return 0;
- }
-
- // secondary merge has handle this situation
- if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
- pRes->numOfClauseTotal += pRes->numOfRows;
- }
-
- SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
- if (pQueryInfo == NULL)
- return 0;
-
- assert(0);
- for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
- tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0);
- }
-
- *rows = pRes->tsrow;
-
- return (int)((pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows);
+ return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
+ (pCmd->command == TSDB_SQL_RETRIEVE ||
+ pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
+ pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
+ pCmd->command == TSDB_SQL_FETCH ||
+ pCmd->command == TSDB_SQL_SHOW ||
+ pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
+ pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
+ pCmd->command == TSDB_SQL_SELECT ||
+ pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
+ pCmd->command == TSDB_SQL_SERV_STATUS ||
+ pCmd->command == TSDB_SQL_CURRENT_DB ||
+ pCmd->command == TSDB_SQL_SERV_VERSION ||
+ pCmd->command == TSDB_SQL_CLI_VERSION ||
+ pCmd->command == TSDB_SQL_CURRENT_USER);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
@@ -509,77 +489,50 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// set the sql object owner
tscSetSqlOwner(pSql);
- // current data set are exhausted, fetch more data from node
- if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
- (pCmd->command == TSDB_SQL_RETRIEVE ||
- pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
- pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
- pCmd->command == TSDB_SQL_FETCH ||
- pCmd->command == TSDB_SQL_SHOW ||
- pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
- pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
- pCmd->command == TSDB_SQL_SELECT ||
- pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
- pCmd->command == TSDB_SQL_SERV_STATUS ||
- pCmd->command == TSDB_SQL_CURRENT_DB ||
- pCmd->command == TSDB_SQL_SERV_VERSION ||
- pCmd->command == TSDB_SQL_CLI_VERSION ||
- pCmd->command == TSDB_SQL_CURRENT_USER )) {
+ // current data set are exhausted, fetch more result from node
+ if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
tsem_wait(&pSql->rspSem);
}
- void* data = doSetResultRowData(pSql, true);
+ void* data = doSetResultRowData(pSql);
tscClearSqlOwner(pSql);
return data;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
-#if 0
SSqlObj *pSql = (SSqlObj *)res;
- SSqlCmd *pCmd = &pSql->cmd;
- SSqlRes *pRes = &pSql->res;
-
- int nRows = 0;
-
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
- *rows = NULL;
return 0;
}
- // projection query on metric, pipeline retrieve data from vnode list,
- // instead of two-stage mergednodeProcessMsgFromShell free qhandle
- nRows = taos_fetch_block_impl(res, rows);
+ SSqlCmd *pCmd = &pSql->cmd;
+ SSqlRes *pRes = &pSql->res;
- // current subclause is completed, try the next subclause
- while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
- SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
-
- pSql->cmd.command = pQueryInfo->command;
- pCmd->clauseIndex++;
-
- pRes->numOfTotal += pRes->numOfClauseTotal;
- pRes->numOfClauseTotal = 0;
- pRes->rspType = 0;
-
- pSql->subState.numOfSub = 0;
- tfree(pSql->pSubs);
-
- assert(pSql->fp == NULL);
-
- tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
- tscProcessSql(pSql);
-
- nRows = taos_fetch_block_impl(res, rows);
+ if (pRes->qhandle == 0 ||
+ pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED ||
+ pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
+ pCmd->command == TSDB_SQL_INSERT) {
+ return 0;
}
- return nRows;
-#endif
+ tscResetForNextRetrieve(pRes);
- (*rows) = taos_fetch_row(res);
- return ((*rows) != NULL)? 1:0;
+ // set the sql object owner
+ tscSetSqlOwner(pSql);
+
+ // current data set are exhausted, fetch more data from node
+ if (needToFetchNewBlock(pSql)) {
+ taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
+ tsem_wait(&pSql->rspSem);
+ }
+
+ *rows = pRes->urow;
+
+ tscClearSqlOwner(pSql);
+ return pRes->numOfRows;
}
int taos_select_db(TAOS *taos, const char *db) {
@@ -600,7 +553,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
-static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
+static bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
@@ -795,6 +748,25 @@ void taos_stop_query(TAOS_RES *res) {
tscDebug("%p query is cancelled", res);
}
+bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
+ SSqlObj *pSql = (SSqlObj *)res;
+ if (pSql == NULL || pSql->signature != pSql) {
+ return true;
+ }
+
+ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
+ if (pQueryInfo == NULL) {
+ return true;
+ }
+
+ SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);
+ if (col < 0 || col >= tscNumOfFields(pQueryInfo) || row < 0 || row > pSql->res.numOfRows) {
+ return true;
+ }
+
+ return isNull(((char*) pSql->res.urow[col]) + row * pInfo->field.bytes, pInfo->field.type);
+}
+
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int len = 0;
for (int i = 0; i < num_fields; ++i) {
@@ -892,18 +864,16 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
int32_t sqlLen = (int32_t)strlen(sql);
if (sqlLen > tsMaxSQLStringLen) {
tscError("%p sql too long", pSql);
- pRes->code = TSDB_CODE_TSC_INVALID_SQL;
tfree(pSql);
- return pRes->code;
+ return TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
}
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) {
- pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pSql), pObj);
tfree(pSql);
- return pRes->code;
+ return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(pSql->sqlstr, sql);
diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c
index bc522d4007..6ebbeeef41 100644
--- a/src/client/src/tscSubquery.c
+++ b/src/client/src/tscSubquery.c
@@ -594,7 +594,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* prevGroup = taosArrayGet(result, taosArrayGetSize(result) - 1);
- tscDebug("%p vgId:%d, tables:%"PRId64, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
+ tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
}
taosArrayPush(result, &info);
@@ -612,7 +612,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* g = taosArrayGet(result, taosArrayGetSize(result) - 1);
- tscDebug("%p vgId:%d, tables:%"PRId64, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
+ tscDebug("%p vgId:%d, tables:%"PRIzu, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
}
}
@@ -753,7 +753,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
}
#endif
- tscDebug("%p tags match complete, result: %"PRId64", %"PRId64, pParentSql, t1, t2);
+ tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
return TSDB_CODE_SUCCESS;
}
@@ -948,7 +948,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (!pRes->completed) {
taosGetTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
- pRes->row = (int32_t)pRes->numOfRows;
+ pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
return;
@@ -974,7 +974,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// TODO check for failure
pSupporter->f = fopen(pSupporter->path, "w");
- pRes->row = (int32_t)pRes->numOfRows;
+ pRes->row = pRes->numOfRows;
// set the callback function
pSql->fp = tscJoinQueryCallback;
@@ -1085,7 +1085,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
if (pRes1->row > 0 && pRes1->numOfRows > 0) {
- tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
+ tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
assert(pRes1->row < pRes1->numOfRows);
} else {
@@ -1093,7 +1093,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pRes1->numOfClauseTotal += pRes1->numOfRows;
}
- tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
+ tscDebug("%p sub:%p index:%d numOfRows:%d total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
}
}
@@ -1644,6 +1644,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL;
+ SColumnModel *pFinalModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check
@@ -1662,7 +1663,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
- int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
+ int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
@@ -1677,7 +1678,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pSql->pSubs == NULL) {
tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
- tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
+ tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscQueueAsyncRes(pSql);
return ret;
@@ -1707,6 +1708,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
+ trs->pFFColModel = pFinalModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
@@ -1730,13 +1732,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
- tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
+ tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
- tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
+ tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
@@ -1876,7 +1878,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code));
// release allocated resource
- tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
+ tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
pState->numOfSub);
tscFreeRetrieveSup(pSql);
@@ -2030,7 +2032,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
- tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
+ tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
@@ -2057,7 +2059,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
- (int32_t)pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
+ pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret != 0) { // set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
} else if (pRes->completed) {
@@ -2169,7 +2171,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
return;
}
- tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows);
+ tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
@@ -2312,10 +2314,10 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
return;
}
- int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList);
+ int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
- assert(numOfRes * totalSize > 0);
- char* tmp = realloc(pRes->pRsp, numOfRes * totalSize);
+ assert(numOfRes * rowSize > 0);
+ char* tmp = realloc(pRes->pRsp, numOfRes * rowSize + sizeof(tFilePage));
if (tmp == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
@@ -2323,9 +2325,12 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pRes->pRsp = tmp;
}
- pRes->data = pRes->pRsp;
+ tFilePage* pFilePage = (tFilePage*) pRes->pRsp;
+ pFilePage->num = numOfRes;
+ pRes->data = pFilePage->data;
char* data = pRes->data;
+
int16_t bytes = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
@@ -2352,6 +2357,17 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes;
+
+ int32_t finalRowSize = 0;
+ for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
+ TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
+ finalRowSize += pField->bytes;
+ }
+
+ doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);
+
+ pRes->data = pFilePage->data;
+ tscSetResRawPtr(pRes, pQueryInfo);
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
@@ -2364,13 +2380,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
+ pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo);
- size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
- pRes->numOfCols = (int16_t)numOfExprs;
-
- pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
- pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
- pRes->length = calloc(numOfExprs, sizeof(int32_t));
+ pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
+ pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES);
+ pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
+ pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));
if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
@@ -2390,7 +2405,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
}
}
-static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
+static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (pRes->tsrow[columnIndex] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
@@ -2414,7 +2429,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
}
}
-char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
+char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
int32_t index = -1;
@@ -2432,7 +2447,7 @@ char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}
-TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
+TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
@@ -2445,22 +2460,20 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscNumOfFields(pQueryInfo);
- int32_t offset = 0;
-
for (int i = 0; i < size; ++i) {
- tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, offset);
- TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
+ SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
- offset += pField->bytes;
+ int32_t type = pInfo->field.type;
+ int32_t bytes = pInfo->field.bytes;
- // primary key column cannot be null in interval query, no need to check
- if (i == 0 && pQueryInfo->interval.interval > 0) {
- continue;
+ if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
+ pRes->tsrow[i] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
+ } else {
+ pRes->tsrow[i] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
+ pRes->length[i] = varDataLen(pRes->urow[i]);
}
- if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
- transferNcharData(pSql, i, pField);
- }
+ ((char**) pRes->urow)[i] += bytes;
}
pRes->row++; // index increase one-step
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index 27824fc1ff..7a82bcaaab 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -220,13 +220,11 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
}
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
- size_t numOfOutput = tscNumOfFields(pQueryInfo);
- size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
-
- if (numOfOutput == numOfExprs) {
+ if (tscIsProjectionQuery(pQueryInfo)) {
return false;
}
+ size_t numOfOutput = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo;
if (pExprInfo != NULL) {
@@ -265,16 +263,20 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) {
- int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
- pRes->numOfCols = numOfOutput;
+ pRes->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
- pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
- pRes->length = calloc(numOfOutput, sizeof(int32_t));
- pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
+ pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
+ pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES);
+ pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));
+ pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
// not enough memory
- if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
+ if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->length == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
tfree(pRes->tsrow);
+ tfree(pRes->urow);
+ tfree(pRes->length);
+ tfree(pRes->buffer);
+
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code;
}
@@ -283,6 +285,71 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
+void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
+ assert(pRes->numOfCols > 0);
+
+ int32_t offset = 0;
+
+ for (int32_t i = 0; i < pRes->numOfCols; ++i) {
+ SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
+
+ pRes->urow[i] = pRes->data + offset * pRes->numOfRows;
+ pRes->length[i] = pInfo->field.bytes;
+
+ offset += pInfo->field.bytes;
+
+ // generated the user-defined column result
+ if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
+ if (pInfo->pSqlExpr->param[1].nType == TSDB_DATA_TYPE_NULL) {
+ setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
+ } else {
+ if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
+ assert(pInfo->pSqlExpr->param[1].nLen <= pInfo->field.bytes);
+
+ for (int32_t k = 0; k < pRes->numOfRows; ++k) {
+ char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
+
+ memcpy(varDataVal(p), pInfo->pSqlExpr->param[1].pz, pInfo->pSqlExpr->param[1].nLen);
+ varDataSetLen(p, pInfo->pSqlExpr->param[1].nLen);
+ }
+ } else {
+ for (int32_t k = 0; k < pRes->numOfRows; ++k) {
+ char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
+ memcpy(p, &pInfo->pSqlExpr->param[1].i64Key, pInfo->field.bytes);
+ }
+ }
+ }
+
+ } else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
+ // convert unicode to native code in a temporary buffer extra one byte for terminated symbol
+ pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
+
+ // string terminated char for binary data
+ memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
+
+ char* p = pRes->urow[i];
+ for (int32_t k = 0; k < pRes->numOfRows; ++k) {
+ char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
+
+ if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
+ memcpy(dst, p, varDataTLen(p));
+ } else {
+ int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
+ varDataSetLen(dst, length);
+
+ if (length == 0) {
+ tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
+ }
+ }
+
+ p += pInfo->field.bytes;
+ }
+
+ memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
+ }
+ }
+}
+
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfCols; i++) {
@@ -297,6 +364,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->tsrow);
tfree(pRes->length);
tfree(pRes->buffer);
+ tfree(pRes->urow);
tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex);
diff --git a/src/connector/grafanaplugin b/src/connector/grafanaplugin
index d598db167e..ec77d9049a 160000
--- a/src/connector/grafanaplugin
+++ b/src/connector/grafanaplugin
@@ -1 +1 @@
-Subproject commit d598db167eb256fe67409b7bb3d0eb7fffc3ff8c
+Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml
index 3b62f66d2e..e7124a0599 100755
--- a/src/connector/jdbc/pom.xml
+++ b/src/connector/jdbc/pom.xml
@@ -56,6 +56,23 @@
test
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.8
+
+
+ org.apache.commons
+ commons-lang3
+ 3.9
+
+
+ com.alibaba
+ fastjson
+ 1.2.58
+
+
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/AbstractTaosDriver.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/AbstractTaosDriver.java
new file mode 100644
index 0000000000..f864788bff
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/AbstractTaosDriver.java
@@ -0,0 +1,161 @@
+package com.taosdata.jdbc;
+
+import java.io.*;
+import java.sql.Driver;
+import java.sql.DriverPropertyInfo;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+public abstract class AbstractTaosDriver implements Driver {
+
+ private static final String TAOS_CFG_FILENAME = "taos.cfg";
+
+ /**
+ * @param cfgDirPath
+ * @return return the config dir
+ **/
+ protected File loadConfigDir(String cfgDirPath) {
+ if (cfgDirPath == null)
+ return loadDefaultConfigDir();
+ File cfgDir = new File(cfgDirPath);
+ if (!cfgDir.exists())
+ return loadDefaultConfigDir();
+ return cfgDir;
+ }
+
+ /**
+ * @return search the default config dir, if the config dir is not exist will return null
+ */
+ protected File loadDefaultConfigDir() {
+ File cfgDir;
+ File cfgDir_linux = new File("/etc/taos");
+ cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
+ File cfgDir_windows = new File("C:\\TDengine\\cfg");
+ cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
+ return cfgDir;
+ }
+
+ protected List loadConfigEndpoints(File cfgFile) {
+ List endpoints = new ArrayList<>();
+ try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
+ endpoints.add(line.substring(line.indexOf('p') + 1).trim());
+ }
+ if (endpoints.size() > 1)
+ break;
+ }
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return endpoints;
+ }
+
+ protected void loadTaosConfig(Properties info) {
+ if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
+ info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
+ info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
+ info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
+ File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
+ File cfgFile = cfgDir.listFiles((dir, name) -> TAOS_CFG_FILENAME.equalsIgnoreCase(name))[0];
+ List endpoints = loadConfigEndpoints(cfgFile);
+ if (!endpoints.isEmpty()) {
+ info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
+ info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
+ }
+ }
+ }
+
+ protected DriverPropertyInfo[] getPropertyInfo(Properties info) {
+ DriverPropertyInfo hostProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_HOST, info.getProperty(TSDBDriver.PROPERTY_KEY_HOST));
+ hostProp.required = false;
+ hostProp.description = "Hostname";
+
+ DriverPropertyInfo portProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PORT, info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
+ portProp.required = false;
+ portProp.description = "Port";
+
+ DriverPropertyInfo dbProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_DBNAME, info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME));
+ dbProp.required = false;
+ dbProp.description = "Database name";
+
+ DriverPropertyInfo userProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_USER, info.getProperty(TSDBDriver.PROPERTY_KEY_USER));
+ userProp.required = true;
+ userProp.description = "User";
+
+ DriverPropertyInfo passwordProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PASSWORD, info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
+ passwordProp.required = true;
+ passwordProp.description = "Password";
+
+ DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
+ propertyInfo[0] = hostProp;
+ propertyInfo[1] = portProp;
+ propertyInfo[2] = dbProp;
+ propertyInfo[3] = userProp;
+ propertyInfo[4] = passwordProp;
+ return propertyInfo;
+ }
+
+ protected Properties parseURL(String url, Properties defaults) {
+ Properties urlProps = (defaults != null) ? defaults : new Properties();
+
+ // parse properties
+ int beginningOfSlashes = url.indexOf("//");
+ int index = url.indexOf("?");
+ if (index != -1) {
+ String paramString = url.substring(index + 1, url.length());
+ url = url.substring(0, index);
+ StringTokenizer queryParams = new StringTokenizer(paramString, "&");
+ while (queryParams.hasMoreElements()) {
+ String parameterValuePair = queryParams.nextToken();
+ int indexOfEqual = parameterValuePair.indexOf("=");
+ String parameter = null;
+ String value = null;
+ if (indexOfEqual != -1) {
+ parameter = parameterValuePair.substring(0, indexOfEqual);
+ if (indexOfEqual + 1 < parameterValuePair.length()) {
+ value = parameterValuePair.substring(indexOfEqual + 1);
+ }
+ }
+ if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
+ urlProps.setProperty(parameter, value);
+ }
+ }
+ }
+
+ // parse Product Name
+ String dbProductName = url.substring(0, beginningOfSlashes);
+ dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
+ dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
+ // parse dbname
+ url = url.substring(beginningOfSlashes + 2);
+ int indexOfSlash = url.indexOf("/");
+ if (indexOfSlash != -1) {
+ if (indexOfSlash + 1 < url.length()) {
+ urlProps.setProperty(TSDBDriver.PROPERTY_KEY_DBNAME, url.substring(indexOfSlash + 1));
+ }
+ url = url.substring(0, indexOfSlash);
+ }
+ // parse port
+ int indexOfColon = url.indexOf(":");
+ if (indexOfColon != -1) {
+ if (indexOfColon + 1 < url.length()) {
+ urlProps.setProperty(TSDBDriver.PROPERTY_KEY_PORT, url.substring(indexOfColon + 1));
+ }
+ url = url.substring(0, indexOfColon);
+ }
+ // parse host
+ if (url != null && url.length() > 0 && url.trim().length() > 0) {
+ urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
+ }
+ return urlProps;
+ }
+
+
+
+}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/ColumnMetaData.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/ColumnMetaData.java
index 5c7f80c715..633fdcd5ab 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/ColumnMetaData.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/ColumnMetaData.java
@@ -16,10 +16,10 @@ package com.taosdata.jdbc;
public class ColumnMetaData {
- int colType = 0;
- String colName = null;
- int colSize = -1;
- int colIndex = 0;
+ private int colType = 0;
+ private String colName = null;
+ private int colSize = -1;
+ private int colIndex = 0;
public int getColSize() {
return colSize;
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
index f93412ffec..94abe39655 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
@@ -14,7 +14,6 @@
*****************************************************************************/
package com.taosdata.jdbc;
-import java.io.*;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
@@ -35,11 +34,10 @@ import java.util.*;
import java.util.concurrent.Executor;
public class TSDBConnection implements Connection {
+ protected Properties props = null;
private TSDBJNIConnector connector = null;
- protected Properties props = null;
-
private String catalog = null;
private TSDBDatabaseMetaData dbMetaData = null;
@@ -47,15 +45,21 @@ public class TSDBConnection implements Connection {
private Properties clientInfoProps = new Properties();
private int timeoutMilliseconds = 0;
-
- private String tsCharSet = "";
+
+ private boolean batchFetch = false;
public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException {
this.dbMetaData = meta;
connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST),
Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")),
- info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
+ info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME),
+ info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
+
+ String batchLoad = info.getProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD);
+ if (batchLoad != null) {
+ this.batchFetch = Boolean.parseBoolean(batchLoad);
+ }
}
private void connect(String host, int port, String dbName, String user, String password) throws SQLException {
@@ -223,6 +227,14 @@ public class TSDBConnection implements Connection {
return this.prepareStatement(sql);
}
+
+ public Boolean getBatchFetch() {
+ return this.batchFetch;
+ }
+
+ public void setBatchFetch(Boolean batchFetch) {
+ this.batchFetch = batchFetch;
+ }
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java
index 63c42ca399..06f88cebfa 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java
@@ -14,7 +14,6 @@
*****************************************************************************/
package com.taosdata.jdbc;
-import java.io.*;
import java.sql.*;
import java.util.*;
import java.util.logging.Logger;
@@ -38,7 +37,7 @@ import java.util.logging.Logger;
* register it with the DriverManager. This means that a user can load and
* register a driver by doing Class.forName("foo.bah.Driver")
*/
-public class TSDBDriver implements java.sql.Driver {
+public class TSDBDriver extends AbstractTaosDriver {
@Deprecated
private static final String URL_PREFIX1 = "jdbc:TSDB://";
@@ -87,6 +86,11 @@ public class TSDBDriver implements java.sql.Driver {
*/
public static final String PROPERTY_KEY_CHARSET = "charset";
+ /**
+ * fetch data from native function in a batch model
+ */
+ public static final String PROPERTY_KEY_BATCH_LOAD = "batchfetch";
+
private TSDBDatabaseMetaData dbMetaData = null;
static {
@@ -97,50 +101,6 @@ public class TSDBDriver implements java.sql.Driver {
}
}
- private List loadConfigEndpoints(File cfgFile) {
- List endpoints = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
- String line = null;
- while ((line = reader.readLine()) != null) {
- if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
- endpoints.add(line.substring(line.indexOf('p') + 1).trim());
- }
- if (endpoints.size() > 1)
- break;
- }
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return endpoints;
- }
-
- /**
- * @param cfgDirPath
- * @return return the config dir
- **/
- private File loadConfigDir(String cfgDirPath) {
- if (cfgDirPath == null)
- return loadDefaultConfigDir();
- File cfgDir = new File(cfgDirPath);
- if (!cfgDir.exists())
- return loadDefaultConfigDir();
- return cfgDir;
- }
-
- /**
- * @return search the default config dir, if the config dir is not exist will return null
- */
- private File loadDefaultConfigDir() {
- File cfgDir;
- File cfgDir_linux = new File("/etc/taos");
- cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
- File cfgDir_windows = new File("C:\\TDengine\\cfg");
- cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
- return cfgDir;
- }
-
public Connection connect(String url, Properties info) throws SQLException {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
@@ -152,26 +112,12 @@ public class TSDBDriver implements java.sql.Driver {
if ((props = parseURL(url, info)) == null) {
return null;
}
-
//load taos.cfg start
- if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
- info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
- info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
- info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
- File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
- File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
- List endpoints = loadConfigEndpoints(cfgFile);
- if (!endpoints.isEmpty()) {
- info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
- info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
- }
- }
+ loadTaosConfig(info);
try {
- TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR),
- (String) props.get(PROPERTY_KEY_LOCALE),
- (String) props.get(PROPERTY_KEY_CHARSET),
- (String) props.get(PROPERTY_KEY_TIME_ZONE));
+ TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE),
+ (String) props.get(PROPERTY_KEY_CHARSET), (String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLWarning sqlWarning) {
@@ -208,39 +154,13 @@ public class TSDBDriver implements java.sql.Driver {
info = parseURL(url, info);
}
- DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
- hostProp.required = false;
- hostProp.description = "Hostname";
-
- DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
- portProp.required = false;
- portProp.description = "Port";
-
- DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
- dbProp.required = false;
- dbProp.description = "Database name";
-
- DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
- userProp.required = true;
- userProp.description = "User";
-
- DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
- passwordProp.required = true;
- passwordProp.description = "Password";
-
- DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
- propertyInfo[0] = hostProp;
- propertyInfo[1] = portProp;
- propertyInfo[2] = dbProp;
- propertyInfo[3] = userProp;
- propertyInfo[4] = passwordProp;
-
- return propertyInfo;
+ return getPropertyInfo(info);
}
/**
* example: jdbc:TAOS://127.0.0.1:0/db?user=root&password=your_password
*/
+ @Override
public Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null || url.length() <= 0 || url.trim().length() <= 0)
@@ -257,26 +177,21 @@ public class TSDBDriver implements java.sql.Driver {
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
- String parameterValuePair = queryParams.nextToken();
- int indexOfEqual = parameterValuePair.indexOf("=");
- String parameter = null;
- String value = null;
- if (indexOfEqual != -1) {
- parameter = parameterValuePair.substring(0, indexOfEqual);
- if (indexOfEqual + 1 < parameterValuePair.length()) {
- value = parameterValuePair.substring(indexOfEqual + 1);
- }
- }
- if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
- urlProps.setProperty(parameter, value);
+ String oneToken = queryParams.nextToken();
+ String[] pair = oneToken.split("=");
+
+ if ((pair[0] != null && pair[0].trim().length() > 0) && (pair[1] != null && pair[1].trim().length() > 0)) {
+ urlProps.setProperty(pair[0].trim(), pair[1].trim());
}
}
}
+
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
- // parse dbname
+
+ // parse database name
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
@@ -285,6 +200,7 @@ public class TSDBDriver implements java.sql.Driver {
}
url = url.substring(0, indexOfSlash);
}
+
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
@@ -293,89 +209,15 @@ public class TSDBDriver implements java.sql.Driver {
}
url = url.substring(0, indexOfColon);
}
+
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
-
+
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
-
- /*
- String urlForMeta = url;
- String dbProductName = url.substring(url.indexOf(":") + 1);
- dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
- int beginningOfSlashes = url.indexOf("//");
- url = url.substring(beginningOfSlashes + 2);
-
- String host = url.substring(0, url.indexOf(":"));
- url = url.substring(url.indexOf(":") + 1);
- urlProps.setProperty(PROPERTY_KEY_HOST, host);
-
- String port = url.substring(0, url.indexOf("/"));
- urlProps.setProperty(PROPERTY_KEY_PORT, port);
- url = url.substring(url.indexOf("/") + 1);
-
- if (url.indexOf("?") != -1) {
- String dbName = url.substring(0, url.indexOf("?"));
- urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
- url = url.trim().substring(url.indexOf("?") + 1);
- } else {
- // without user & password so return
- if (!url.trim().isEmpty()) {
- String dbName = url.trim();
- urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
- }
- this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user"));
- return urlProps;
- }
-
- String user = "";
-
- if (url.indexOf("&") == -1) {
- String[] kvPair = url.trim().split("=");
- if (kvPair.length == 2) {
- setPropertyValue(urlProps, kvPair);
- return urlProps;
- }
- }
-
- String[] queryStrings = url.trim().split("&");
- for (String queryStr : queryStrings) {
- String[] kvPair = queryStr.trim().split("=");
- if (kvPair.length < 2) {
- continue;
- }
- setPropertyValue(urlProps, kvPair);
- }
-
- user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
- this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
-*/
return urlProps;
}
- private void setPropertyValue(Properties property, String[] keyValuePair) {
- switch (keyValuePair[0].toLowerCase()) {
- case PROPERTY_KEY_USER:
- property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
- break;
- case PROPERTY_KEY_PASSWORD:
- property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]);
- break;
- case PROPERTY_KEY_TIME_ZONE:
- property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]);
- break;
- case PROPERTY_KEY_LOCALE:
- property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]);
- break;
- case PROPERTY_KEY_CHARSET:
- property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]);
- break;
- case PROPERTY_KEY_CONFIG_DIR:
- property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]);
- break;
- }
- }
-
public int getMajorVersion() {
return 2;
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
index edc160e323..f918463439 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
@@ -243,6 +243,11 @@ public class TSDBJNIConnector {
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
+ public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
+ return this.fetchBlockImp(this.taos, resultSet, blockData);
+ }
+
+ private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
/**
* Execute close operation from C to release connection pointer by JNI
*
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
index 961633b8ae..84a3f58f46 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
@@ -47,10 +47,14 @@ public class TSDBResultSet implements ResultSet {
private List columnMetaDataList = new ArrayList();
private TSDBResultSetRowData rowData;
+ private TSDBResultSetBlockData blockData;
+ private boolean batchFetch = false;
private boolean lastWasNull = false;
private final int COLUMN_INDEX_START_VALUE = 1;
+ private int rowIndex = 0;
+
public TSDBJNIConnector getJniConnector() {
return jniConnector;
}
@@ -67,6 +71,14 @@ public class TSDBResultSet implements ResultSet {
this.resultSetPointer = resultSetPointer;
}
+ public void setBatchFetch(boolean batchFetch) {
+ this.batchFetch = batchFetch;
+ }
+
+ public Boolean getBatchFetch() {
+ return this.batchFetch;
+ }
+
public List getColumnMetaDataList() {
return columnMetaDataList;
}
@@ -94,8 +106,8 @@ public class TSDBResultSet implements ResultSet {
public TSDBResultSet() {
}
- public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException {
- this.jniConnector = connecter;
+ public TSDBResultSet(TSDBJNIConnector connector, long resultSetPointer) throws SQLException {
+ this.jniConnector = connector;
this.resultSetPointer = resultSetPointer;
int code = this.jniConnector.getSchemaMetaData(this.resultSetPointer, this.columnMetaDataList);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
@@ -107,6 +119,7 @@ public class TSDBResultSet implements ResultSet {
}
this.rowData = new TSDBResultSetRowData(this.columnMetaDataList.size());
+ this.blockData = new TSDBResultSetBlockData(this.columnMetaDataList, this.columnMetaDataList.size());
}
public T unwrap(Class iface) throws SQLException {
@@ -118,21 +131,42 @@ public class TSDBResultSet implements ResultSet {
}
public boolean next() throws SQLException {
- if (rowData != null) {
- this.rowData.clear();
- }
+ if (this.getBatchFetch()) {
+ if (this.blockData.forward()) {
+ return true;
+ }
+
+ int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
+ this.blockData.reset();
+
+ if (code == TSDBConstants.JNI_CONNECTION_NULL) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ } else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
+ } else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
+ } else if (code == TSDBConstants.JNI_FETCH_END) {
+ return false;
+ }
- int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
- if (code == TSDBConstants.JNI_CONNECTION_NULL) {
- throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
- } else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
- throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
- } else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
- throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
- } else if (code == TSDBConstants.JNI_FETCH_END) {
- return false;
- } else {
return true;
+ } else {
+ if (rowData != null) {
+ this.rowData.clear();
+ }
+
+ int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
+ if (code == TSDBConstants.JNI_CONNECTION_NULL) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ } else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
+ } else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
+ } else if (code == TSDBConstants.JNI_FETCH_END) {
+ return false;
+ } else {
+ return true;
+ }
}
}
@@ -155,21 +189,30 @@ public class TSDBResultSet implements ResultSet {
String res = null;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return this.blockData.getString(colIndex);
}
- return res;
}
public boolean getBoolean(int columnIndex) throws SQLException {
boolean res = false;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ } else {
+ return this.blockData.getBoolean(colIndex);
}
+
return res;
}
@@ -177,66 +220,91 @@ public class TSDBResultSet implements ResultSet {
byte res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return (byte) this.blockData.getInt(colIndex);
}
- return res;
}
public short getShort(int columnIndex) throws SQLException {
short res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return (short) this.blockData.getInt(colIndex);
}
- return res;
}
public int getInt(int columnIndex) throws SQLException {
int res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return this.blockData.getInt(colIndex);
}
- return res;
+
}
public long getLong(int columnIndex) throws SQLException {
long res = 0l;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return this.blockData.getLong(colIndex);
}
- return res;
}
public float getFloat(int columnIndex) throws SQLException {
float res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return (float) this.blockData.getDouble(colIndex);
}
- return res;
}
public double getDouble(int columnIndex) throws SQLException {
double res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
+ return res;
+ } else {
+ return this.blockData.getDouble(colIndex);
}
- return res;
}
/*
@@ -249,25 +317,11 @@ public class TSDBResultSet implements ResultSet {
*/
@Deprecated
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
- BigDecimal res = null;
- int colIndex = getTrueColumnIndex(columnIndex);
-
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
- }
- return res;
+ return new BigDecimal(getLong(columnIndex));
}
public byte[] getBytes(int columnIndex) throws SQLException {
- byte[] res = null;
- int colIndex = getTrueColumnIndex(columnIndex);
-
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
- }
- return res;
+ return getString(columnIndex).getBytes();
}
public Date getDate(int columnIndex) throws SQLException {
@@ -284,11 +338,15 @@ public class TSDBResultSet implements ResultSet {
Timestamp res = null;
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getTimestamp(colIndex);
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getTimestamp(colIndex);
+ }
+ return res;
+ } else {
+ return this.blockData.getTimestamp(columnIndex);
}
- return res;
}
public InputStream getAsciiStream(int columnIndex) throws SQLException {
@@ -400,8 +458,12 @@ public class TSDBResultSet implements ResultSet {
public Object getObject(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- return this.rowData.get(colIndex);
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ return this.rowData.get(colIndex);
+ } else {
+ return this.blockData.get(colIndex);
+ }
}
public Object getObject(String columnLabel) throws SQLException {
@@ -433,8 +495,12 @@ public class TSDBResultSet implements ResultSet {
public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex);
- this.lastWasNull = this.rowData.wasNull(colIndex);
- return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
+ if (!this.getBatchFetch()) {
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
+ } else {
+ return new BigDecimal(this.blockData.getLong(colIndex));
+ }
}
public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java
new file mode 100644
index 0000000000..9352cf5253
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java
@@ -0,0 +1,497 @@
+/***************************************************************************
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *****************************************************************************/
+package com.taosdata.jdbc;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.ShortBuffer;
+import java.sql.SQLDataException;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TSDBResultSetBlockData {
+ private int numOfRows = 0;
+ private int rowIndex = 0;
+
+ private List columnMetaDataList;
+ private ArrayList