Merge branch '3.0' into enh/TD-30933-3.0
This commit is contained in:
commit
c47cbc42cc
|
@ -35,7 +35,7 @@ typedef struct SVnodeMgmt {
|
|||
SWWorkerPool fetchPool;
|
||||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker mgmtMultiWorker;
|
||||
SHashObj *hash;
|
||||
SHashObj *runngingHash;
|
||||
SHashObj *closedHash;
|
||||
SHashObj *creatingHash;
|
||||
TdThreadRwlock lock;
|
||||
|
@ -98,7 +98,8 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
|
|||
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed);
|
||||
void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||
|
||||
// vmHandle.c
|
||||
SArray *vmGetMsgHandles();
|
||||
|
|
|
@ -23,7 +23,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
|||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t size = taosHashGetSize(pMgmt->hash);
|
||||
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
||||
int32_t closedSize = taosHashGetSize(pMgmt->closedHash);
|
||||
size += closedSize;
|
||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||
|
@ -32,7 +32,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
|||
return terrno;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
|
@ -40,9 +40,9 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
|||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->hash, pIter);
|
||||
taosHashCancelIterate(pMgmt->runngingHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
|
|||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t size = taosHashGetSize(pMgmt->hash);
|
||||
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
||||
int32_t creatingSize = taosHashGetSize(pMgmt->creatingHash);
|
||||
size += creatingSize;
|
||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||
|
@ -80,7 +80,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
|
|||
return terrno;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
|
@ -88,9 +88,9 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
|
|||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->hash, pIter);
|
||||
taosHashCancelIterate(pMgmt->runngingHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,14 +119,14 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
|
|||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t size = taosHashGetSize(pMgmt->hash);
|
||||
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||
if (pVnodes == NULL) {
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
|
@ -134,9 +134,9 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
|
|||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->hash, pIter);
|
||||
taosHashCancelIterate(pMgmt->runngingHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
|||
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||
|
@ -43,7 +43,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
|||
if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
|
||||
dError("failed to push vnode load");
|
||||
}
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||
}
|
||||
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
@ -55,7 +55,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
|||
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||
|
@ -71,7 +71,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
}
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||
}
|
||||
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
@ -140,7 +140,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
|
|||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
for (int i = 0; i < list_size; i++) {
|
||||
int32_t vgroup_id = vgroup_ids[i];
|
||||
void *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t));
|
||||
void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
|
||||
if (vnode == NULL) {
|
||||
r = taos_counter_delete(tsInsertCounter, keys[i]);
|
||||
if (r) {
|
||||
|
@ -381,7 +381,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
|
||||
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
vmRemoveFromCreatingHash(pMgmt, req.vgId);
|
||||
vmCleanPrimaryDisk(pMgmt, req.vgId);
|
||||
(void)tFreeSCreateVnodeReq(&req);
|
||||
code = terrno != 0 ? terrno : -1;
|
||||
return code;
|
||||
|
@ -423,25 +423,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
_OVER:
|
||||
vmRemoveFromCreatingHash(pMgmt, req.vgId);
|
||||
vmCleanPrimaryDisk(pMgmt, req.vgId);
|
||||
|
||||
if (code != 0) {
|
||||
int32_t r = 0;
|
||||
r = taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to lock since %s", req.vgId, tstrerror(r));
|
||||
}
|
||||
if (r == 0) {
|
||||
dInfo("vgId:%d, remove from hash", req.vgId);
|
||||
r = taosHashRemove(pMgmt->hash, &req.vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode since %s", req.vgId, tstrerror(r));
|
||||
}
|
||||
}
|
||||
r = taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to unlock since %s", req.vgId, tstrerror(r));
|
||||
}
|
||||
vmCloseFailedVnode(pMgmt, req.vgId);
|
||||
|
||||
vnodeClose(pImpl);
|
||||
vnodeDestroy(0, path, pMgmt->pTfs, 0);
|
||||
} else {
|
||||
|
|
|
@ -25,7 +25,7 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
SVnodeObj *pVnode = NULL;
|
||||
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
int32_t r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||
if (pVnode != NULL) {
|
||||
diskId = pVnode->diskPrimary;
|
||||
}
|
||||
|
@ -33,6 +33,82 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
return diskId;
|
||||
}
|
||||
|
||||
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
|
||||
if (!ppVnode || !(*ppVnode)) return;
|
||||
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
|
||||
int32_t refCount = atomic_load_32(&pVnode->refCount);
|
||||
while (refCount > 0) {
|
||||
dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
taosMsleep(200);
|
||||
refCount = atomic_load_32(&pVnode->refCount);
|
||||
}
|
||||
|
||||
taosMemoryFree(pVnode->path);
|
||||
taosMemoryFree(pVnode);
|
||||
ppVnode[0] = NULL;
|
||||
}
|
||||
|
||||
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
|
||||
int32_t code = 0;
|
||||
SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
if (pCreatingVnode == NULL) {
|
||||
dError("failed to alloc vnode since %s", terrstr());
|
||||
return terrno;
|
||||
}
|
||||
(void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
pCreatingVnode->vgId = vgId;
|
||||
pCreatingVnode->diskPrimary = diskId;
|
||||
|
||||
code = taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pCreatingVnode);
|
||||
return code;
|
||||
}
|
||||
|
||||
dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
|
||||
code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
|
||||
if (code != 0) {
|
||||
dError("vgId:%d, failed to put vnode to creatingHash", vgId);
|
||||
taosMemoryFree(pCreatingVnode);
|
||||
}
|
||||
|
||||
int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
SVnodeObj *pOld = NULL;
|
||||
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from creating Hash", vgId);
|
||||
}
|
||||
dTrace("vgId:%d, remove from creating Hash", vgId);
|
||||
r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", vgId);
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
if (pOld) {
|
||||
dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
int32_t code = 0;
|
||||
STfs *pTfs = pMgmt->pTfs;
|
||||
|
@ -91,45 +167,15 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
diskId = id;
|
||||
}
|
||||
}
|
||||
|
||||
SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
if (pCreatingVnode == NULL) {
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
dError("failed to alloc vnode since %s", tstrerror(code));
|
||||
int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
|
||||
}
|
||||
goto _OVER;
|
||||
}
|
||||
(void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
pCreatingVnode->vgId = vgId;
|
||||
pCreatingVnode->diskPrimary = diskId;
|
||||
|
||||
code = taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
code = vmRegisterCreatingState(pMgmt, vgId, diskId);
|
||||
if (code != 0) {
|
||||
int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
|
||||
}
|
||||
taosMemoryFree(pCreatingVnode);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
|
||||
code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
|
||||
if (code != 0) {
|
||||
dError("vgId:%d, failed to put vnode to creatingHash", vgId);
|
||||
taosMemoryFree(pCreatingVnode);
|
||||
}
|
||||
|
||||
int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
||||
}
|
||||
|
||||
code = taosThreadMutexUnlock(&pMgmt->mutex);
|
||||
if (code != 0) {
|
||||
goto _OVER;
|
||||
|
@ -154,11 +200,13 @@ _OVER:
|
|||
}
|
||||
}
|
||||
|
||||
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); }
|
||||
|
||||
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
|
||||
SVnodeObj *pVnode = NULL;
|
||||
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
int32_t r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
pVnode = NULL;
|
||||
|
@ -182,21 +230,75 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
//(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
}
|
||||
|
||||
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
|
||||
if (!ppVnode || !(*ppVnode)) return;
|
||||
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
SVnodeObj *pOld = NULL;
|
||||
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||
|
||||
int32_t refCount = atomic_load_32(&pVnode->refCount);
|
||||
while (refCount > 0) {
|
||||
dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
taosMsleep(200);
|
||||
refCount = atomic_load_32(&pVnode->refCount);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
dInfo("vgId:%d, remove from hash", vgId);
|
||||
int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r));
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
int32_t code = 0;
|
||||
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
if (pClosedVnode == NULL) {
|
||||
dError("failed to alloc vnode since %s", terrstr());
|
||||
return terrno;
|
||||
}
|
||||
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
pClosedVnode->vgId = pVnode->vgId;
|
||||
pClosedVnode->dropped = pVnode->dropped;
|
||||
pClosedVnode->vgVersion = pVnode->vgVersion;
|
||||
pClosedVnode->diskPrimary = pVnode->diskPrimary;
|
||||
pClosedVnode->toVgId = pVnode->toVgId;
|
||||
|
||||
SVnodeObj *pOld = NULL;
|
||||
int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
|
||||
r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
|
||||
}
|
||||
|
||||
taosMemoryFree(pVnode->path);
|
||||
taosMemoryFree(pVnode);
|
||||
ppVnode[0] = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
SVnodeObj *pOld = NULL;
|
||||
int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
|
||||
}
|
||||
if (pOld != NULL) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
|
||||
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||
|
@ -233,32 +335,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
|||
}
|
||||
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
|
||||
SVnodeObj *pOld = NULL;
|
||||
|
||||
int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||
|
||||
pOld = NULL;
|
||||
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
|
||||
}
|
||||
if (pOld != NULL) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
|
||||
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = vmRegisterRunningState(pMgmt, pVnode);
|
||||
vmUnRegisterClosedState(pMgmt, pVnode);
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
return code;
|
||||
|
@ -273,38 +351,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
|||
}
|
||||
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||
}
|
||||
vmUnRegisterRunningState(pMgmt, pVnode->vgId);
|
||||
if (keepClosed) {
|
||||
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
if (pClosedVnode == NULL) {
|
||||
dError("failed to alloc vnode since %s", terrstr());
|
||||
if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return;
|
||||
}
|
||||
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
pClosedVnode->vgId = pVnode->vgId;
|
||||
pClosedVnode->dropped = pVnode->dropped;
|
||||
pClosedVnode->vgVersion = pVnode->vgVersion;
|
||||
pClosedVnode->diskPrimary = pVnode->diskPrimary;
|
||||
pClosedVnode->toVgId = pVnode->toVgId;
|
||||
|
||||
SVnodeObj *pOld = NULL;
|
||||
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
|
||||
r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
|
||||
}
|
||||
};
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
|
@ -391,6 +443,21 @@ _closed:
|
|||
vmFreeVnodeObj(&pVnode);
|
||||
}
|
||||
|
||||
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
int32_t r = 0;
|
||||
r = taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
|
||||
}
|
||||
if (r == 0) {
|
||||
vmUnRegisterRunningState(pMgmt, vgId);
|
||||
}
|
||||
r = taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
|
||||
int32_t srcVgId = pCfg->vgId;
|
||||
int32_t dstVgId = pCfg->toVgId;
|
||||
|
@ -482,8 +549,9 @@ static void *vmOpenVnodeInThread(void *param) {
|
|||
}
|
||||
|
||||
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
||||
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (pMgmt->hash == NULL) {
|
||||
pMgmt->runngingHash =
|
||||
taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (pMgmt->runngingHash == NULL) {
|
||||
dError("failed to init vnode hash since %s", terrstr());
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -579,32 +647,6 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
SVnodeObj *pOld = NULL;
|
||||
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from creating Hash", vgId);
|
||||
}
|
||||
dTrace("vgId:%d, remove from creating Hash", vgId);
|
||||
r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", vgId);
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
if (pOld) {
|
||||
dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
|
||||
}
|
||||
}
|
||||
|
||||
static void *vmCloseVnodeInThread(void *param) {
|
||||
SVnodeThread *pThread = param;
|
||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||
|
@ -693,9 +735,9 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
|||
taosMemoryFree(ppVnodes);
|
||||
}
|
||||
|
||||
if (pMgmt->hash != NULL) {
|
||||
taosHashCleanup(pMgmt->hash);
|
||||
pMgmt->hash = NULL;
|
||||
if (pMgmt->runngingHash != NULL) {
|
||||
taosHashCleanup(pMgmt->runngingHash);
|
||||
pMgmt->runngingHash = NULL;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
|
||||
|
|
|
@ -52,7 +52,8 @@ _exit:
|
|||
} else {
|
||||
metaDebug("vgId:%d %s success", TD_VID(pMeta->pVnode), __func__);
|
||||
}
|
||||
return 0;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
// commit the meta txn
|
||||
|
@ -85,7 +86,8 @@ _exit:
|
|||
} else {
|
||||
metaDebug("vgId:%d %s success", TD_VID(pMeta->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
// abort the meta txn
|
||||
|
|
Loading…
Reference in New Issue