From 56626f2e764a091c0839d420939982cfb74994d4 Mon Sep 17 00:00:00 2001 From: huolibo Date: Thu, 27 Jul 2023 16:52:08 +0800 Subject: [PATCH 1/2] feat(driver): add committed assignment API for jdbc --- .../jni/com_taosdata_jdbc_tmq_TMQConnector.h | 15 +++ source/client/src/clientTmqConnector.c | 108 +++++++++++++++++- 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 422bcd57ac..ebc4eacdf9 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm */ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong); +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong); + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring, + jint, jlong); /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqCommitAsync @@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong, + jobject); + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong, + jstring, jint, jlong, jobject); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqUnsubscribeImp @@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong, jstring, jobject); +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring, + jint); + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 6ec82aa6ef..487a86a589 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI TAOS_RES *res = (TAOS_RES *)jres; return tmq_commit_sync(tmq, res); } +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniError("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + return tmq_commit_sync(tmq, NULL); +} + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset); + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return code; +} // deprecated JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, @@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy tmq_commit_async(tmq, res, consumer_callback, offset); } +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jobject offset) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + + offset = (*env)->NewGlobalRef(env, offset); + tmq_commit_async(tmq, NULL, consumer_callback, offset); +} + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset, + jobject callback) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + callback = (*env)->NewGlobalRef(env, callback); + tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; @@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment); if (res != TSDB_CODE_SUCCESS) { - (*env)->ReleaseStringUTFChars(env, jtopic, topicName); jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, tmq_err2str(res)); + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); tmq_free_assignment(pAssign); return (jint)res; } @@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign tmq_free_assignment(pAssign); return JNI_SUCCESS; } + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_committed(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS) { + jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, + vgId, offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_position(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS) { + jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId, + offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} \ No newline at end of file From ed70cd1f63fa876aa8c2523e348c262dd2883e40 Mon Sep 17 00:00:00 2001 From: huolibo Date: Thu, 24 Aug 2023 10:55:41 +0800 Subject: [PATCH 2/2] fix: jni error log --- source/client/src/clientTmqConnector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 487a86a589..2bea738c23 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -590,7 +590,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp( int64_t offset = tmq_committed(tmq, topicName, vgId); - if (offset < JNI_SUCCESS) { + if (offset < JNI_SUCCESS && offset != -2147467247) { jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId, offset, tmq_err2str(offset)); }