From 79031c47a2492cdc445c01cb49f3ab11a8360ccc Mon Sep 17 00:00:00 2001 From: huolibo Date: Thu, 25 May 2023 17:39:01 +0800 Subject: [PATCH] enh: add get offset --- .../client/jni/com_taosdata_jdbc_tmq_TMQConnector.h | 7 +++++++ source/client/src/clientTmqConnector.c | 13 +++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 094e947476..422bcd57ac 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN */ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong); +/* + * Class: com_taosdata_jdbc_tmq_TMQConnector + * Method: tmqGetOffset + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: fetchBlockImp diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 5edaca48f8..6b0bfc09e8 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -390,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam return (*env)->NewStringUTF(env, tmq_get_table_name(res)); } +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) { + TAOS_RES *res = (TAOS_RES *)jres; + if (res == NULL) { + jniDebug("jobj:%p, invalid res handle", jobj); + return NULL; + } + return tmq_get_vgroup_offset(res); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con, jlong res, jobject rowobj, jobject arrayListObj) { @@ -491,7 +500,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign (*env)->ReleaseStringUTFChars(env, jtopic, topicName); jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, tmq_err2str(res)); - taosMemoryFree(pAssign); + tmq_free_assignment(pAssign); return (jint)res; } @@ -506,6 +515,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign (*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end); (*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment); } - taosMemoryFree(pAssign); + tmq_free_assignment(pAssign); return JNI_SUCCESS; }