Merge pull request #17468 from taosdata/enh/TD-19611
enh: close vnode with multiple threads
This commit is contained in:
commit
c58daa705b
|
@ -217,17 +217,80 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
}
|
||||
}
|
||||
|
||||
static void *vmCloseVnodeInThread(void *param) {
|
||||
SVnodeThread *pThread = param;
|
||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||
|
||||
dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||
setThreadName("close-vnodes");
|
||||
|
||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
||||
|
||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
|
||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||
tmsgReportStartup("vnode-close", stepDesc);
|
||||
|
||||
vmCloseVnode(pMgmt, pVnode);
|
||||
}
|
||||
|
||||
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||
dInfo("start to close all vnodes");
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
|
||||
vmCloseVnode(pMgmt, ppVnodes[i]);
|
||||
int32_t threadNum = tsNumOfCores / 2;
|
||||
if (threadNum < 1) threadNum = 1;
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
|
||||
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
threads[t].threadIndex = t;
|
||||
threads[t].pMgmt = pMgmt;
|
||||
threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
|
||||
}
|
||||
|
||||
for (int32_t v = 0; v < numOfVnodes; ++v) {
|
||||
int32_t t = v % threadNum;
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
|
||||
pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
|
||||
}
|
||||
}
|
||||
|
||||
pMgmt->state.openVnodes = 0;
|
||||
dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
|
||||
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->vnodeNum == 0) continue;
|
||||
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
|
||||
dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
}
|
||||
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
||||
taosThreadJoin(pThread->thread, NULL);
|
||||
taosThreadClear(&pThread->thread);
|
||||
}
|
||||
taosMemoryFree(pThread->ppVnodes);
|
||||
}
|
||||
taosMemoryFree(threads);
|
||||
|
||||
if (ppVnodes != NULL) {
|
||||
taosMemoryFree(ppVnodes);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue