Merge pull request #21279 from taosdata/feat/TD-24012
feat(driver): jdbc add tmq seek function
This commit is contained in:
commit
d4bd19068b
|
@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
|
||||||
*/
|
*/
|
||||||
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong);
|
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
||||||
|
* Method: tmqGetOffset
|
||||||
|
* Signature: (J)Ljava/lang/String;
|
||||||
|
*/
|
||||||
|
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
||||||
* Method: fetchBlockImp
|
* Method: fetchBlockImp
|
||||||
|
@ -166,6 +173,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
|
||||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
|
||||||
jobject, jobject);
|
jobject, jobject);
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint,
|
||||||
|
jlong);
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
|
||||||
|
jstring, jobject);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -20,6 +20,13 @@
|
||||||
int __init_tmq = 0;
|
int __init_tmq = 0;
|
||||||
jmethodID g_offsetCallback;
|
jmethodID g_offsetCallback;
|
||||||
|
|
||||||
|
jclass g_assignmentClass;
|
||||||
|
jmethodID g_assignmentConstructor;
|
||||||
|
jmethodID g_assignmentSetVgId;
|
||||||
|
jmethodID g_assignmentSetCurrentOffset;
|
||||||
|
jmethodID g_assignmentSetBegin;
|
||||||
|
jmethodID g_assignmentSetEnd;
|
||||||
|
|
||||||
void tmqGlobalMethod(JNIEnv *env) {
|
void tmqGlobalMethod(JNIEnv *env) {
|
||||||
// make sure init function executed once
|
// make sure init function executed once
|
||||||
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
|
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
|
||||||
|
@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) {
|
||||||
jniDebug("tmq method register finished");
|
jniDebug("tmq method register finished");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int __init_assignment = 0;
|
||||||
|
void tmqAssignmentMethod(JNIEnv *env) {
|
||||||
|
// make sure init function executed once
|
||||||
|
switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) {
|
||||||
|
case 0:
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
do {
|
||||||
|
taosMsleep(0);
|
||||||
|
} while (atomic_load_32(&__init_assignment) == 1);
|
||||||
|
case 2:
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (g_vm == NULL) {
|
||||||
|
(*env)->GetJavaVM(env, &g_vm);
|
||||||
|
}
|
||||||
|
|
||||||
|
jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
|
||||||
|
g_assignmentClass = (*env)->NewGlobalRef(env, assignment);
|
||||||
|
g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "<init>", "()V");
|
||||||
|
g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int
|
||||||
|
g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long
|
||||||
|
g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long
|
||||||
|
g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long
|
||||||
|
|
||||||
|
(*env)->DeleteLocalRef(env, assignment);
|
||||||
|
|
||||||
|
atomic_store_32(&__init_assignment, 2);
|
||||||
|
jniDebug("tmq method assignment finished");
|
||||||
|
}
|
||||||
|
|
||||||
// deprecated
|
// deprecated
|
||||||
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
|
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
|
||||||
JNIEnv *env = NULL;
|
JNIEnv *env = NULL;
|
||||||
|
@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
|
||||||
tmq_commit_async(tmq, res, commit_cb, consumer);
|
tmq_commit_async(tmq, res, commit_cb, consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
|
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj,
|
||||||
jlong jres, jobject offset) {
|
jlong jtmq, jlong jres,
|
||||||
|
jobject offset) {
|
||||||
tmqGlobalMethod(env);
|
tmqGlobalMethod(env);
|
||||||
tmq_t *tmq = (tmq_t *)jtmq;
|
tmq_t *tmq = (tmq_t *)jtmq;
|
||||||
if (tmq == NULL) {
|
if (tmq == NULL) {
|
||||||
|
@ -335,7 +375,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
|
||||||
TAOS_RES *res = (TAOS_RES *)jres;
|
TAOS_RES *res = (TAOS_RES *)jres;
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
jniDebug("jobj:%p, invalid res handle", jobj);
|
jniDebug("jobj:%p, invalid res handle", jobj);
|
||||||
return -1;
|
return JNI_RESULT_SET_NULL;
|
||||||
}
|
}
|
||||||
return tmq_get_vgroup_id(res);
|
return tmq_get_vgroup_id(res);
|
||||||
}
|
}
|
||||||
|
@ -350,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
|
||||||
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
|
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) {
|
||||||
|
TAOS_RES *res = (TAOS_RES *)jres;
|
||||||
|
if (res == NULL) {
|
||||||
|
jniDebug("jobj:%p, invalid res handle", jobj);
|
||||||
|
return JNI_RESULT_SET_NULL;
|
||||||
|
}
|
||||||
|
return tmq_get_vgroup_offset(res);
|
||||||
|
}
|
||||||
|
|
||||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con,
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con,
|
||||||
jlong res, jobject rowobj,
|
jlong res, jobject rowobj,
|
||||||
jobject arrayListObj) {
|
jobject arrayListObj) {
|
||||||
|
@ -369,7 +418,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
|
||||||
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
|
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
|
||||||
return JNI_FETCH_END;
|
return JNI_FETCH_END;
|
||||||
} else {
|
} else {
|
||||||
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres));
|
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code,
|
||||||
|
taos_errstr(tres));
|
||||||
return JNI_RESULT_SET_NULL;
|
return JNI_RESULT_SET_NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -399,3 +449,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
|
||||||
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
|
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
|
||||||
return JNI_SUCCESS;
|
return JNI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||||
|
jstring jtopic, jint partition,
|
||||||
|
jlong offset) {
|
||||||
|
tmq_t *tmq = (tmq_t *)jtmq;
|
||||||
|
if (tmq == NULL) {
|
||||||
|
jniDebug("jobj:%p, tmq is closed", jobj);
|
||||||
|
return TMQ_CONSUMER_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jtopic == NULL) {
|
||||||
|
jniDebug("jobj:%p, topic is null", jobj);
|
||||||
|
return TMQ_TOPIC_NULL;
|
||||||
|
}
|
||||||
|
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
|
||||||
|
|
||||||
|
int32_t res = tmq_offset_seek(tmq, topicName, partition, offset);
|
||||||
|
|
||||||
|
if (res != TSDB_CODE_SUCCESS) {
|
||||||
|
jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res));
|
||||||
|
}
|
||||||
|
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||||
|
return (jint)res;
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj,
|
||||||
|
jlong jtmq, jstring jtopic,
|
||||||
|
jobject jarrayList) {
|
||||||
|
tmqAssignmentMethod(env);
|
||||||
|
tmq_t *tmq = (tmq_t *)jtmq;
|
||||||
|
if (tmq == NULL) {
|
||||||
|
jniDebug("jobj:%p, tmq is closed", jobj);
|
||||||
|
return TMQ_CONSUMER_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jtopic == NULL) {
|
||||||
|
jniDebug("jobj:%p, topic is null", jobj);
|
||||||
|
return TMQ_TOPIC_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
|
||||||
|
|
||||||
|
tmq_topic_assignment *pAssign = NULL;
|
||||||
|
int32_t numOfAssignment = 0;
|
||||||
|
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
|
||||||
|
|
||||||
|
if (res != TSDB_CODE_SUCCESS) {
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||||
|
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
|
||||||
|
tmq_err2str(res));
|
||||||
|
tmq_free_assignment(pAssign);
|
||||||
|
return (jint)res;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
|
||||||
|
|
||||||
|
for (int i = 0; i < numOfAssignment; ++i) {
|
||||||
|
tmq_topic_assignment assignment = pAssign[i];
|
||||||
|
jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor);
|
||||||
|
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId);
|
||||||
|
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
|
||||||
|
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
|
||||||
|
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
|
||||||
|
(*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
|
||||||
|
}
|
||||||
|
tmq_free_assignment(pAssign);
|
||||||
|
return JNI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue