enh(tmq): remove old tmq_commit api

This commit is contained in:
Liu Jicong 2022-05-19 10:53:24 +08:00
parent 2216438c72
commit 86580cec09
4 changed files with 41 additions and 46 deletions

View File

@ -239,7 +239,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process(tmqmessage); msg_process(tmqmessage);
taos_free_result(tmqmessage); taos_free_result(tmqmessage);
tmq_commit(tmq, NULL, 1); tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
} }
} }

View File

@ -232,11 +232,11 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
#if 0 #if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif #endif

View File

@ -368,7 +368,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
taos_free_result(tmqmessage); taos_free_result(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL);
} }
} }

View File

@ -37,10 +37,10 @@ typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t ifManualCommit; int32_t ifManualCommit;
//int32_t autoCommitIntervalMs; // 1000 ms // int32_t autoCommitIntervalMs; // 1000 ms
//char autoCommit[8]; // true, false // char autoCommit[8]; // true, false
//char autoOffsetRest[16]; // none, earliest, latest // char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
@ -99,21 +99,15 @@ static void printHelp() {
} }
void initLogFile() { void initLogFile() {
time_t now; time_t now;
struct tm curTime; struct tm curTime;
char filename[256]; char filename[256];
now = taosTime(NULL); now = taosTime(NULL);
taosLocalTime(&now, &curTime); taosLocalTime(&now, &curTime);
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900,
configDir, curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
curTime.tm_year+1900, // sprintf(filename, "%s/../log/tmqlog.txt", configDir);
curTime.tm_mon+1,
curTime.tm_mday,
curTime.tm_hour,
curTime.tm_min,
curTime.tm_sec);
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) { if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", filename); fprintf(stderr, "Failed to open %s for save result\n", filename);
@ -137,9 +131,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
//taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); // taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
//taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); // taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
//taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); // taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: "); taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
@ -234,9 +228,9 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
@ -276,7 +270,7 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
@ -322,10 +316,10 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
time_t tTime = taosGetTimestampSec(); time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1, taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -357,11 +351,11 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++; totalMsgs++;
if (totalRows >= pInfo->expectMsgCnt) { if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
break; break;
} }
} else { } else {
taosFprintfFile(g_fp, "==== delay over time, so break\n"); taosFprintfFile(g_fp, "==== delay over time, so break\n");
break; break;
} }
} }
@ -397,8 +391,9 @@ void* consumeThreadFunc(void* param) {
if (pInfo->ifManualCommit) { if (pInfo->ifManualCommit) {
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n"); pPrint("tmq_commit() manual commit when consume end.\n");
tmq_commit(pInfo->tmq, NULL, 0); /*tmq_commit(pInfo->tmq, NULL, 0);*/
tmq_commit_sync(pInfo->tmq, NULL);
} }
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
@ -485,9 +480,9 @@ int32_t getConsumeInfo() {
int32_t* lengths = taos_fetch_lengths(pRes); int32_t* lengths = taos_fetch_lengths(pRes);
// set default value // set default value
//g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
//memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
//memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) { if (row[i] == NULL || 0 == i) {