diff --git a/example/src/tmq.c b/example/src/tmq.c index b4013f26ee..b79d21d051 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -167,7 +167,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); return tmq; @@ -176,6 +176,7 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "topic_ctb_column"); + /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ return topic_list; } @@ -190,7 +191,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0); if (tmqmessage) { cnt++; /*printf("get data\n");*/ diff --git a/include/client/taos.h b/include/client/taos.h index 486d5f5fef..0194357841 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -233,6 +233,8 @@ DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); +DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); @@ -251,7 +253,7 @@ typedef enum tmq_conf_res_t tmq_conf_res_t; DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); +DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 2c3f64c5c1..bc80a01c27 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -182,10 +182,13 @@ typedef struct { typedef struct { tmq_t* tmq; - int32_t async; + int8_t async; + int8_t automatic; + tmq_commit_cb* userCb; tsem_t rspSem; tmq_resp_err_t rspErr; SArray* offsets; + void* userParam; } SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { @@ -314,6 +317,135 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } +int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; + pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + if (pParam->async) { + if (pParam->automatic && pParam->tmq->commitCb) { + pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, + pParam->tmq->commitCbUserParam); + } else if (!pParam->automatic && pParam->userCb) { + pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam); + } + + if (pParam->offsets) { + taosArrayDestroy(pParam->offsets); + } + + taosMemoryFree(pParam); + } else { + tsem_post(&pParam->rspSem); + } + return 0; +} + +int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, + tmq_commit_cb* userCb, void* userParam) { + SMqCMCommitOffsetReq req; + SArray* pOffsets = NULL; + void* buf = NULL; + SMqCommitCbParam* pParam = NULL; + SMsgSendInfo* sendInfo = NULL; + + if (offsets == NULL) { + pOffsets = taosArrayInit(0, sizeof(SMqOffset)); + for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + SMqOffset offset; + tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN); + offset.vgId = pVg->vgId; + offset.offset = pVg->currentOffset; + taosArrayPush(pOffsets, &offset); + } + } + } else { + pOffsets = (SArray*)&offsets->container; + } + + req.num = (int32_t)pOffsets->size; + req.offsets = pOffsets->pData; + + int32_t code; + int32_t tlen = 0; + tEncodeSize(tEncodeSMqCMCommitOffsetReq, &req, tlen, code); + if (code < 0) { + goto END; + } + code = -1; + + buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + goto END; + } + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + tEncodeSMqCMCommitOffsetReq(&encoder, &req); + tEncoderClear(&encoder); + + pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); + if (pParam == NULL) { + goto END; + } + pParam->tmq = tmq; + pParam->automatic = automatic; + pParam->async = async; + pParam->offsets = pOffsets; + pParam->userCb = userCb; + pParam->userParam = userParam; + if (!async) tsem_init(&pParam->rspSem, 0, 0); + + sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) goto END; + sendInfo->msgInfo = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; + + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqCommitCb; + sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET; + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + if (!async) { + tsem_wait(&pParam->rspSem); + code = pParam->rspErr; + tsem_destroy(&pParam->rspSem); + taosMemoryFree(pParam); + } + + // avoid double free if msg is sent + buf = NULL; + + code = 0; +END: + if (buf) taosMemoryFree(buf); + if (pParam) taosMemoryFree(pParam); + if (sendInfo) taosMemoryFree(sendInfo); + + if (code != 0 && async) { + if (automatic) { + tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam); + } else { + userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam); + } + } + + if (offsets == NULL) { + taosArrayDestroy(pOffsets); + } + return code; +} + void tmqAssignDelayedHbTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); @@ -350,7 +482,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { tmqAskEp(tmq, true); taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmq_commit(tmq, NULL, true); + /*tmq_commit(tmq, NULL, true);*/ + tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else { @@ -385,32 +518,11 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; - tmq_t* tmq = pParam->tmq; + /*tmq_t* tmq = pParam->tmq;*/ tsem_post(&pParam->rspSem); return 0; } -int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; - pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; - if (pParam->tmq->commitCb) { - pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam); - } - if (!pParam->async) - tsem_post(&pParam->rspSem); - else { - if (pParam->offsets) { - taosArrayDestroy(pParam->offsets); - } - tsem_destroy(&pParam->rspSem); - /*if (pParam->pArray) {*/ - /*taosArrayDestroy(pParam->pArray);*/ - /*}*/ - taosMemoryFree(pParam); - } - return 0; -} - tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { if (*topics == NULL) { *topics = tmq_list_new(); @@ -541,6 +653,8 @@ FAIL: } tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { + return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); +#if 0 // TODO: add read write lock SRequestObj* pRequest = NULL; tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS; @@ -627,6 +741,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } return resp; +#endif } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { @@ -723,7 +838,7 @@ FAIL: return code; } -void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { +void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { // conf->commitCb = cb; conf->commitCbUserParam = param; @@ -1384,3 +1499,10 @@ const char* tmq_get_table_name(TAOS_RES* res) { } return NULL; } +DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) { + tmqCommitInner(tmq, offsets, 0, 1, cb, param); +} + +DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { + return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL); +} diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index a7d174c9c5..469bde0b38 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -196,6 +196,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { } } + tDecoderClear(&decoder); + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index b3dba695a7..5cc3bb345a 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -37,10 +37,10 @@ typedef struct { TdThread thread; int32_t consumerId; - int32_t ifManualCommit; - //int32_t autoCommitIntervalMs; // 1000 ms - //char autoCommit[8]; // true, false - //char autoOffsetRest[16]; // none, earliest, latest + int32_t ifManualCommit; + // int32_t autoCommitIntervalMs; // 1000 ms + // char autoCommit[8]; // true, false + // char autoOffsetRest[16]; // none, earliest, latest int32_t ifCheckData; int64_t expectMsgCnt; @@ -99,21 +99,15 @@ static void printHelp() { } void initLogFile() { - time_t now; - struct tm curTime; - char filename[256]; + time_t now; + struct tm curTime; + char filename[256]; - now = taosTime(NULL); + now = taosTime(NULL); taosLocalTime(&now, &curTime); - sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", - configDir, - curTime.tm_year+1900, - curTime.tm_mon+1, - curTime.tm_mday, - curTime.tm_hour, - curTime.tm_min, - curTime.tm_sec); - //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900, + curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec); + // sprintf(filename, "%s/../log/tmqlog.txt", configDir); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", filename); @@ -137,9 +131,9 @@ void saveConfigToLogFile() { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - //taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); - //taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); - //taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); + // taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); + // taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); + // taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); taosFprintfFile(g_fp, " Topics: "); for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); @@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) while (1) { TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; + if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - + taos_print_row(buf, row, fields, numOfFields); - - if (0 != g_stConfInfo.showRowFlag) { + + if (0 != g_stConfInfo.showRowFlag) { taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); } - + totalRows++; } @@ -276,9 +270,9 @@ void build_consumer(SThreadInfo* pInfo) { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - //tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); // tmq_conf_set(conf, "group.id", "cgrp1"); for (int32_t i = 0; i < pInfo->numOfKey; i++) { @@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) { pInfo->tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); - + return; } @@ -323,7 +317,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr); - + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); @@ -354,11 +348,11 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; if (totalRows >= pInfo->expectMsgCnt) { - taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); + taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); break; } - } else { - taosFprintfFile(g_fp, "==== delay over time, so break\n"); + } else { + taosFprintfFile(g_fp, "==== delay over time, so break\n"); break; } } @@ -386,7 +380,7 @@ void* consumeThreadFunc(void* param) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - + tmq_list_destroy(pInfo->topicList); pInfo->topicList = NULL; @@ -394,17 +388,17 @@ void* consumeThreadFunc(void* param) { if (pInfo->ifManualCommit) { taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); - pPrint("tmq_commit() manual commit when consume end.\n"); + pPrint("tmq_commit() manual commit when consume end.\n"); tmq_commit(pInfo->tmq, NULL, 0); } - + err = tmq_unsubscribe(pInfo->tmq); if (err) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pInfo->consumeMsgCnt = -1; return NULL; } - + err = tmq_consumer_close(pInfo->tmq); if (err) { pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); @@ -482,9 +476,9 @@ int32_t getConsumeInfo() { int32_t* lengths = taos_fetch_lengths(pRes); // set default value - //g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; - //memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); - //memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); + // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; + // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); + // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) {