enh(driver): tmq async commit callback (#20114)
This commit is contained in:
parent
90c6641cff
commit
3e9cc93b9f
|
@ -99,6 +99,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
|
|||
*/
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject);
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
|
||||
jobject);
|
||||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
||||
* Method: tmqUnsubscribeImp
|
||||
|
|
|
@ -56,6 +56,7 @@ jmethodID g_createConsumerErrorCallback;
|
|||
jmethodID g_topicListCallback;
|
||||
|
||||
jclass g_consumerClass;
|
||||
// deprecated
|
||||
jmethodID g_commitCallback;
|
||||
|
||||
void jniGetGlobalMethod(JNIEnv *env) {
|
||||
|
|
|
@ -17,6 +17,36 @@
|
|||
#include "jniCommon.h"
|
||||
#include "taos.h"
|
||||
|
||||
int __init_tmq = 0;
|
||||
jmethodID g_offsetCallback;
|
||||
|
||||
void tmqGlobalMethod(JNIEnv *env) {
|
||||
// make sure init function executed once
|
||||
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
|
||||
case 0:
|
||||
break;
|
||||
case 1:
|
||||
do {
|
||||
taosMsleep(0);
|
||||
} while (atomic_load_32(&__init_tmq) == 1);
|
||||
case 2:
|
||||
return;
|
||||
}
|
||||
|
||||
if (g_vm == NULL) {
|
||||
(*env)->GetJavaVM(env, &g_vm);
|
||||
}
|
||||
|
||||
jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback");
|
||||
jclass g_offsetCallbackClass = (*env)->NewGlobalRef(env, offset);
|
||||
g_offsetCallback = (*env)->GetMethodID(env, g_offsetCallbackClass, "commitCallbackHandler", "(I)V");
|
||||
(*env)->DeleteLocalRef(env, offset);
|
||||
|
||||
atomic_store_32(&__init_tmq, 2);
|
||||
jniDebug("tmq method register finished");
|
||||
}
|
||||
|
||||
// deprecated
|
||||
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
|
||||
JNIEnv *env = NULL;
|
||||
int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6);
|
||||
|
@ -40,6 +70,28 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) {
|
|||
env = NULL;
|
||||
}
|
||||
|
||||
void consumer_callback(tmq_t *tmq, int32_t code, void *param) {
|
||||
JNIEnv *env = NULL;
|
||||
int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6);
|
||||
bool needDetach = false;
|
||||
if (status < 0) {
|
||||
if ((*g_vm)->AttachCurrentThread(g_vm, (void **)&env, NULL) != 0) {
|
||||
return;
|
||||
}
|
||||
needDetach = true;
|
||||
}
|
||||
|
||||
jobject obj = (jobject)param;
|
||||
(*env)->CallVoidMethod(env, obj, g_offsetCallback, code);
|
||||
(*env)->DeleteGlobalRef(env, obj);
|
||||
param = NULL;
|
||||
|
||||
if (needDetach) {
|
||||
(*g_vm)->DetachCurrentThread(g_vm);
|
||||
}
|
||||
env = NULL;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConfNewImp(JNIEnv *env, jobject jobj) {
|
||||
tmq_conf_t *conf = tmq_conf_new();
|
||||
jniGetGlobalMethod(env);
|
||||
|
@ -201,6 +253,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
|
|||
return tmq_commit_sync(tmq, res);
|
||||
}
|
||||
|
||||
// deprecated
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
jlong jres, jobject consumer) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
|
@ -213,6 +266,19 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
|
|||
tmq_commit_async(tmq, res, commit_cb, consumer);
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
jlong jres, jobject offset) {
|
||||
tmqGlobalMethod(env);
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniError("jobj:%p, tmq is closed", jobj);
|
||||
return;
|
||||
}
|
||||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
offset = (*env)->NewGlobalRef(env, offset);
|
||||
tmq_commit_async(tmq, res, consumer_callback, offset);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
|
||||
jlong jtmq) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
|
|
Loading…
Reference in New Issue