fix/TD-32863-add-creating-hash
This commit is contained in:
parent
3c34ce6e96
commit
b141e0e2ba
|
@ -37,6 +37,7 @@ typedef struct SVnodeMgmt {
|
|||
SSingleWorker mgmtMultiWorker;
|
||||
SHashObj *hash;
|
||||
SHashObj *closedHash;
|
||||
SHashObj *creatingHash;
|
||||
TdThreadRwlock lock;
|
||||
TdThreadMutex mutex;
|
||||
SVnodesStat state;
|
||||
|
@ -97,6 +98,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
|
|||
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed);
|
||||
void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||
|
||||
// vmHandle.c
|
||||
SArray *vmGetMsgHandles();
|
||||
|
@ -114,6 +116,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
|
|||
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt);
|
||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
|
||||
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
|
||||
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
|
||||
|
||||
// vmWorker.c
|
||||
int32_t vmStartWorker(SVnodeMgmt *pMgmt);
|
||||
|
|
|
@ -67,6 +67,54 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t size = taosHashGetSize(pMgmt->hash);
|
||||
int32_t creatingSize = taosHashGetSize(pMgmt->creatingHash);
|
||||
size += creatingSize;
|
||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||
if (pVnodes == NULL) {
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
if (pVnode && num < size) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->hash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pMgmt->creatingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
if (pVnode && num < size) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->creatingHash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->creatingHash, pIter);
|
||||
}
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
*numOfVnodes = num;
|
||||
*ppVnodes = pVnodes;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
|
|
|
@ -421,6 +421,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
vmRemoveFromCreatingHash(pMgmt, req.vgId);
|
||||
|
||||
_OVER:
|
||||
if (code != 0) {
|
||||
int32_t r = 0;
|
||||
|
|
|
@ -62,7 +62,7 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = NULL;
|
||||
|
||||
code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -92,6 +92,25 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
|
||||
SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
if (pCreatingVnode == NULL) {
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
dError("failed to alloc vnode since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
(void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
pCreatingVnode->vgId = vgId;
|
||||
pCreatingVnode->diskPrimary = diskId;
|
||||
|
||||
int32_t r = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to put vnode to creatingHash", vgId);
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
_OVER:
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
|
@ -234,12 +253,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
|||
}
|
||||
if (keepClosed) {
|
||||
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr());
|
||||
if (pClosedVnode == NULL) {
|
||||
dError("failed to alloc vnode since %s", terrstr());
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return;
|
||||
}
|
||||
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
pClosedVnode->vgId = pVnode->vgId;
|
||||
pClosedVnode->dropped = pVnode->dropped;
|
||||
|
@ -445,11 +464,18 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
|
||||
pMgmt->closedHash =
|
||||
taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (pMgmt->hash == NULL) {
|
||||
if (pMgmt->closedHash == NULL) {
|
||||
dError("failed to init vnode closed hash since %s", terrstr());
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMgmt->creatingHash =
|
||||
taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (pMgmt->creatingHash == NULL) {
|
||||
dError("failed to init vnode creatingHash hash since %s", terrstr());
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SWrapperCfg *pCfgs = NULL;
|
||||
int32_t numOfVnodes = 0;
|
||||
if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
|
||||
|
@ -527,6 +553,28 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
SVnodeObj *pOld = NULL;
|
||||
int32_t r = taosHashGetDup(pMgmt->closedHash, &vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", vgId);
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
_OVER:
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
|
||||
}
|
||||
}
|
||||
|
||||
static void *vmCloseVnodeInThread(void *param) {
|
||||
SVnodeThread *pThread = param;
|
||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||
|
@ -632,6 +680,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
|||
pMgmt->closedHash = NULL;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pMgmt->creatingHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
vmFreeVnodeObj(ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->creatingHash, pIter);
|
||||
}
|
||||
|
||||
if (pMgmt->creatingHash != NULL) {
|
||||
taosHashCleanup(pMgmt->creatingHash);
|
||||
pMgmt->creatingHash = NULL;
|
||||
}
|
||||
|
||||
dInfo("total vnodes:%d are all closed", numOfVnodes);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue