Merge pull request #10017 from taosdata/feature/mnode

refact worker for fetch
This commit is contained in:
Shengliang Guan 2022-01-25 18:39:22 +08:00 committed by GitHub
commit 59d7cb6d82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 319 additions and 166 deletions

View File

@ -15,6 +15,7 @@
#ifndef _TD_UTIL_QUEUE_H #ifndef _TD_UTIL_QUEUE_H
#define _TD_UTIL_QUEUE_H #define _TD_UTIL_QUEUE_H
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -40,12 +41,12 @@ shall be used to set up the protection.
typedef struct STaosQueue STaosQueue; typedef struct STaosQueue STaosQueue;
typedef struct STaosQset STaosQset; typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall; typedef struct STaosQall STaosQall;
typedef void (*FProcessItem)(void *ahandle, void *pItem); typedef void (*FItem)(void *ahandle, void *pItem);
typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems);
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void * taosAllocateQitem(int32_t size); void * taosAllocateQitem(int32_t size);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
@ -66,8 +67,11 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId);
void taosResetQsetThread(STaosQset *qset, void *pItem);
int32_t taosGetQueueItemsNumber(STaosQueue *queue); int32_t taosGetQueueItemsNumber(STaosQueue *queue);
int32_t taosGetQsetItemsNumber(STaosQset *qset); int32_t taosGetQsetItemsNumber(STaosQset *qset);

View File

@ -15,57 +15,61 @@
#ifndef _TD_UTIL_WORKER_H #ifndef _TD_UTIL_WORKER_H
#define _TD_UTIL_WORKER_H #define _TD_UTIL_WORKER_H
#include "tqueue.h" #include "tqueue.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct SWorkerPool SWorkerPool; typedef struct SQWorkerPool SQWorkerPool;
typedef struct SMWorkerPool SMWorkerPool; typedef struct SWWorkerPool SWWorkerPool;
typedef struct SWorker { typedef struct SQWorker {
int32_t id; // worker ID int32_t id; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
SWorkerPool *pool; SQWorkerPool *pool;
} SWorker; } SQWorker, SFWorker;
typedef struct SWorkerPool { typedef struct SQWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
STaosQset * qset; STaosQset * qset;
const char * name; const char * name;
SWorker *workers; SQWorker * workers;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWorkerPool; } SQWorkerPool, SFWorkerPool;
typedef struct SMWorker { typedef struct SWWorker {
int32_t id; // worker id int32_t id; // worker id
pthread_t thread; // thread pthread_t thread; // thread
STaosQall * qall; STaosQall * qall;
STaosQset * qset; // queue set STaosQset * qset; // queue set
SMWorkerPool *pool; SWWorkerPool *pool;
} SMWorker; } SWWorker;
typedef struct SMWorkerPool { typedef struct SWWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
const char * name; const char * name;
SMWorker *workers; SWWorker * workers;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SMWorkerPool; } SWWorkerPool;
int32_t tWorkerInit(SWorkerPool *pool); int32_t tQWorkerInit(SQWorkerPool *pool);
void tWorkerCleanup(SWorkerPool *pool); void tQWorkerCleanup(SQWorkerPool *pool);
STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
int32_t tMWorkerInit(SMWorkerPool *pool); int32_t tFWorkerInit(SFWorkerPool *pool);
void tMWorkerCleanup(SMWorkerPool *pool); void tFWorkerCleanup(SFWorkerPool *pool);
STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp);
void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue); void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue);
int32_t tWWorkerInit(SWWorkerPool *pool);
void tWWorkerCleanup(SWWorkerPool *pool);
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -31,8 +31,8 @@ typedef struct {
SDnode *pDnode; SDnode *pDnode;
STaosQueue *queue; STaosQueue *queue;
union { union {
SWorkerPool pool; SQWorkerPool pool;
SMWorkerPool mpool; SWWorkerPool mpool;
}; };
} SDnodeWorker; } SDnodeWorker;
@ -109,10 +109,10 @@ typedef struct {
int32_t openVnodes; int32_t openVnodes;
int32_t totalVnodes; int32_t totalVnodes;
SRWLatch latch; SRWLatch latch;
SWorkerPool queryPool; SQWorkerPool queryPool;
SWorkerPool fetchPool; SFWorkerPool fetchPool;
SMWorkerPool syncPool; SWWorkerPool syncPool;
SMWorkerPool writePool; SWWorkerPool writePool;
} SVnodesMgmt; } SVnodesMgmt;
typedef struct { typedef struct {

View File

@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1); int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1);
int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1); int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1);
SWorkerPool *pPool = &pMgmt->queryPool; SQWorkerPool *pQPool = &pMgmt->queryPool;
pPool->name = "vnode-query"; pQPool->name = "vnode-query";
pPool->min = minQueryThreads; pQPool->min = minQueryThreads;
pPool->max = maxQueryThreads; pQPool->max = maxQueryThreads;
if (tWorkerInit(pPool) != 0) return -1; if (tQWorkerInit(pQPool) != 0) return -1;
pPool = &pMgmt->fetchPool; SFWorkerPool *pFPool = &pMgmt->fetchPool;
pPool->name = "vnode-fetch"; pFPool->name = "vnode-fetch";
pPool->min = minFetchThreads; pFPool->min = minFetchThreads;
pPool->max = maxFetchThreads; pFPool->max = maxFetchThreads;
if (tWorkerInit(pPool) != 0) return -1; if (tFWorkerInit(pFPool) != 0) return -1;
SMWorkerPool *pMPool = &pMgmt->writePool; SWWorkerPool *pWPool = &pMgmt->writePool;
pMPool->name = "vnode-write"; pWPool->name = "vnode-write";
pMPool->max = maxWriteThreads; pWPool->max = maxWriteThreads;
if (tMWorkerInit(pMPool) != 0) return -1; if (tWWorkerInit(pWPool) != 0) return -1;
pMPool = &pMgmt->syncPool; pWPool = &pMgmt->syncPool;
pMPool->name = "vnode-sync"; pWPool->name = "vnode-sync";
pMPool->max = maxSyncThreads; pWPool->max = maxSyncThreads;
if (tMWorkerInit(pMPool) != 0) return -1; if (tWWorkerInit(pWPool) != 0) return -1;
dDebug("vnode workers is initialized"); dDebug("vnode workers is initialized");
return 0; return 0;
@ -938,21 +938,21 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
static void dndCleanupVnodeWorkers(SDnode *pDnode) { static void dndCleanupVnodeWorkers(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerCleanup(&pMgmt->fetchPool); tFWorkerCleanup(&pMgmt->fetchPool);
tWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->queryPool);
tMWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->writePool);
tMWorkerCleanup(&pMgmt->syncPool); tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed"); dDebug("vnode workers is closed");
} }
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
pVnode->pQueryQ == NULL) { pVnode->pQueryQ == NULL) {
@ -965,11 +965,11 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
pVnode->pWriteQ = NULL; pVnode->pWriteQ = NULL;
pVnode->pApplyQ = NULL; pVnode->pApplyQ = NULL;
pVnode->pSyncQ = NULL; pVnode->pSyncQ = NULL;

View File

@ -31,28 +31,28 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
pWorker->pDnode = pDnode; pWorker->pDnode = pDnode;
if (pWorker->type == DND_WORKER_SINGLE) { if (pWorker->type == DND_WORKER_SINGLE) {
SWorkerPool *pPool = &pWorker->pool; SQWorkerPool *pPool = &pWorker->pool;
pPool->name = name; pPool->name = name;
pPool->min = minNum; pPool->min = minNum;
pPool->max = maxNum; pPool->max = maxNum;
if (tWorkerInit(pPool) != 0) { if (tQWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FItem)queueFp);
if (pWorker->queue == NULL) { if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
} else if (pWorker->type == DND_WORKER_MULTI) { } else if (pWorker->type == DND_WORKER_MULTI) {
SMWorkerPool *pPool = &pWorker->mpool; SWWorkerPool *pPool = &pWorker->mpool;
pPool->name = name; pPool->name = name;
pPool->max = maxNum; pPool->max = maxNum;
if (tMWorkerInit(pPool) != 0) { if (tWWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FItems)queueFp);
if (pWorker->queue == NULL) { if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -70,11 +70,11 @@ void dndCleanupWorker(SDnodeWorker *pWorker) {
} }
if (pWorker->type == DND_WORKER_SINGLE) { if (pWorker->type == DND_WORKER_SINGLE) {
tWorkerCleanup(&pWorker->pool); tQWorkerCleanup(&pWorker->pool);
tWorkerFreeQueue(&pWorker->pool, pWorker->queue); tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
} else if (pWorker->type == DND_WORKER_MULTI) { } else if (pWorker->type == DND_WORKER_MULTI) {
tMWorkerCleanup(&pWorker->mpool); tWWorkerCleanup(&pWorker->mpool);
tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
} else { } else {
} }
} }

View File

@ -13,29 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #define _DEFAULT_SOURCE
#include "taoserror.h"
#include "tqueue.h" #include "tqueue.h"
#include "taoserror.h"
#include "ulog.h" #include "ulog.h"
typedef struct STaosQnode STaosQnode; typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode { typedef struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue;
char item[]; char item[];
} STaosQnode; } STaosQnode;
typedef struct STaosQueue { typedef struct STaosQueue {
int32_t itemSize; int32_t itemSize;
int32_t numOfItems; int32_t numOfItems;
int32_t threadId;
STaosQnode * head; STaosQnode * head;
STaosQnode * tail; STaosQnode * tail;
STaosQueue * next; // for queue set STaosQueue * next; // for queue set
STaosQset * qset; // for queue set STaosQset * qset; // for queue set
void * ahandle; // for queue set void * ahandle; // for queue set
FProcessItem itemFp; FItem itemFp;
FProcessItems itemsFp; FItems itemsFp;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
@ -56,19 +57,23 @@ typedef struct STaosQall {
} STaosQall; } STaosQall;
STaosQueue *taosOpenQueue() { STaosQueue *taosOpenQueue() {
STaosQueue *queue = calloc(sizeof(STaosQueue), 1); STaosQueue *queue = calloc(1, sizeof(STaosQueue));
if (queue == NULL) { if (queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pthread_mutex_init(&queue->mutex, NULL); if (pthread_mutex_init(&queue->mutex, NULL) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
uTrace("queue:%p is opened", queue); queue->threadId = -1;
uDebug("queue:%p is opened", queue);
return queue; return queue;
} }
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
if (queue == NULL) return; if (queue == NULL) return;
queue->itemFp = itemFp; queue->itemFp = itemFp;
queue->itemsFp = itemsFp; queue->itemsFp = itemsFp;
@ -85,7 +90,9 @@ void taosCloseQueue(STaosQueue *queue) {
qset = queue->qset; qset = queue->qset;
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (queue->qset) taosRemoveFromQset(qset, queue); if (queue->qset) {
taosRemoveFromQset(qset, queue);
}
while (pNode) { while (pNode) {
pTemp = pNode; pTemp = pNode;
@ -96,7 +103,7 @@ void taosCloseQueue(STaosQueue *queue) {
pthread_mutex_destroy(&queue->mutex); pthread_mutex_destroy(&queue->mutex);
free(queue); free(queue);
uTrace("queue:%p is closed", queue); uDebug("queue:%p is closed", queue);
} }
bool taosQueueEmpty(STaosQueue *queue) { bool taosQueueEmpty(STaosQueue *queue) {
@ -120,19 +127,23 @@ int32_t taosQueueSize(STaosQueue *queue) {
} }
void *taosAllocateQitem(int32_t size) { void *taosAllocateQitem(int32_t size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
if (pNode == NULL) return NULL;
uTrace("item:%p, node:%p is allocated", pNode->item, pNode); uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
return (void *)pNode->item; return (void *)pNode->item;
} }
void taosFreeQitem(void *param) { void taosFreeQitem(void *pItem) {
if (param == NULL) return; if (pItem == NULL) return;
char *temp = (char *)param; char *temp = pItem;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
uTrace("item:%p, node:%p is freed", param, temp); uTrace("item:%p, node:%p is freed", pItem, temp);
free(temp); free(temp);
} }
@ -175,7 +186,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
queue->numOfItems--; queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -183,7 +194,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
return code; return code;
} }
STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } STaosQall *taosAllocateQall() { return calloc(1, sizeof(STaosQall)); }
void taosFreeQall(STaosQall *qall) { free(qall); } void taosFreeQall(STaosQall *qall) { free(qall); }
@ -238,7 +249,7 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
STaosQset *taosOpenQset() { STaosQset *taosOpenQset() {
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); STaosQset *qset = calloc(sizeof(STaosQset), 1);
if (qset == NULL) { if (qset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -247,7 +258,7 @@ STaosQset *taosOpenQset() {
pthread_mutex_init(&qset->mutex, NULL); pthread_mutex_init(&qset->mutex, NULL);
tsem_init(&qset->sem, 0, 0); tsem_init(&qset->sem, 0, 0);
uTrace("qset:%p is opened", qset); uDebug("qset:%p is opened", qset);
return qset; return qset;
} }
@ -268,7 +279,7 @@ void taosCloseQset(STaosQset *qset) {
pthread_mutex_destroy(&qset->mutex); pthread_mutex_destroy(&qset->mutex);
tsem_destroy(&qset->sem); tsem_destroy(&qset->sem);
free(qset); free(qset);
uTrace("qset:%p is closed", qset); uDebug("qset:%p is closed", qset);
} }
// tsem_post 'qset->sem', so that reader threads waiting for it // tsem_post 'qset->sem', so that reader threads waiting for it
@ -338,12 +349,12 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
uTrace("queue:%p is removed from qset:%p", queue, qset); uDebug("queue:%p is removed from qset:%p", queue, qset);
} }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
@ -365,6 +376,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP
*ppItem = pNode->item; *ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle; if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp; if (itemFp) *itemFp = queue->itemFp;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--; queue->numOfItems--;
@ -382,7 +394,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP
return code; return code;
} }
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
STaosQueue *queue; STaosQueue *queue;
int32_t code = 0; int32_t code = 0;
@ -411,7 +423,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem); for (int32_t j = 1; j < qall->numOfItems; ++j) {
tsem_wait(&qset->sem);
}
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -423,6 +437,65 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
return code; return code;
} }
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) {
STaosQnode *pNode = NULL;
int32_t code = -1;
tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head;
STaosQueue *queue = qset->current;
if (queue) qset->current = queue->next;
if (queue == NULL) break;
if (queue->head == NULL) continue;
if (queue->threadId != -1 && queue->threadId != threadId) {
code = 0;
continue;
}
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
pNode = queue->head;
pNode->queue = queue;
queue->threadId = threadId;
*ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp;
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
}
pthread_mutex_unlock(&queue->mutex);
if (pNode) break;
}
pthread_mutex_unlock(&qset->mutex);
return code;
}
void taosResetQsetThread(STaosQset *qset, void *pItem) {
if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
pthread_mutex_lock(&qset->mutex);
pNode->queue->threadId = -1;
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
tsem_post(&qset->sem);
}
pthread_mutex_unlock(&qset->mutex);
}
int32_t taosGetQueueItemsNumber(STaosQueue *queue) { int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
if (!queue) return 0; if (!queue) return 0;

View File

@ -14,38 +14,46 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "ulog.h"
#include "tqueue.h"
#include "tworker.h" #include "tworker.h"
#include "taoserror.h"
#include "ulog.h"
typedef void *(*ThreadFp)(void *param); typedef void *(*ThreadFp)(void *param);
int32_t tWorkerInit(SWorkerPool *pool) { int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = calloc(sizeof(SWorker), pool->max); pool->workers = calloc(sizeof(SQWorker), pool->max);
pthread_mutex_init(&pool->mutex, NULL); if (pool->workers == NULL) {
for (int i = 0; i < pool->max; ++i) { terrno = TSDB_CODE_OUT_OF_MEMORY;
SWorker *worker = pool->workers + i; return -1;
}
if (pthread_mutex_init(&pool->mutex, NULL)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
worker->id = i; worker->id = i;
worker->pool = pool; worker->pool = pool;
} }
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
return 0; return 0;
} }
void tWorkerCleanup(SWorkerPool *pool) { void tQWorkerCleanup(SQWorkerPool *pool) {
for (int i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SWorker *worker = pool->workers + i; SQWorker *worker = pool->workers + i;
if (worker == NULL) continue; if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
} }
for (int i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SWorker *worker = pool->workers + i; SQWorker *worker = pool->workers + i;
if (worker == NULL) continue; if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
pthread_join(worker->thread, NULL); pthread_join(worker->thread, NULL);
@ -56,12 +64,12 @@ void tWorkerCleanup(SWorkerPool *pool) {
taosCloseQset(pool->qset); taosCloseQset(pool->qset);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
uInfo("worker:%s is closed", pool->name); uDebug("worker:%s is closed", pool->name);
} }
static void *tWorkerThreadFp(SWorker *worker) { static void *tQWorkerThreadFp(SQWorker *worker) {
SWorkerPool *pool = worker->pool; SQWorkerPool *pool = worker->pool;
FProcessItem fp = NULL; FItem fp = NULL;
void * msg = NULL; void * msg = NULL;
void * ahandle = NULL; void * ahandle = NULL;
@ -77,7 +85,7 @@ static void *tWorkerThreadFp(SWorker *worker) {
break; break;
} }
if (fp) { if (fp != NULL) {
(*fp)(ahandle, msg); (*fp)(ahandle, msg);
} }
} }
@ -85,11 +93,12 @@ static void *tWorkerThreadFp(SWorker *worker) {
return NULL; return NULL;
} }
STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
STaosQueue *queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -99,14 +108,18 @@ STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp)
// spawn a thread to process queue // spawn a thread to process queue
if (pool->num < pool->max) { if (pool->num < pool->max) {
do { do {
SWorker *worker = pool->workers + pool->num; SQWorker *worker = pool->workers + pool->num;
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker) != 0) { if (pthread_create(&worker->thread, &thAttr, threadFp, worker) != 0) {
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
queue = NULL;
break;
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
@ -121,19 +134,73 @@ STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp)
return queue; return queue;
} }
void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) { STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
}
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }
int32_t tMWorkerInit(SMWorkerPool *pool) { int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); }
pool->nextId = 0;
pool->workers = calloc(sizeof(SMWorker), pool->max); void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); }
if (pool->workers == NULL) return -1;
static void *tFWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool;
FItem fp = NULL;
void * msg = NULL;
void * ahandle = NULL;
int32_t code = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id);
if (code < 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break;
} else if (code == 0) {
// uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset);
continue;
}
if (fp != NULL) {
(*fp)(ahandle, msg);
}
taosResetQsetThread(pool->qset, msg);
}
return NULL;
}
STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tFWorkerThreadFp);
}
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); }
int32_t tWWorkerInit(SWWorkerPool *pool) {
pool->nextId = 0;
pool->workers = calloc(sizeof(SWWorker), pool->max);
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pthread_mutex_init(&pool->mutex, NULL);
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SMWorker *worker = pool->workers + i; SWWorker *worker = pool->workers + i;
worker->id = i; worker->id = i;
worker->qall = NULL; worker->qall = NULL;
worker->qset = NULL; worker->qset = NULL;
@ -144,16 +211,18 @@ int32_t tMWorkerInit(SMWorkerPool *pool) {
return 0; return 0;
} }
void tMWorkerCleanup(SMWorkerPool *pool) { void tWWorkerCleanup(SWWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SMWorker *worker = pool->workers + i; SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
if (worker->qset) taosQsetThreadResume(worker->qset); if (worker->qset) {
taosQsetThreadResume(worker->qset);
}
} }
} }
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SMWorker *worker = pool->workers + i; SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
pthread_join(worker->thread, NULL); pthread_join(worker->thread, NULL);
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
@ -167,9 +236,9 @@ void tMWorkerCleanup(SMWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tWriteWorkerThreadFp(SMWorker *worker) { static void *tWWorkerThreadFp(SWWorker *worker) {
SMWorkerPool *pool = worker->pool; SWWorkerPool *pool = worker->pool;
FProcessItems fp = NULL; FItems fp = NULL;
void * msg = NULL; void * msg = NULL;
void * ahandle = NULL; void * ahandle = NULL;
@ -187,7 +256,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
break; break;
} }
if (fp) { if (fp != NULL) {
(*fp)(ahandle, worker->qall, numOfMsgs); (*fp)(ahandle, worker->qall, numOfMsgs);
} }
} }
@ -195,13 +264,14 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
return NULL; return NULL;
} }
STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
SMWorker *worker = pool->workers + pool->nextId; SWWorker *worker = pool->workers + pool->nextId;
STaosQueue *queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -221,17 +291,19 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) {
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
queue = NULL; queue = NULL;
} else { } else {
uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
@ -250,7 +322,7 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems
return queue; return queue;
} }
void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) { void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }