From 98f62165ae7bbc3e969f96df291f72c3b610ca12 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 1 Jun 2022 13:37:33 +0800 Subject: [PATCH] refactor(tmq): tq meta --- example/src/tmq.c | 4 - source/dnode/vnode/CMakeLists.txt | 7 +- source/dnode/vnode/src/inc/tq.h | 7 ++ source/dnode/vnode/src/tq/tq.c | 126 +-------------------------- source/dnode/vnode/src/tq/tqMeta.c | 134 +++++++++++++++++++++++++++++ source/dnode/vnode/src/tq/tqPush.c | 2 + 6 files changed, 151 insertions(+), 129 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 5eecb5e4cc..e61ad69e6b 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -165,7 +165,6 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "group.id", "tg2"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -191,7 +190,6 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { return; } int32_t cnt = 0; - /*clock_t startTime = clock();*/ while (running) { TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0); if (tmqmessage) { @@ -204,8 +202,6 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { /*break;*/ } } - /*clock_t endTime = clock();*/ - /*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/ err = tmq_consumer_close(tmq); if (err) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 17445b7abe..ea2a256663 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -52,10 +52,11 @@ target_sources( # tq "src/tq/tq.c" "src/tq/tqExec.c" - "src/tq/tqCommit.c" - "src/tq/tqOffset.c" - "src/tq/tqPush.c" + "src/tq/tqMeta.c" "src/tq/tqRead.c" + "src/tq/tqOffset.c" + #"src/tq/tqPush.c" + #"src/tq/tqCommit.c" ) target_include_directories( vnode diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 34a7ff823a..89ea969d92 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -168,6 +168,13 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId); +// tqMeta + +int32_t tqMetaOpen(STQ* pTq); +int32_t tqMetaClose(STQ* pTq); +int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); +int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); + // tqOffset STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); void STqOffsetClose(STqOffsetStore*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4bce829b10..93f305ba77 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,7 +14,6 @@ */ #include "tq.h" -#include "tdbInt.h" int32_t tqInit() { int8_t old; @@ -47,51 +46,6 @@ void tqCleanUp() { } } -int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) { - return strcmp(pKey1, pKey2); -} - -int32_t tqStoreHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { - int32_t code; - int32_t vlen; - tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - ASSERT(code == 0); - - void* buf = taosMemoryCalloc(1, vlen); - if (buf == NULL) { - ASSERT(0); - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, vlen); - - if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - ASSERT(0); - } - - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); - } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { - ASSERT(0); - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - tEncoderClear(&encoder); - taosMemoryFree(buf); - return 0; -} - STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { @@ -108,60 +62,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { - ASSERT(0); - } - - if (tdbTbOpen("handles", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) { - ASSERT(0); - } - - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - ASSERT(0); - } - - TBC* pCur; - if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) { - ASSERT(0); - } - - void* pKey; - int kLen; - void* pVal; - int vLen; - - tdbTbcMoveToFirst(pCur); - SDecoder decoder; - - while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - STqHandle handle; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); - handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal); - for (int32_t i = 0; i < 5; i++) { - handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - } - if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - for (int32_t i = 0; i < 5; i++) { - SReadHandle reader = { - .reader = handle.execHandle.pExecReader[i], - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, - }; - handle.execHandle.exec.execCol.task[i] = - qCreateStreamExecTaskInfo(handle.execHandle.exec.execCol.qmsg, &reader); - ASSERT(handle.execHandle.exec.execCol.task[i]); - } - } else { - handle.execHandle.exec.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - } - taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle)); - } - - if (tdbTxnClose(&txn) < 0) { + if (tqMetaOpen(pTq) < 0) { ASSERT(0); } @@ -174,7 +75,7 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->handles); taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); - tdbClose(pTq->pMetaStore); + tqMetaClose(pTq); taosMemoryFree(pTq); } // TODO @@ -256,9 +157,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ taosWLockLatch(&pHandle->pushHandle.lock); - /*SRpcHandleInfo* pInfo = atomic_load_ptr(&pHandle->pushHandle.pInfo);*/ - /*ASSERT(pInfo);*/ - SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pHandle->pushHandle.reqOffset; rsp.blockData = taosArrayInit(0, sizeof(void*)); @@ -303,7 +201,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ }; tmsgSendRsp(&resp); - /*atomic_store_ptr(&pHandle->pushHandle.pInfo, NULL);*/ memset(&pHandle->pushHandle.info, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); @@ -508,24 +405,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); ASSERT(code == 0); - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { ASSERT(0); } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn) < 0) { - /*ASSERT(0);*/ - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - return 0; } @@ -583,7 +465,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { atomic_add_fetch_32(&pHandle->epoch, 1); } - if (tqStoreHandle(pTq, req.subKey, pHandle) < 0) { + if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO } return 0; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index f2f48bbc8a..74162a9f49 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -12,3 +12,137 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "tdbInt.h" +#include "tq.h" + +int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) { + return strcmp(pKey1, pKey2); +} + +int32_t tqMetaOpen(STQ* pTq) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { + ASSERT(0); + } + + if (tdbTbOpen("handles", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + ASSERT(0); + } + + TBC* pCur; + if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) { + ASSERT(0); + } + + void* pKey; + int kLen; + void* pVal; + int vLen; + + tdbTbcMoveToFirst(pCur); + SDecoder decoder; + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + STqHandle handle; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, &handle); + handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal); + for (int32_t i = 0; i < 5; i++) { + handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + } + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + for (int32_t i = 0; i < 5; i++) { + SReadHandle reader = { + .reader = handle.execHandle.pExecReader[i], + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, + }; + handle.execHandle.exec.execCol.task[i] = + qCreateStreamExecTaskInfo(handle.execHandle.exec.execCol.qmsg, &reader); + ASSERT(handle.execHandle.exec.execCol.task[i]); + } + } else { + handle.execHandle.exec.execDb.pFilterOutTbUid = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + } + taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle)); + } + + if (tdbTxnClose(&txn) < 0) { + ASSERT(0); + } + return 0; +} + +int32_t tqMetaClose(STQ* pTq) { + tdbClose(pTq->pMetaStore); + return 0; +} + +int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { + int32_t code; + int32_t vlen; + tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); + ASSERT(code == 0); + + void* buf = taosMemoryCalloc(1, vlen); + if (buf == NULL) { + ASSERT(0); + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, vlen); + + if (tEncodeSTqHandle(&encoder, pHandle) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { + ASSERT(0); + } + + if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + tEncoderClear(&encoder); + taosMemoryFree(buf); + return 0; +} + +int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) { + /*ASSERT(0);*/ + } + + if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f2f48bbc8a..e31566f3fa 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -12,3 +12,5 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#include "tq.h"