diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 422bcd57ac..ebc4eacdf9 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm */ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong); +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong); + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring, + jint, jlong); /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqCommitAsync @@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong, + jobject); + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong, + jstring, jint, jlong, jobject); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqUnsubscribeImp @@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong, jstring, jobject); +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring, + jint); + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 6ec82aa6ef..2bea738c23 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI TAOS_RES *res = (TAOS_RES *)jres; return tmq_commit_sync(tmq, res); } +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniError("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + return tmq_commit_sync(tmq, NULL); +} + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset); + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return code; +} // deprecated JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, @@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy tmq_commit_async(tmq, res, consumer_callback, offset); } +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jobject offset) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + + offset = (*env)->NewGlobalRef(env, offset); + tmq_commit_async(tmq, NULL, consumer_callback, offset); +} + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset, + jobject callback) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + callback = (*env)->NewGlobalRef(env, callback); + tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; @@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment); if (res != TSDB_CODE_SUCCESS) { - (*env)->ReleaseStringUTFChars(env, jtopic, topicName); jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, tmq_err2str(res)); + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); tmq_free_assignment(pAssign); return (jint)res; } @@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign tmq_free_assignment(pAssign); return JNI_SUCCESS; } + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_committed(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS && offset != -2147467247) { + jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, + vgId, offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_position(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS) { + jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId, + offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} \ No newline at end of file