From ff57f57c57ea0191a9da4cb0aeaaea3c68bbe13a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 17:19:12 +0800 Subject: [PATCH] refactor tmq --- include/client/taos.h | 63 ++++++++++++------ source/client/src/tmq.c | 144 ++++++++++++++++++++++------------------ 2 files changed, 121 insertions(+), 86 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 3db9046119..029aad8715 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -92,17 +92,6 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); -typedef struct tmq_t tmq_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; - -typedef struct tmq_message_t tmq_message_t; -typedef struct tmq_message_topic_t tmq_message_topic_t; -typedef struct tmq_message_tb_t tmq_message_tb_t; -typedef struct tmq_tb_iter_t tmq_tb_iter_t; -typedef struct tmq_message_col_t tmq_message_col_t; -typedef struct tmq_col_iter_t tmq_col_iter_t; - typedef struct TAOS_BIND { int buffer_type; void * buffer; @@ -205,27 +194,59 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); /* --------------------------TMQ INTERFACE------------------------------- */ -typedef struct tmq_resp_err_t tmq_resp_err_t; + +enum tmq_resp_err_t { + TMQ_RESP_ERR__SUCCESS = 0, + TMQ_RESP_ERR__FAIL = 1, +}; + +typedef enum tmq_resp_err_t tmq_resp_err_t; + +typedef struct tmq_t tmq_t; typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; +typedef struct tmq_message_t tmq_message_t; + typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); -DLL_EXPORT tmq_conf_t* tmq_conf_new(); - -DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); - DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); - DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen); -DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); - +/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ +DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); +#endif DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time); -DLL_EXPORT tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); +DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); +#endif +DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); +#endif +/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ + +enum tmq_conf_res_t { + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, +}; + +typedef enum tmq_conf_res_t tmq_conf_res_t; + +DLL_EXPORT tmq_conf_t* tmq_conf_new(); +DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf); +DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); +DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 632b1dddb4..a4af40285e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -26,6 +26,51 @@ #include "tpagedfile.h" #include "tref.h" +struct tmq_list_t { + int32_t cnt; + int32_t tot; + char* elems[]; +}; +struct tmq_topic_vgroup_t { + char* topic; + int32_t vgId; + int64_t commitOffset; +}; + +struct tmq_topic_vgroup_list_t { + int32_t cnt; + int32_t size; + tmq_topic_vgroup_t* elems; +}; + +struct tmq_conf_t { + char clientId[256]; + char groupId[256]; + /*char* ip;*/ + /*uint16_t port;*/ + tmq_commit_cb* commit_cb; +}; + +struct tmq_t { + char groupId[256]; + char clientId[256]; + SRWLatch lock; + int64_t consumerId; + int64_t epoch; + int64_t status; + tsem_t rspSem; + STscObj* pTscObj; + tmq_commit_cb* commit_cb; + int32_t nextTopicIdx; + SArray* clientTopics; //SArray + //stat + int64_t pollCnt; +}; + +struct tmq_message_t { + SMqConsumeRsp rsp; +}; + typedef struct SMqClientVg { // statistics int64_t pollCnt; @@ -47,83 +92,43 @@ typedef struct SMqClientTopic { SArray* vgs; //SArray } SMqClientTopic; + typedef struct SMqAskEpCbParam { tmq_t* tmq; int32_t wait; } SMqAskEpCbParam; -struct tmq_resp_err_t { - int32_t code; -}; - -struct tmq_topic_vgroup_t { - char* topic; - int32_t vgId; - int64_t commitOffset; -}; - -struct tmq_topic_vgroup_list_t { - int32_t cnt; - int32_t size; - tmq_topic_vgroup_t* elems; -}; - typedef struct SMqConsumeCbParam { tmq_t* tmq; SMqClientVg* pVg; tmq_message_t** retMsg; } SMqConsumeCbParam; -struct tmq_conf_t { - char clientId[256]; - char groupId[256]; - char* ip; - uint16_t port; - tmq_commit_cb* commit_cb; -}; - -struct tmq_message_t { - SMqConsumeRsp rsp; -}; - +typedef struct SMqSubscribeCbParam { + tmq_t* tmq; + tsem_t rspSem; + tmq_resp_err_t rspErr; +} SMqSubscribeCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); return conf; } -int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { +void tmq_conf_destroy(tmq_conf_t* conf) { + if(conf) free(conf); +} + +tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { if (strcmp(key, "group.id") == 0) { strcpy(conf->groupId, value); } if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); } - return 0; + return TMQ_CONF_OK; } -struct tmq_t { - char groupId[256]; - char clientId[256]; - SRWLatch lock; - int64_t consumerId; - int64_t epoch; - int64_t status; - tsem_t rspSem; - STscObj* pTscObj; - tmq_commit_cb* commit_cb; - int32_t nextTopicIdx; - SArray* clientTopics; //SArray - //stat - int64_t pollCnt; -}; - -struct tmq_list_t { - int32_t cnt; - int32_t tot; - char* elems[]; -}; - tmq_list_t* tmq_list_new() { tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); if (ptr == NULL) { @@ -141,6 +146,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { return 0; } +int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; + pParam->rspErr = code; + tsem_post(&pParam->rspSem); + return 0; +} tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); @@ -161,7 +172,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } -TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { +tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj *pRequest = NULL; int32_t sz = topic_list->cnt; //destroy ex @@ -219,27 +230,31 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc sqlObj"); } + SMqSubscribeCbParam param = { + .rspErr = TMQ_RESP_ERR__SUCCESS, + .tmq = tmq + }; + tsem_init(¶m.rspSem, 0, 0); + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - /*sendInfo->fp*/ + sendInfo->param = ¶m; + sendInfo->fp = tmqSubscribeCb; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(&pRequest->body.rspSem); + tsem_wait(¶m.rspSem); + tsem_destroy(¶m.rspSem); _return: /*if (sendInfo != NULL) {*/ /*destroySendMsgInfo(sendInfo);*/ /*}*/ - if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { - pRequest->code = terrno; - } - - return pRequest; + return param.rspErr; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { @@ -611,10 +626,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_message = NULL; int64_t status = atomic_load_64(&tmq->status); - tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); + tmqAsyncAskEp(tmq, taosArrayGetSize(tmq->clientTopics)); /*if (blocking_time < 0) blocking_time = 500;*/ - blocking_time = 1000; + blocking_time = 1; if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); @@ -674,16 +689,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } -tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { +tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { SMqConsumeReq req = {0}; - return NULL; + return 0; } void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; } - static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { assert(pMsgBody != NULL); tfree(pMsgBody->msgInfo.pData);