Merge pull request #14818 from taosdata/fix/mnode

enh: adjust vnode fetch queue number
This commit is contained in:
Shengliang Guan 2022-07-12 20:43:05 +08:00 committed by GitHub
commit 148e3a5f97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 99 additions and 65 deletions

View File

@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall;
typedef struct {
void *ahandle;
void *fp;
void *queue;
int32_t workerId;
int32_t threadNum;
int64_t timestamp;
@ -65,6 +67,7 @@ void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue);
void taosUpdateItemSize(STaosQueue *queue, int32_t items);
int32_t taosQueueItemSize(STaosQueue *queue);
int64_t taosQueueMemorySize(STaosQueue *queue);
@ -81,8 +84,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
void taosResetQsetThread(STaosQset *qset, void *pItem);
extern int64_t tsRpcQueueMemoryAllowed;

View File

@ -412,7 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1);
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores;

View File

@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
const char *path;
const char *name;
SQWorkerPool queryPool;
SQWorkerPool fetchPool;
SWWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool applyPool;

View File

@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->hash, pIter);
} else {

View File

@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || pVnode->dropped) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
pVnode = NULL;
} else {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount);
@ -82,6 +83,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode);
dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
while (pVnode->refCount > 0) taosMsleep(10);
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);

View File

@ -81,21 +81,26 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
const STraceId *trace = &pMsg->info.traceId;
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code);
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code);
}
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
@ -201,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pMsg->pCont);
pRpc->pCont = NULL;
taosFreeQitem(pMsg);
}
return code;
@ -232,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
default:
break;
}
vmReleaseVnode(pMgmt, pVnode);
}
vmReleaseVnode(pMgmt, pVnode);
return size;
}
@ -242,7 +247,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
pVnode->pFetchQ == NULL) {
@ -250,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return -1;
}
dDebug("vgId:%d, queue is alloced", pVnode->vgId);
dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ);
dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ);
dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ);
dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ);
return 0;
}
@ -259,7 +268,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pWriteQ = NULL;
pVnode->pSyncQ = NULL;
pVnode->pApplyQ = NULL;
@ -275,11 +284,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pQPool->max = tsNumOfVnodeQueryThreads;
if (tQWorkerInit(pQPool) != 0) return -1;
SQWorkerPool *pFPool = &pMgmt->fetchPool;
SWWorkerPool *pFPool = &pMgmt->fetchPool;
pFPool->name = "vnode-fetch";
pFPool->min = tsNumOfVnodeFetchThreads;
pFPool->max = tsNumOfVnodeFetchThreads;
if (tQWorkerInit(pFPool) != 0) return -1;
if (tWWorkerInit(pFPool) != 0) return -1;
SWWorkerPool *pWPool = &pMgmt->writePool;
pWPool->name = "vnode-write";
@ -325,6 +333,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->applyPool);
tWWorkerCleanup(&pMgmt->syncPool);
tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->fetchPool);
tWWorkerCleanup(&pMgmt->fetchPool);
dDebug("vnode workers are closed");
}

View File

@ -244,6 +244,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew;
// todo
workerId = 0;
// 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/

View File

@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
qall = taosAllocateQall();
SSyncIO *io = param;
STaosQall *qall = taosAllocateQall();
SRpcMsg *pRpcMsg, rpcMsg;
SQueueInfo qinfo = {0};
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, &qinfo);
sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs);
if (numOfMsgs <= 0) {
break;
@ -369,6 +369,8 @@ static void *syncIOConsumerFunc(void *param) {
taosFreeQitem(pRpcMsg);
}
taosUpdateItemSize(qinfo.queue, numOfMsgs);
}
taosFreeQall(qall);

View File

@ -31,12 +31,12 @@ void processShellMsg() {
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
int type;
void * pvnode;
SQueueInfo qinfo = {0};
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL);
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &qinfo);
tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
@ -86,6 +86,8 @@ void processShellMsg() {
rpcSendResponse(&nRpcMsg);
}
}
taosUpdateItemSize(qinfo.queue, numOfMsgs);
}
taosFreeQall(qall);

View File

@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) {
bool empty = false;
taosThreadMutexLock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL) {
if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) {
empty = true;
}
taosThreadMutexUnlock(&queue->mutex);
@ -123,6 +123,14 @@ bool taosQueueEmpty(STaosQueue *queue) {
return empty;
}
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
if (queue == NULL) return;
taosThreadMutexLock(&queue->mutex);
queue->numOfItems -= items;
taosThreadMutexUnlock(&queue->mutex);
}
int32_t taosQueueItemSize(STaosQueue *queue) {
if (queue == NULL) return 0;
@ -257,6 +265,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
queue->tail = NULL;
queue->numOfItems = 0;
queue->memOfItems = 0;
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems);
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
}
@ -397,7 +406,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) {
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
STaosQnode *pNode = NULL;
int32_t code = 0;
@ -417,17 +426,18 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
if (queue->head) {
pNode = queue->head;
*ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp;
if (ts) *ts = pNode->timestamp;
qinfo->ahandle = queue->ahandle;
qinfo->fp = queue->itemFp;
qinfo->queue = queue;
qinfo->timestamp = pNode->timestamp;
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--;
// queue->numOfItems--;
queue->memOfItems -= pNode->size;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
queue->memOfItems);
}
@ -440,7 +450,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
return code;
}
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
STaosQueue *queue;
int32_t code = 0;
@ -461,13 +471,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
code = qall->numOfItems;
if (ahandle) *ahandle = queue->ahandle;
if (itemsFp) *itemsFp = queue->itemsFp;
qinfo->ahandle = queue->ahandle;
qinfo->fp = queue->itemsFp;
qinfo->queue = queue;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
// queue->numOfItems = 0;
queue->memOfItems = 0;
uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) {
tsem_wait(&qset->sem);

View File

@ -70,27 +70,27 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
static void *tQWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool;
FItem fp = NULL;
void *msg = NULL;
void *ahandle = NULL;
int32_t code = 0;
int64_t ts = 0;
SQueueInfo qinfo = {0};
void *msg = NULL;
int32_t code = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break;
}
if (fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts};
(*fp)(&info, msg);
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = pool->num;
(*((FItem)qinfo.fp))(&qinfo, msg);
}
taosUpdateItemSize(qinfo.queue, 1);
}
return NULL;
@ -195,28 +195,28 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
static void *tWWorkerThreadFp(SWWorker *worker) {
SWWorkerPool *pool = worker->pool;
FItems fp = NULL;
void *msg = NULL;
void *ahandle = NULL;
int32_t numOfMsgs = 0;
int32_t qtype = 0;
SQueueInfo qinfo = {0};
void *msg = NULL;
int32_t code = 0;
int32_t numOfMsgs = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
if (numOfMsgs == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
break;
}
if (fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
(*fp)(&info, worker->qall, numOfMsgs);
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = pool->num;
(*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
}
taosUpdateItemSize(qinfo.queue, numOfMsgs);
}
return NULL;