From 13680e2ee6ac6594e496695176c452edd0391e16 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 11 Nov 2022 10:25:16 +0800 Subject: [PATCH 1/4] chore: set stream fill history on for tsma --- source/dnode/mnode/impl/src/mndSma.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 56e725fac7..698c07d9bc 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sql = strdup(pCreate->sql); streamObj.smaId = smaObj.uid; streamObj.watermark = pCreate->watermark; + streamObj.fillHistory = STREAM_FILL_HISTORY_ON; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; streamObj.triggerParam = pCreate->maxDelay; streamObj.ast = strdup(smaObj.ast); From 5651bde6fe6bbe7f474634d6267e40d551d4e972 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 11 Nov 2022 10:54:46 +0800 Subject: [PATCH 2/4] enh: use lock in buf pool only for rsma vnode --- source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/vnd/vnodeBufPool.c | 30 +++++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 8f8691cfc2..ecb34cd9b4 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -65,6 +65,7 @@ struct SVBufPool { SVnode* pVnode; volatile int32_t nRef; TdThreadSpinlock lock; + bool isLock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 6ac2ce1c16..86cd8af4d6 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -27,10 +27,15 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) return -1; } - if (taosThreadSpinInit(&pPool->lock, 0) != 0) { - taosMemoryFree(pPool); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (VND_IS_RSMA(pVnode)) { + if (taosThreadSpinInit(&pPool->lock, 0) != 0) { + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pPool->isLock = true; + } else { + pPool->isLock = false; } pPool->next = NULL; @@ -49,7 +54,9 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); - taosThreadSpinDestroy(&pPool->lock); + if (pPool->isLock) { + taosThreadSpinDestroy(&pPool->lock); + } taosMemoryFree(pPool); return 0; } @@ -114,7 +121,10 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *p = NULL; ASSERT(pPool != NULL); - taosThreadSpinLock(&pPool->lock); + if (pPool->isLock) { + taosThreadSpinLock(&pPool->lock); + } + if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node p = pPool->ptr; @@ -125,7 +135,9 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pNode = taosMemoryMalloc(sizeof(*pNode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - taosThreadSpinUnlock(&pPool->lock); + if (pPool->isLock) { + taosThreadSpinUnlock(&pPool->lock); + } return NULL; } @@ -138,7 +150,9 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - taosThreadSpinUnlock(&pPool->lock); + if (pPool->isLock) { + taosThreadSpinUnlock(&pPool->lock); + } return p; } From 936daff09bd38716b0c9f87a0ad5966b2bbe47e9 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 11 Nov 2022 10:57:48 +0800 Subject: [PATCH 3/4] chore: revert the code change --- source/dnode/vnode/src/inc/vnd.h | 1 - source/dnode/vnode/src/vnd/vnodeBufPool.c | 30 +++++++---------------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ecb34cd9b4..8f8691cfc2 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -65,7 +65,6 @@ struct SVBufPool { SVnode* pVnode; volatile int32_t nRef; TdThreadSpinlock lock; - bool isLock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 86cd8af4d6..3069e5224e 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -27,15 +27,10 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) return -1; } - if (VND_IS_RSMA(pVnode)) { - if (taosThreadSpinInit(&pPool->lock, 0) != 0) { - taosMemoryFree(pPool); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pPool->isLock = true; - } else { - pPool->isLock = false; + if (taosThreadSpinInit(&pPool->lock, 0) != 0) { + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } pPool->next = NULL; @@ -54,9 +49,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); - if (pPool->isLock) { - taosThreadSpinDestroy(&pPool->lock); - } + taosThreadSpinDestroy(&pPool->lock); taosMemoryFree(pPool); return 0; } @@ -121,9 +114,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *p = NULL; ASSERT(pPool != NULL); - if (pPool->isLock) { - taosThreadSpinLock(&pPool->lock); - } + taosThreadSpinLock(&pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node @@ -135,9 +126,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pNode = taosMemoryMalloc(sizeof(*pNode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - if (pPool->isLock) { - taosThreadSpinUnlock(&pPool->lock); - } + taosThreadSpinUnlock(&pPool->lock); return NULL; } @@ -150,9 +139,8 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - if (pPool->isLock) { - taosThreadSpinUnlock(&pPool->lock); - } + + taosThreadSpinUnlock(&pPool->lock); return p; } From 37b7e34fe26e26e97de3a0487065ebe4f103fbd3 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 11 Nov 2022 10:58:25 +0800 Subject: [PATCH 4/4] chore: revert the code change --- source/dnode/vnode/src/vnd/vnodeBufPool.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 3069e5224e..6ac2ce1c16 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -115,7 +115,6 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ASSERT(pPool != NULL); taosThreadSpinLock(&pPool->lock); - if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node p = pPool->ptr; @@ -139,7 +138,6 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - taosThreadSpinUnlock(&pPool->lock); return p; }