From f9b1906127a67711d127af40d4c4ce89964f6b97 Mon Sep 17 00:00:00 2001 From: huolibo Date: Fri, 12 May 2023 17:41:55 +0800 Subject: [PATCH 1/4] feat(driver): jdbc add tmq seek function --- .../jni/com_taosdata_jdbc_tmq_TMQConnector.h | 6 + source/client/src/clientTmqConnector.c | 118 +++++++++++++++++- 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index c035b6598c..094e947476 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -166,6 +166,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong, jobject, jobject); +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint, + jlong); + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong, + jstring, jobject); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 894c51d13c..5edaca48f8 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -17,9 +17,16 @@ #include "jniCommon.h" #include "taos.h" -int __init_tmq = 0; +int __init_tmq = 0; jmethodID g_offsetCallback; +jclass g_assignmentClass; +jmethodID g_assignmentConstructor; +jmethodID g_assignmentSetVgId; +jmethodID g_assignmentSetCurrentOffset; +jmethodID g_assignmentSetBegin; +jmethodID g_assignmentSetEnd; + void tmqGlobalMethod(JNIEnv *env) { // make sure init function executed once switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) { @@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) { jniDebug("tmq method register finished"); } +int __init_assignment = 0; +void tmqAssignmentMethod(JNIEnv *env) { + // make sure init function executed once + switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) { + case 0: + break; + case 1: + do { + taosMsleep(0); + } while (atomic_load_32(&__init_assignment) == 1); + case 2: + return; + } + + if (g_vm == NULL) { + (*env)->GetJavaVM(env, &g_vm); + } + + jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment"); + g_assignmentClass = (*env)->NewGlobalRef(env, assignment); + g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "", "()V"); + g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int + g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long + g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long + g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long + + (*env)->DeleteLocalRef(env, assignment); + + atomic_store_32(&__init_assignment, 2); + jniDebug("tmq method assignment finished"); +} + // deprecated void commit_cb(tmq_t *tmq, int32_t code, void *param) { JNIEnv *env = NULL; @@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN tmq_commit_async(tmq, res, commit_cb, consumer); } -JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, - jlong jres, jobject offset) { +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jlong jres, + jobject offset) { tmqGlobalMethod(env); tmq_t *tmq = (tmq_t *)jtmq; if (tmq == NULL) { @@ -369,7 +409,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp( jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res); return JNI_FETCH_END; } else { - jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres)); + jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, + taos_errstr(tres)); return JNI_RESULT_SET_NULL; } } @@ -399,3 +440,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp( (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len)); return JNI_SUCCESS; } + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint partition, + jlong offset) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int32_t res = tmq_offset_seek(tmq, topicName, partition, offset); + + if (res != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jint)res; +} + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jobject jarrayList) { + tmqAssignmentMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + tmq_topic_assignment *pAssign = NULL; + int32_t numOfAssignment = 0; + int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment); + + if (res != TSDB_CODE_SUCCESS) { + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, + tmq_err2str(res)); + taosMemoryFree(pAssign); + return (jint)res; + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + + for (int i = 0; i < numOfAssignment; ++i) { + tmq_topic_assignment assignment = pAssign[i]; + jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor); + (*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId); + (*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset); + (*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin); + (*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end); + (*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment); + } + taosMemoryFree(pAssign); + return JNI_SUCCESS; +} From 79031c47a2492cdc445c01cb49f3ab11a8360ccc Mon Sep 17 00:00:00 2001 From: huolibo Date: Thu, 25 May 2023 17:39:01 +0800 Subject: [PATCH 2/4] enh: add get offset --- .../client/jni/com_taosdata_jdbc_tmq_TMQConnector.h | 7 +++++++ source/client/src/clientTmqConnector.c | 13 +++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 094e947476..422bcd57ac 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN */ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong); +/* + * Class: com_taosdata_jdbc_tmq_TMQConnector + * Method: tmqGetOffset + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: fetchBlockImp diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 5edaca48f8..6b0bfc09e8 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -390,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam return (*env)->NewStringUTF(env, tmq_get_table_name(res)); } +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) { + TAOS_RES *res = (TAOS_RES *)jres; + if (res == NULL) { + jniDebug("jobj:%p, invalid res handle", jobj); + return NULL; + } + return tmq_get_vgroup_offset(res); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con, jlong res, jobject rowobj, jobject arrayListObj) { @@ -491,7 +500,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign (*env)->ReleaseStringUTFChars(env, jtopic, topicName); jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, tmq_err2str(res)); - taosMemoryFree(pAssign); + tmq_free_assignment(pAssign); return (jint)res; } @@ -506,6 +515,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign (*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end); (*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment); } - taosMemoryFree(pAssign); + tmq_free_assignment(pAssign); return JNI_SUCCESS; } From a70cceb440f431bcb6c96fdb41bd4b21e22d7aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E8=93=93?= Date: Fri, 26 May 2023 11:14:30 +0800 Subject: [PATCH 3/4] fix: add return value --- source/client/src/clientTmqConnector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 6b0bfc09e8..08c802d967 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -394,7 +394,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNI TAOS_RES *res = (TAOS_RES *)jres; if (res == NULL) { jniDebug("jobj:%p, invalid res handle", jobj); - return NULL; + return -1; } return tmq_get_vgroup_offset(res); } From 2e5ac6b0c2007ec488cf6efde8f8c864fd6f1209 Mon Sep 17 00:00:00 2001 From: huolibo Date: Tue, 30 May 2023 11:33:46 +0800 Subject: [PATCH 4/4] fix: change return value --- source/client/src/clientTmqConnector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 08c802d967..6ec82aa6ef 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -375,7 +375,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN TAOS_RES *res = (TAOS_RES *)jres; if (res == NULL) { jniDebug("jobj:%p, invalid res handle", jobj); - return -1; + return JNI_RESULT_SET_NULL; } return tmq_get_vgroup_id(res); } @@ -394,7 +394,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNI TAOS_RES *res = (TAOS_RES *)jres; if (res == NULL) { jniDebug("jobj:%p, invalid res handle", jobj); - return -1; + return JNI_RESULT_SET_NULL; } return tmq_get_vgroup_offset(res); }