Merge pull request #17013 from taosdata/fix/TD-19205
fix: restore vnodes in multi-threads
This commit is contained in:
commit
479218fb18
|
@ -74,6 +74,7 @@ typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
SVnodeMgmt *pMgmt;
|
SVnodeMgmt *pMgmt;
|
||||||
SWrapperCfg *pCfgs;
|
SWrapperCfg *pCfgs;
|
||||||
|
SVnodeObj **ppVnodes;
|
||||||
} SVnodeThread;
|
} SVnodeThread;
|
||||||
|
|
||||||
// vmInt.c
|
// vmInt.c
|
||||||
|
|
|
@ -218,14 +218,14 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||||
dInfo("start to close all vnodes");
|
dInfo("start to close all vnodes");
|
||||||
|
|
||||||
int32_t numOfVnodes = 0;
|
int32_t numOfVnodes = 0;
|
||||||
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
vmCloseVnode(pMgmt, pVnodes[i]);
|
vmCloseVnode(pMgmt, ppVnodes[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnodes != NULL) {
|
if (ppVnodes != NULL) {
|
||||||
taosMemoryFree(pVnodes);
|
taosMemoryFree(ppVnodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMgmt->hash != NULL) {
|
if (pMgmt->hash != NULL) {
|
||||||
|
@ -331,22 +331,92 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vmStart(SVnodeMgmt *pMgmt) {
|
static void *vmRestoreVnodeInThread(void *param) {
|
||||||
int32_t numOfVnodes = 0;
|
SVnodeThread *pThread = param;
|
||||||
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||||
SVnodeObj *pVnode = pVnodes[i];
|
setThreadName("restore-vnodes");
|
||||||
vnodeStart(pVnode->pImpl);
|
|
||||||
|
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||||
|
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
||||||
|
|
||||||
|
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||||
|
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
|
||||||
|
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||||
|
tmsgReportStartup("vnode-restore", stepDesc);
|
||||||
|
|
||||||
|
int32_t code = vnodeStart(pVnode->pImpl);
|
||||||
|
if (code != 0) {
|
||||||
|
dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
|
||||||
|
pThread->failed++;
|
||||||
|
} else {
|
||||||
|
dDebug("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
|
||||||
|
pThread->opened++;
|
||||||
|
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
|
||||||
|
pThread->failed);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
|
||||||
|
int32_t numOfVnodes = 0;
|
||||||
|
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||||
|
|
||||||
|
int32_t threadNum = tsNumOfCores / 2;
|
||||||
|
if (threadNum < 1) threadNum = 1;
|
||||||
|
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||||
|
|
||||||
|
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
|
||||||
|
for (int32_t t = 0; t < threadNum; ++t) {
|
||||||
|
threads[t].threadIndex = t;
|
||||||
|
threads[t].pMgmt = pMgmt;
|
||||||
|
threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t v = 0; v < numOfVnodes; ++v) {
|
||||||
|
int32_t t = v % threadNum;
|
||||||
|
SVnodeThread *pThread = &threads[t];
|
||||||
|
pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
|
||||||
|
}
|
||||||
|
|
||||||
|
pMgmt->state.openVnodes = 0;
|
||||||
|
dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
|
||||||
|
|
||||||
|
for (int32_t t = 0; t < threadNum; ++t) {
|
||||||
|
SVnodeThread *pThread = &threads[t];
|
||||||
|
if (pThread->vnodeNum == 0) continue;
|
||||||
|
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
|
||||||
|
dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t t = 0; t < threadNum; ++t) {
|
||||||
|
SVnodeThread *pThread = &threads[t];
|
||||||
|
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
||||||
|
taosThreadJoin(pThread->thread, NULL);
|
||||||
|
taosThreadClear(&pThread->thread);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pThread->ppVnodes);
|
||||||
|
}
|
||||||
|
taosMemoryFree(threads);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
SVnodeObj *pVnode = pVnodes[i];
|
SVnodeObj *pVnode = ppVnodes[i];
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnodes != NULL) {
|
if (ppVnodes != NULL) {
|
||||||
taosMemoryFree(pVnodes);
|
taosMemoryFree(ppVnodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -360,7 +430,7 @@ SMgmtFunc vmGetMgmtFunc() {
|
||||||
SMgmtFunc mgmtFunc = {0};
|
SMgmtFunc mgmtFunc = {0};
|
||||||
mgmtFunc.openFp = vmInit;
|
mgmtFunc.openFp = vmInit;
|
||||||
mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
|
mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
|
||||||
mgmtFunc.startFp = (NodeStartFp)vmStart;
|
mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
|
||||||
mgmtFunc.stopFp = (NodeStopFp)vmStop;
|
mgmtFunc.stopFp = (NodeStopFp)vmStop;
|
||||||
mgmtFunc.requiredFp = vmRequire;
|
mgmtFunc.requiredFp = vmRequire;
|
||||||
mgmtFunc.getHandlesFp = vmGetMsgHandles;
|
mgmtFunc.getHandlesFp = vmGetMsgHandles;
|
||||||
|
|
|
@ -12,7 +12,7 @@ $tb = $tbPrefix . $i
|
||||||
print =============== step1
|
print =============== step1
|
||||||
sql drop database -x step1
|
sql drop database -x step1
|
||||||
step1:
|
step1:
|
||||||
sql create database $db
|
sql create database $db vgroups 2
|
||||||
sql use $db
|
sql use $db
|
||||||
sql create table $tb (ts timestamp, speed int)
|
sql create table $tb (ts timestamp, speed int)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue