Merge pull request #26506 from taosdata/fix/TS-4785-3.0

fix/TS-4785
This commit is contained in:
Hongze Cheng 2024-07-15 09:05:59 +08:00 committed by GitHub
commit 3cbedf3f4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 59 additions and 2 deletions

View File

@ -34,12 +34,14 @@ typedef struct SVnodeMgmt {
SAutoQWorkerPool streamPool; SAutoQWorkerPool streamPool;
SWWorkerPool fetchPool; SWWorkerPool fetchPool;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
SSingleWorker mgmtMultiWorker;
SHashObj *hash; SHashObj *hash;
TdThreadRwlock lock; TdThreadRwlock lock;
SVnodesStat state; SVnodesStat state;
STfs *pTfs; STfs *pTfs;
TdThread thread; TdThread thread;
bool stop; bool stop;
TdThreadMutex createLock;
} SVnodeMgmt; } SVnodeMgmt;
typedef struct { typedef struct {
@ -69,6 +71,7 @@ typedef struct {
STaosQueue *pQueryQ; STaosQueue *pQueryQ;
STaosQueue *pStreamQ; STaosQueue *pStreamQ;
STaosQueue *pFetchQ; STaosQueue *pFetchQ;
STaosQueue *pMultiMgmQ;
} SVnodeObj; } SVnodeObj;
typedef struct { typedef struct {
@ -125,6 +128,7 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -377,11 +377,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER; goto _OVER;
} }
taosThreadMutexLock(&pMgmt->createLock);
code = vmWriteVnodeListToFile(pMgmt); code = vmWriteVnodeListToFile(pMgmt);
if (code != 0) { if (code != 0) {
code = terrno != 0 ? terrno : code; code = terrno != 0 ? terrno : code;
taosThreadMutexUnlock(&pMgmt->createLock);
goto _OVER; goto _OVER;
} }
taosThreadMutexUnlock(&pMgmt->createLock);
_OVER: _OVER:
if (code != 0) { if (code != 0) {
@ -985,7 +988,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;

View File

@ -439,6 +439,8 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
dInfo("start to close all vnodes"); dInfo("start to close all vnodes");
tSingleWorkerCleanup(&pMgmt->mgmtWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker);
dInfo("vnodes mgmt worker is stopped"); dInfo("vnodes mgmt worker is stopped");
tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
dInfo("vnodes multiple mgmt worker is stopped");
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
@ -506,6 +508,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmStopWorker(pMgmt); vmStopWorker(pMgmt);
vnodeCleanup(); vnodeCleanup();
taosThreadRwlockDestroy(&pMgmt->lock); taosThreadRwlockDestroy(&pMgmt->lock);
taosThreadMutexDestroy(&pMgmt->createLock);
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
} }
@ -580,6 +583,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL); taosThreadRwlockInit(&pMgmt->lock, NULL);
taosThreadMutexInit(&pMgmt->createLock, NULL);
pMgmt->pTfs = pInput->pTfs; pMgmt->pTfs = pInput->pTfs;
if (pMgmt->pTfs == NULL) { if (pMgmt->pTfs == NULL) {

View File

@ -28,6 +28,31 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void vmProcessMultiMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, get from vnode-multi-mgmt queue", pMsg);
switch (pMsg->msgType) {
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break;
}
if (IsReq(pMsg)) {
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
}
vmSendRsp(pMsg, code);
}
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle; SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
@ -271,6 +296,13 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); } int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg);
return 0;
}
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, put into vnode-mgmt queue", pMsg); dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
@ -415,6 +447,20 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
int32_t threadNum = 0;
if (tsNumOfCores == 1) {
threadNum = 2;
} else {
threadNum = tsNumOfCores;
}
SSingleWorkerCfg multiMgmtCfg = {.min = threadNum,
.max = threadNum,
.name = "vnode-multi-mgmt",
.fp = (FItem)vmProcessMultiMgmtQueue,
.param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg) != 0) return -1;
dDebug("vnode workers are initialized"); dDebug("vnode workers are initialized");
return 0; return 0;
} }

View File

@ -310,7 +310,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
pWrapper = &pDnode->wrappers[ntype]; pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
dmReleaseWrapper(pWrapper); taosThreadMutexUnlock(&pDnode->mutex);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1; return -1;