Merge pull request #4475 from taosdata/feature/wal
[TD-2370]<fix>: fix the deadlock when the system exits
This commit is contained in:
commit
48ba8aeb90
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tref.h"
|
||||
#include "tsync.h"
|
||||
#include "tglobal.h"
|
||||
#include "dnode.h"
|
||||
|
@ -28,7 +29,9 @@
|
|||
#include "mnodeUser.h"
|
||||
#include "mnodeVgroup.h"
|
||||
|
||||
static SBnMgmt tsBnMgmt;;
|
||||
extern int64_t tsDnodeRid;
|
||||
extern int64_t tsSdbRid;
|
||||
static SBnMgmt tsBnMgmt;
|
||||
static void bnMonitorDnodeModule();
|
||||
|
||||
static void bnLock() {
|
||||
|
@ -529,6 +532,9 @@ void bnCheckStatus() {
|
|||
void * pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
||||
void *dnodeSdb = taosAcquireRef(tsSdbRid, tsDnodeRid);
|
||||
if (dnodeSdb == NULL) return;
|
||||
|
||||
while (1) {
|
||||
pIter = mnodeGetNextDnode(pIter, &pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
|
@ -543,6 +549,8 @@ void bnCheckStatus() {
|
|||
}
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
|
||||
taosReleaseRef(tsSdbRid, tsDnodeRid);
|
||||
}
|
||||
|
||||
void bnCheckModules() {
|
||||
|
|
|
@ -366,7 +366,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
queue->numOfItems--;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems);
|
||||
uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, pNode->type, queue->numOfItems);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
|
|
@ -28,6 +28,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
|
|||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
int32_t vnodeReset(SVnodeObj *pVnode);
|
||||
void vnodeCleanUp(SVnodeObj *pVnode);
|
||||
void vnodeDestroy(SVnodeObj *pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -23,8 +23,8 @@ extern "C" {
|
|||
|
||||
int32_t vnodeInitMWorker();
|
||||
void vnodeCleanupMWorker();
|
||||
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle);
|
||||
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle);
|
||||
int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode);
|
||||
int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -26,8 +26,9 @@
|
|||
#include "vnodeSync.h"
|
||||
#include "vnodeVersion.h"
|
||||
#include "vnodeMgmt.h"
|
||||
#include "vnodeWorker.h"
|
||||
#include "vnodeMain.h"
|
||||
|
||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
|
||||
|
||||
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
|
||||
|
@ -110,8 +111,10 @@ int32_t vnodeDrop(int32_t vgId) {
|
|||
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
pVnode->dropped = 1;
|
||||
|
||||
// remove from hash, so new messages wont be consumed
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
vnodeRelease(pVnode);
|
||||
vnodeCleanUp(pVnode);
|
||||
vnodeCleanupInMWorker(pVnode);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -309,6 +312,7 @@ int32_t vnodeOpen(int32_t vgId) {
|
|||
if (pVnode->sync <= 0) {
|
||||
vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
|
||||
tstrerror(terrno));
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
vnodeCleanUp(pVnode);
|
||||
return terrno;
|
||||
}
|
||||
|
@ -322,6 +326,7 @@ int32_t vnodeClose(int32_t vgId) {
|
|||
if (pVnode == NULL) return 0;
|
||||
|
||||
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
vnodeRelease(pVnode);
|
||||
vnodeCleanUp(pVnode);
|
||||
|
||||
|
@ -398,11 +403,7 @@ void vnodeDestroy(SVnodeObj *pVnode) {
|
|||
tsdbDecCommitRef(vgId);
|
||||
}
|
||||
|
||||
|
||||
static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||
// remove from hash, so new messages wont be consumed
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
|
||||
void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||
if (!vnodeInInitStatus(pVnode)) {
|
||||
// it may be in updateing or reset state, then it shall wait
|
||||
int32_t i = 0;
|
||||
|
|
|
@ -114,7 +114,7 @@ void vnodeRelease(void *vparam) {
|
|||
}
|
||||
} else {
|
||||
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode);
|
||||
vnodeDestroy(pVnode);
|
||||
vnodeDestroyInMWorker(pVnode);
|
||||
int32_t count = taosHashGetSize(tsVnodesHash);
|
||||
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count);
|
||||
}
|
||||
|
|
|
@ -21,10 +21,11 @@
|
|||
#include "tqueue.h"
|
||||
#include "tglobal.h"
|
||||
#include "vnodeWorker.h"
|
||||
#include "vnodeMain.h"
|
||||
|
||||
typedef enum {
|
||||
VNODE_WORKER_ACTION_CREATE,
|
||||
VNODE_WORKER_ACTION_DELETE
|
||||
VNODE_WORKER_ACTION_CLEANUP,
|
||||
VNODE_WORKER_ACTION_DESTROUY
|
||||
} EVMWorkerAction;
|
||||
|
||||
typedef struct {
|
||||
|
@ -132,14 +133,11 @@ void vnodeCleanupMWorker() {
|
|||
vnodeStopMWorker();
|
||||
}
|
||||
|
||||
static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) {
|
||||
static int32_t vnodeWriteIntoMWorker(SVnodeObj *pVnode, EVMWorkerAction action, void *rpcHandle) {
|
||||
SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg));
|
||||
if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->vgId = pVnode->vgId;
|
||||
pMsg->pVnode = pVnode;
|
||||
pMsg->rpcHandle = rpcHandle;
|
||||
pMsg->action = action;
|
||||
|
@ -150,29 +148,27 @@ static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) {
|
||||
vTrace("vgId:%d, will open in vmworker", vgId);
|
||||
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle);
|
||||
int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) {
|
||||
vTrace("vgId:%d, will cleanup in vmworker", pVnode->vgId);
|
||||
return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_CLEANUP, NULL);
|
||||
}
|
||||
|
||||
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) {
|
||||
vTrace("vgId:%d, will cleanup in vmworker", vgId);
|
||||
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle);
|
||||
int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) {
|
||||
vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId);
|
||||
return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL);
|
||||
}
|
||||
|
||||
static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
|
||||
vTrace("vgId:%d, disposed in vmworker", pMsg->vgId);
|
||||
vnodeRelease(pMsg->pVnode);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) {
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcHandle,
|
||||
.code = pMsg->code,
|
||||
};
|
||||
if (pMsg->rpcHandle != NULL) {
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcHandle, .code = pMsg->code};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
vnodeFreeMWorkerMsg(pMsg);
|
||||
}
|
||||
|
||||
|
@ -180,11 +176,11 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
|
|||
pMsg->code = 0;
|
||||
|
||||
switch (pMsg->action) {
|
||||
case VNODE_WORKER_ACTION_CREATE:
|
||||
pMsg->code = vnodeOpen(pMsg->vgId);
|
||||
case VNODE_WORKER_ACTION_CLEANUP:
|
||||
vnodeCleanUp(pMsg->pVnode);
|
||||
break;
|
||||
case VNODE_WORKER_ACTION_DELETE:
|
||||
pMsg->code = vnodeDrop(pMsg->vgId);
|
||||
case VNODE_WORKER_ACTION_DESTROUY:
|
||||
vnodeDestroy(pMsg->pVnode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue