Merge pull request #22545 from taosdata/feat/3.0/TS-3701
feat(driver): add committed assignment API for jdbc
This commit is contained in:
commit
d6019c1e36
|
@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm
|
|||
*/
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong);
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong);
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring,
|
||||
jint, jlong);
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
||||
* Method: tmqCommitAsync
|
||||
|
@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
|
|||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
|
||||
jobject);
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong,
|
||||
jobject);
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong,
|
||||
jstring, jint, jlong, jobject);
|
||||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
||||
* Method: tmqUnsubscribeImp
|
||||
|
@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv
|
|||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
|
||||
jstring, jobject);
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring,
|
||||
jint);
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
|
|||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
return tmq_commit_sync(tmq, res);
|
||||
}
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniError("jobj:%p, tmq is closed", jobj);
|
||||
return TMQ_CONSUMER_NULL;
|
||||
}
|
||||
|
||||
return tmq_commit_sync(tmq, NULL);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj,
|
||||
jlong jtmq, jstring jtopic,
|
||||
jint vgId, jlong offset) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniDebug("jobj:%p, tmq is closed", jobj);
|
||||
return TMQ_CONSUMER_NULL;
|
||||
}
|
||||
|
||||
if (jtopic == NULL) {
|
||||
jniDebug("jobj:%p, topic is null", jobj);
|
||||
return TMQ_TOPIC_NULL;
|
||||
}
|
||||
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
|
||||
|
||||
int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code));
|
||||
}
|
||||
|
||||
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// deprecated
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
|
@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy
|
|||
tmq_commit_async(tmq, res, consumer_callback, offset);
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj,
|
||||
jlong jtmq, jobject offset) {
|
||||
tmqGlobalMethod(env);
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
|
||||
offset = (*env)->NewGlobalRef(env, offset);
|
||||
tmq_commit_async(tmq, NULL, consumer_callback, offset);
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj,
|
||||
jlong jtmq, jstring jtopic,
|
||||
jint vgId, jlong offset,
|
||||
jobject callback) {
|
||||
tmqGlobalMethod(env);
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
|
||||
|
||||
callback = (*env)->NewGlobalRef(env, callback);
|
||||
tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
|
||||
jlong jtmq) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
|
@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
|
|||
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
|
||||
|
||||
if (res != TSDB_CODE_SUCCESS) {
|
||||
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
|
||||
tmq_err2str(res));
|
||||
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||
tmq_free_assignment(pAssign);
|
||||
return (jint)res;
|
||||
}
|
||||
|
@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
|
|||
tmq_free_assignment(pAssign);
|
||||
return JNI_SUCCESS;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
jstring jtopic, jint vgId) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniDebug("jobj:%p, tmq is closed", jobj);
|
||||
return TMQ_CONSUMER_NULL;
|
||||
}
|
||||
|
||||
if (jtopic == NULL) {
|
||||
jniDebug("jobj:%p, topic is null", jobj);
|
||||
return TMQ_TOPIC_NULL;
|
||||
}
|
||||
|
||||
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
|
||||
|
||||
int64_t offset = tmq_committed(tmq, topicName, vgId);
|
||||
|
||||
if (offset < JNI_SUCCESS && offset != -2147467247) {
|
||||
jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName,
|
||||
vgId, offset, tmq_err2str(offset));
|
||||
}
|
||||
|
||||
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||
return (jlong)offset;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
jstring jtopic, jint vgId) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniDebug("jobj:%p, tmq is closed", jobj);
|
||||
return TMQ_CONSUMER_NULL;
|
||||
}
|
||||
|
||||
if (jtopic == NULL) {
|
||||
jniDebug("jobj:%p, topic is null", jobj);
|
||||
return TMQ_TOPIC_NULL;
|
||||
}
|
||||
|
||||
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
|
||||
|
||||
int64_t offset = tmq_position(tmq, topicName, vgId);
|
||||
|
||||
if (offset < JNI_SUCCESS) {
|
||||
jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId,
|
||||
offset, tmq_err2str(offset));
|
||||
}
|
||||
|
||||
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||
return (jlong)offset;
|
||||
}
|
Loading…
Reference in New Issue