From 3629958b43a6bddd72fedf633c158ec09948053f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 23 May 2022 20:31:35 +0800 Subject: [PATCH 1/2] feat(tmq): serializer and deserializer for tq exec --- include/util/tencode.h | 2 +- source/dnode/vnode/src/inc/tq.h | 7 ++-- source/dnode/vnode/src/tq/tq.c | 57 +++++++++++++++++++++++++++++---- source/libs/wal/src/walMgmt.c | 16 ++++----- 4 files changed, 65 insertions(+), 17 deletions(-) diff --git a/include/util/tencode.h b/include/util/tencode.h index af38d694e2..cbacd59fa7 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -82,7 +82,7 @@ typedef struct { do { \ SEncoder coder = {0}; \ tEncoderInit(&coder, NULL, 0); \ - if ((E)(&coder, S) == 0) { \ + if ((E)(&coder, S) >= 0) { \ SIZE = coder.pos; \ RET = 0; \ } else { \ diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f89df4a96f..40b490da47 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -20,9 +20,9 @@ #include "executor.h" #include "os.h" -#include "tcache.h" #include "thash.h" #include "tmsg.h" +#include "tqueue.h" #include "trpc.h" #include "ttimer.h" #include "wal.h" @@ -86,6 +86,9 @@ typedef struct { qTaskInfo_t task[5]; } STqExec; +int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec); +int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec); + struct STQ { char* path; SHashObj* pushMgr; // consumerId -> STqExec* @@ -93,7 +96,7 @@ struct STQ { SHashObj* pStreamTasks; SVnode* pVnode; SWal* pWal; - // TDB* pTdb; + TDB* pTdb; }; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6ca523b580..f7b4ba93a6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,14 +14,25 @@ */ #include "tq.h" -#include "tqueue.h" int32_t tqInit() { - // + int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); + if (old == 0) { + tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ"); + if (tqMgmt.timer == NULL) { + atomic_store_8(&tqMgmt.inited, 0); + return -1; + } + } return 0; } -void tqCleanUp() {} +void tqCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2); + if (old != 1) return; + taosTmrCleanUp(tqMgmt.timer); + atomic_store_8(&tqMgmt.inited, 0); +} STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); @@ -32,9 +43,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; - /*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/ - /*ASSERT(0);*/ - /*}*/ + if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) { + ASSERT(0); + } pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -51,11 +62,45 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->execs); taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); + tdbClose(pTq->pTdb); taosMemoryFree(pTq); } // TODO } +int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1; + if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1; + if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1; + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1; + // TODO encode modified exec + } + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1; + if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1; + if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1; + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1; + // TODO decode modified exec + } + tEndDecode(pDecoder); + return 0; +} int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index ada1f599f2..f2f423ddf5 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -14,17 +14,17 @@ */ #define _DEFAULT_SOURCE -#include "tcompare.h" #include "os.h" #include "taoserror.h" +#include "tcompare.h" #include "tref.h" #include "walInt.h" typedef struct { - int8_t stop; - int8_t inited; - uint32_t seq; - int32_t refSetId; + int8_t stop; + int8_t inited; + uint32_t seq; + int32_t refSetId; TdThread thread; } SWalMgmt; @@ -53,13 +53,14 @@ int32_t walInit() { } void walCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); - if (old == 0) { + int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2); + if (old != 1) { return; } walStopThread(); taosCloseRef(tsWal.refSetId); wInfo("wal module is cleaned up"); + atomic_store_8(&tsWal.inited, 0); } SWal *walOpen(const char *path, SWalCfg *pCfg) { @@ -126,7 +127,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } if (walCheckAndRepairIdx(pWal) < 0) { - } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, From 0ad9c4cebd4a5eb4fcb62768e940d84b9ad8615c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 23 May 2022 20:40:37 +0800 Subject: [PATCH 2/2] fix: init once --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 22 +++++++++++---- source/libs/wal/src/walMgmt.c | 47 ++++++++++++++++++++------------- 3 files changed, 47 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 40b490da47..ad3f8cc869 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -104,7 +104,7 @@ typedef struct { tmr_h timer; } STqMgmt; -static STqMgmt tqMgmt; +static STqMgmt tqMgmt = {0}; // init once int tqInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f7b4ba93a6..bd48ed9b4c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -16,22 +16,34 @@ #include "tq.h" int32_t tqInit() { - int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2); + if (old != 2) break; + } + if (old == 0) { tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ"); if (tqMgmt.timer == NULL) { atomic_store_8(&tqMgmt.inited, 0); return -1; } + atomic_store_8(&tqMgmt.inited, 1); } return 0; } void tqCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2); - if (old != 1) return; - taosTmrCleanUp(tqMgmt.timer); - atomic_store_8(&tqMgmt.inited, 0); + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2); + if (old != 2) break; + } + + if (old == 1) { + taosTmrCleanUp(tqMgmt.timer); + atomic_store_8(&tqMgmt.inited, 0); + } } STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index f2f423ddf5..71cd6de73f 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -36,31 +36,42 @@ static void walFreeObj(void *pWal); int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); } int32_t walInit() { - int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); - if (old == 1) return 0; - - tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); - - int32_t code = walCreateThread(); - if (code != 0) { - wError("failed to init wal module since %s", tstrerror(code)); - atomic_store_8(&tsWal.inited, 0); - return code; + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2); + if (old != 2) break; + } + + if (old == 0) { + tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); + + int32_t code = walCreateThread(); + if (code != 0) { + wError("failed to init wal module since %s", tstrerror(code)); + atomic_store_8(&tsWal.inited, 0); + return code; + } + + wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId); + atomic_store_8(&tsWal.inited, 1); } - wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId); return 0; } void walCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2); - if (old != 1) { - return; + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2); + if (old != 2) break; + } + + if (old == 1) { + walStopThread(); + taosCloseRef(tsWal.refSetId); + wInfo("wal module is cleaned up"); + atomic_store_8(&tsWal.inited, 0); } - walStopThread(); - taosCloseRef(tsWal.refSetId); - wInfo("wal module is cleaned up"); - atomic_store_8(&tsWal.inited, 0); } SWal *walOpen(const char *path, SWalCfg *pCfg) {