From b1862f4c163431e1e507d2e0f692f828c2cd13b8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 4 Jan 2021 16:08:28 +0800 Subject: [PATCH 1/3] TD-2640 --- src/inc/query.h | 1 + src/query/src/qExecutor.c | 13 +++++++++++ src/vnode/inc/vnodeRead.h | 1 + src/vnode/inc/vnodeWrite.h | 1 + src/vnode/src/vnodeRead.c | 45 ++++++++++++++++++++++++++------------ src/vnode/src/vnodeWrite.c | 18 +++++++++------ 6 files changed, 58 insertions(+), 21 deletions(-) diff --git a/src/inc/query.h b/src/inc/query.h index 5e1de77889..7342221cb9 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle); void* qOpenQueryMgmt(int32_t vgId); void qQueryMgmtNotifyClosed(void* pExecutor); +void qQueryMgmtReOpen(void *pExecutor); void qCleanupQueryMgmt(void* pExecutor); void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); void** qAcquireQInfo(void* pMgmt, uint64_t key); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d9630edb9e..1f76d47968 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7697,6 +7697,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); } +void qQueryMgmtReOpen(void *pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + SQueryMgmt *pQueryMgmt = pQMgmt; + qDebug("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId); + + pthread_mutex_lock(&pQueryMgmt->lock); + pQueryMgmt->closed = false; + pthread_mutex_unlock(&pQueryMgmt->lock); +} + void qCleanupQueryMgmt(void* pQMgmt) { if (pQMgmt == NULL) { return; diff --git a/src/vnode/inc/vnodeRead.h b/src/vnode/inc/vnodeRead.h index f2953d79f4..f5375d6ab0 100644 --- a/src/vnode/inc/vnodeRead.h +++ b/src/vnode/inc/vnodeRead.h @@ -27,6 +27,7 @@ void vnodeCleanupRead(void); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); +void vnodeWaitReadCompleted(void *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/inc/vnodeWrite.h b/src/vnode/inc/vnodeWrite.h index 8b3f0fdb58..5238e45b81 100644 --- a/src/vnode/inc/vnodeWrite.h +++ b/src/vnode/inc/vnodeWrite.h @@ -27,6 +27,7 @@ void vnodeCleanupWrite(void); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); +void vnodeWaitWriteCompleted(void *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index a972ffec1c..c864bc995b 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) { vnodeRelease(pVnode); } -int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { - SVnodeObj *pVnode = vparam; - - if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) { - int32_t code = vnodeCheckRead(pVnode); - if (code != TSDB_CODE_SUCCESS) return code; - } - +static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, int8_t qtype, SRpcMsg *pRpcMsg) { int32_t size = sizeof(SVReadMsg) + contLen; SVReadMsg *pRead = taosAllocateQitem(size); if (pRead == NULL) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + return NULL; } - if (rparam != NULL) { - SRpcMsg *pRpcMsg = rparam; + if (pRpcMsg != NULL) { pRead->rpcHandle = pRpcMsg->handle; pRead->rpcAhandle = pRpcMsg->ahandle; pRead->msgType = pRpcMsg->msgType; @@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt pRead->qtype = qtype; atomic_add_fetch_32(&pVnode->refCount, 1); + + return pRead; +} + +int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { + SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam); + if (pRead == NULL) { + assert(terrno != 0); + return terrno; + } + + SVnodeObj *pVnode = vparam; + + int32_t code = vnodeCheckRead(pVnode); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pRead); + vnodeRelease(pVnode); + return code; + } + atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { - vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { + vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, + pVnode->queuedRMsg); return taosWriteQitem(pVnode->fqueue, qtype, pRead); } else { - vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, + pVnode->queuedRMsg); return taosWriteQitem(pVnode->qqueue, qtype, pRead); } } @@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); } + +void vnodeWaitReadCompleted(void *pVnode) {} \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 571c7d667a..b65251508d 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) { return TSDB_CODE_APP_NOT_READY; } - if (vnodeInClosingStatus(pVnode)) { - vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], - pVnode->refCount, pVnode); - return TSDB_CODE_APP_NOT_READY; - } - if (pVnode->isFull) { vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount); return TSDB_CODE_VND_IS_FULL; @@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { } } + if (!vnodeInReadyStatus(pVnode)) { + vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], + pVnode->refCount, pVnode); + taosFreeQitem(pWrite); + vnodeRelease(pVnode); + return TSDB_CODE_APP_NOT_READY; + } + int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; @@ -337,4 +339,6 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { pWrite->processedCount); return TSDB_CODE_VND_ACTION_IN_PROGRESS; } -} \ No newline at end of file +} + +void vnodeWaitWriteCompleted(void *pVnode) {} \ No newline at end of file From e987cd8ce8e0b695b788667fcebdb7b3e340506e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 4 Jan 2021 20:19:03 +0800 Subject: [PATCH 2/3] minus change --- src/vnode/src/vnodeMain.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 8f44a248ce..9117ab03af 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -153,6 +153,11 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) { int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) { SVnodeObj *pVnode = vparam; + if (pVnode->dbCfgVersion == pVnodeCfg->cfg.dbCfgVersion && pVnode->vgCfgVersion == pVnodeCfg->cfg.vgCfgVersion) { + vDebug("vgId:%d, dbCfgVersion:%d and vgCfgVersion:%d not change", pVnode->vgId, pVnode->dbCfgVersion, + pVnode->vgCfgVersion); + return TSDB_CODE_SUCCESS; + } // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // dbCfgVersion can be corrected by status msg From 899f9089fedcc460a9711f5dc84c66ee268b178f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 4 Jan 2021 20:50:56 +0800 Subject: [PATCH 3/3] TD-2640 --- src/vnode/src/vnodeMain.c | 20 +++++++------------- src/vnode/src/vnodeStatus.c | 20 +++++++++++++++++++- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 9117ab03af..eb43fba079 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -416,15 +416,12 @@ void vnodeDestroy(SVnodeObj *pVnode) { } void vnodeCleanUp(SVnodeObj *pVnode) { - if (!vnodeInInitStatus(pVnode)) { - // it may be in updateing or reset state, then it shall wait - int32_t i = 0; - while (!vnodeSetClosingStatus(pVnode)) { - if (++i % 1000 == 0) { - sched_yield(); - } - } - } + vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + + vnodeSetClosingStatus(pVnode); + + // release local resources only after cutting off outside connections + qQueryMgmtNotifyClosed(pVnode->qMgmt); // stop replication module if (pVnode->sync > 0) { @@ -433,10 +430,7 @@ void vnodeCleanUp(SVnodeObj *pVnode) { syncStop(sync); } - vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); - - // release local resources only after cutting off outside connections - qQueryMgmtNotifyClosed(pVnode->qMgmt); + vDebug("vgId:%d, vnode is cleaned, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeStatus.c b/src/vnode/src/vnodeStatus.c index d09a6a8663..76618ffca7 100644 --- a/src/vnode/src/vnodeStatus.c +++ b/src/vnode/src/vnodeStatus.c @@ -15,6 +15,8 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "taosmsg.h" +#include "query.h" #include "vnodeStatus.h" char* vnodeStatus[] = { @@ -44,11 +46,13 @@ bool vnodeSetReadyStatus(SVnodeObj* pVnode) { vDebug("vgId:%d, cannot set status:ready, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]); } + qQueryMgmtReOpen(pVnode->qMgmt); + pthread_mutex_unlock(&pVnode->statusMutex); return set; } -bool vnodeSetClosingStatus(SVnodeObj* pVnode) { +static bool vnodeSetClosingStatusImp(SVnodeObj* pVnode) { bool set = false; pthread_mutex_lock(&pVnode->statusMutex); @@ -63,6 +67,20 @@ bool vnodeSetClosingStatus(SVnodeObj* pVnode) { return set; } +bool vnodeSetClosingStatus(SVnodeObj* pVnode) { + if (!vnodeInInitStatus(pVnode)) { + // it may be in updating or reset state, then it shall wait + int32_t i = 0; + while (!vnodeSetClosingStatusImp(pVnode)) { + if (++i % 1000 == 0) { + sched_yield(); + } + } + } + + return true; +} + bool vnodeSetUpdatingStatus(SVnodeObj* pVnode) { bool set = false; pthread_mutex_lock(&pVnode->statusMutex);