refact worker util
This commit is contained in:
parent
e777ee729b
commit
5fca9015b8
|
@ -15,57 +15,74 @@
|
|||
|
||||
#ifndef _TD_UTIL_WORKER_H
|
||||
#define _TD_UTIL_WORKER_H
|
||||
|
||||
#include "os.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SWorkerPool SWorkerPool;
|
||||
typedef struct SMWorkerPool SMWorkerPool;
|
||||
typedef struct SQWorkerPool SQWorkerPool;
|
||||
typedef struct SFWorkerPool SFWorkerPool;
|
||||
typedef struct SWWorkerPool SWWorkerPool;
|
||||
|
||||
typedef struct SWorker {
|
||||
typedef struct SQWorker {
|
||||
int32_t id; // worker ID
|
||||
pthread_t thread; // thread
|
||||
SWorkerPool *pool;
|
||||
} SWorker;
|
||||
SQWorkerPool *pool;
|
||||
} SQWorker;
|
||||
|
||||
typedef struct SWorkerPool {
|
||||
typedef struct SQWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
STaosQset *qset;
|
||||
const char *name;
|
||||
SWorker *workers;
|
||||
STaosQset * qset;
|
||||
const char * name;
|
||||
SQWorker * workers;
|
||||
pthread_mutex_t mutex;
|
||||
} SWorkerPool;
|
||||
} SQWorkerPool;
|
||||
|
||||
typedef struct SMWorker {
|
||||
typedef struct SFWorker {
|
||||
int32_t id; // worker ID
|
||||
pthread_t thread; // thread
|
||||
SFWorkerPool *pool;
|
||||
} SFWorker;
|
||||
|
||||
typedef struct SFWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
STaosQset * qset;
|
||||
const char * name;
|
||||
SFWorker * workers;
|
||||
pthread_mutex_t mutex;
|
||||
} SFWorkerPool;
|
||||
|
||||
typedef struct SWWorker {
|
||||
int32_t id; // worker id
|
||||
pthread_t thread; // thread
|
||||
STaosQall *qall;
|
||||
STaosQset *qset; // queue set
|
||||
SMWorkerPool *pool;
|
||||
} SMWorker;
|
||||
STaosQall * qall;
|
||||
STaosQset * qset; // queue set
|
||||
SWWorkerPool *pool;
|
||||
} SWWorker;
|
||||
|
||||
typedef struct SMWorkerPool {
|
||||
typedef struct SWWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
const char *name;
|
||||
SMWorker *workers;
|
||||
const char * name;
|
||||
SWWorker * workers;
|
||||
pthread_mutex_t mutex;
|
||||
} SMWorkerPool;
|
||||
} SWWorkerPool;
|
||||
|
||||
int32_t tWorkerInit(SWorkerPool *pool);
|
||||
void tWorkerCleanup(SWorkerPool *pool);
|
||||
STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
|
||||
void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue);
|
||||
int32_t tQWorkerInit(SQWorkerPool *pool);
|
||||
void tQWorkerCleanup(SQWorkerPool *pool);
|
||||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp);
|
||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
||||
|
||||
int32_t tMWorkerInit(SMWorkerPool *pool);
|
||||
void tMWorkerCleanup(SMWorkerPool *pool);
|
||||
STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
|
||||
void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue);
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool);
|
||||
void tWWorkerCleanup(SWWorkerPool *pool);
|
||||
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp);
|
||||
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -31,8 +31,8 @@ typedef struct {
|
|||
SDnode *pDnode;
|
||||
STaosQueue *queue;
|
||||
union {
|
||||
SWorkerPool pool;
|
||||
SMWorkerPool mpool;
|
||||
SQWorkerPool pool;
|
||||
SWWorkerPool mpool;
|
||||
};
|
||||
} SDnodeWorker;
|
||||
|
||||
|
@ -109,10 +109,10 @@ typedef struct {
|
|||
int32_t openVnodes;
|
||||
int32_t totalVnodes;
|
||||
SRWLatch latch;
|
||||
SWorkerPool queryPool;
|
||||
SWorkerPool fetchPool;
|
||||
SMWorkerPool syncPool;
|
||||
SMWorkerPool writePool;
|
||||
SQWorkerPool queryPool;
|
||||
SQWorkerPool fetchPool;
|
||||
SWWorkerPool syncPool;
|
||||
SWWorkerPool writePool;
|
||||
} SVnodesMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
|
|||
int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1);
|
||||
int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1);
|
||||
|
||||
SWorkerPool *pPool = &pMgmt->queryPool;
|
||||
SQWorkerPool *pPool = &pMgmt->queryPool;
|
||||
pPool->name = "vnode-query";
|
||||
pPool->min = minQueryThreads;
|
||||
pPool->max = maxQueryThreads;
|
||||
if (tWorkerInit(pPool) != 0) return -1;
|
||||
if (tQWorkerInit(pPool) != 0) return -1;
|
||||
|
||||
pPool = &pMgmt->fetchPool;
|
||||
pPool->name = "vnode-fetch";
|
||||
pPool->min = minFetchThreads;
|
||||
pPool->max = maxFetchThreads;
|
||||
if (tWorkerInit(pPool) != 0) return -1;
|
||||
if (tQWorkerInit(pPool) != 0) return -1;
|
||||
|
||||
SMWorkerPool *pMPool = &pMgmt->writePool;
|
||||
SWWorkerPool *pMPool = &pMgmt->writePool;
|
||||
pMPool->name = "vnode-write";
|
||||
pMPool->max = maxWriteThreads;
|
||||
if (tMWorkerInit(pMPool) != 0) return -1;
|
||||
if (tWWorkerInit(pMPool) != 0) return -1;
|
||||
|
||||
pMPool = &pMgmt->syncPool;
|
||||
pMPool->name = "vnode-sync";
|
||||
pMPool->max = maxSyncThreads;
|
||||
if (tMWorkerInit(pMPool) != 0) return -1;
|
||||
if (tWWorkerInit(pMPool) != 0) return -1;
|
||||
|
||||
dDebug("vnode workers is initialized");
|
||||
return 0;
|
||||
|
@ -938,21 +938,21 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
|
|||
|
||||
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
tWorkerCleanup(&pMgmt->fetchPool);
|
||||
tWorkerCleanup(&pMgmt->queryPool);
|
||||
tMWorkerCleanup(&pMgmt->writePool);
|
||||
tMWorkerCleanup(&pMgmt->syncPool);
|
||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
||||
tQWorkerCleanup(&pMgmt->queryPool);
|
||||
tWWorkerCleanup(&pMgmt->writePool);
|
||||
tWWorkerCleanup(&pMgmt->syncPool);
|
||||
dDebug("vnode workers is closed");
|
||||
}
|
||||
|
||||
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
|
||||
pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
|
||||
pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
|
||||
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||
pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
|
||||
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
|
||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
|
||||
|
||||
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
|
||||
pVnode->pQueryQ == NULL) {
|
||||
|
@ -965,11 +965,11 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|||
|
||||
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
||||
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
||||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
pVnode->pWriteQ = NULL;
|
||||
pVnode->pApplyQ = NULL;
|
||||
pVnode->pSyncQ = NULL;
|
||||
|
|
|
@ -31,28 +31,28 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
|
|||
pWorker->pDnode = pDnode;
|
||||
|
||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||
SWorkerPool *pPool = &pWorker->pool;
|
||||
SQWorkerPool *pPool = &pWorker->pool;
|
||||
pPool->name = name;
|
||||
pPool->min = minNum;
|
||||
pPool->max = maxNum;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
if (tQWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp);
|
||||
pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp);
|
||||
if (pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else if (pWorker->type == DND_WORKER_MULTI) {
|
||||
SMWorkerPool *pPool = &pWorker->mpool;
|
||||
SWWorkerPool *pPool = &pWorker->mpool;
|
||||
pPool->name = name;
|
||||
pPool->max = maxNum;
|
||||
if (tMWorkerInit(pPool) != 0) {
|
||||
if (tWWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp);
|
||||
pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp);
|
||||
if (pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -70,11 +70,11 @@ void dndCleanupWorker(SDnodeWorker *pWorker) {
|
|||
}
|
||||
|
||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||
tWorkerCleanup(&pWorker->pool);
|
||||
tWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
||||
tQWorkerCleanup(&pWorker->pool);
|
||||
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
||||
} else if (pWorker->type == DND_WORKER_MULTI) {
|
||||
tMWorkerCleanup(&pWorker->mpool);
|
||||
tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
|
||||
tWWorkerCleanup(&pWorker->mpool);
|
||||
tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,38 +14,39 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "ulog.h"
|
||||
#include "tqueue.h"
|
||||
#include "tworker.h"
|
||||
#include "ulog.h"
|
||||
|
||||
typedef void* (*ThreadFp)(void *param);
|
||||
typedef void *(*ThreadFp)(void *param);
|
||||
|
||||
int32_t tWorkerInit(SWorkerPool *pool) {
|
||||
int32_t tQWorkerInit(SQWorkerPool *pool) {
|
||||
pool->qset = taosOpenQset();
|
||||
pool->workers = calloc(sizeof(SWorker), pool->max);
|
||||
pthread_mutex_init(&pool->mutex, NULL);
|
||||
for (int i = 0; i < pool->max; ++i) {
|
||||
SWorker *worker = pool->workers + i;
|
||||
pool->workers = calloc(sizeof(SQWorker), pool->max);
|
||||
if (pthread_mutex_init(&pool->mutex, NULL)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pool->max; ++i) {
|
||||
SQWorker *worker = pool->workers + i;
|
||||
worker->id = i;
|
||||
worker->pool = pool;
|
||||
}
|
||||
|
||||
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
||||
uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tWorkerCleanup(SWorkerPool *pool) {
|
||||
for (int i = 0; i < pool->max; ++i) {
|
||||
SWorker *worker = pool->workers + i;
|
||||
void tQWorkerCleanup(SQWorkerPool *pool) {
|
||||
for (int32_t i = 0; i < pool->max; ++i) {
|
||||
SQWorker *worker = pool->workers + i;
|
||||
if (worker == NULL) continue;
|
||||
if (taosCheckPthreadValid(worker->thread)) {
|
||||
taosQsetThreadResume(pool->qset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < pool->max; ++i) {
|
||||
SWorker *worker = pool->workers + i;
|
||||
for (int32_t i = 0; i < pool->max; ++i) {
|
||||
SQWorker *worker = pool->workers + i;
|
||||
if (worker == NULL) continue;
|
||||
if (taosCheckPthreadValid(worker->thread)) {
|
||||
pthread_join(worker->thread, NULL);
|
||||
|
@ -56,28 +57,28 @@ void tWorkerCleanup(SWorkerPool *pool) {
|
|||
taosCloseQset(pool->qset);
|
||||
pthread_mutex_destroy(&pool->mutex);
|
||||
|
||||
uInfo("worker:%s is closed", pool->name);
|
||||
uDebug("qworker:%s is closed", pool->name);
|
||||
}
|
||||
|
||||
static void *tWorkerThreadFp(SWorker *worker) {
|
||||
SWorkerPool *pool = worker->pool;
|
||||
static void *tQWorkerThreadFp(SQWorker *worker) {
|
||||
SQWorkerPool *pool = worker->pool;
|
||||
FProcessItem fp = NULL;
|
||||
|
||||
void *msg = NULL;
|
||||
void *ahandle = NULL;
|
||||
void * msg = NULL;
|
||||
void * ahandle = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
taosBlockSIGPIPE();
|
||||
setThreadName(pool->name);
|
||||
uDebug("worker:%s:%d is running", pool->name, worker->id);
|
||||
uDebug("qworker:%s:%d is running", pool->name, worker->id);
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) {
|
||||
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
|
||||
uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
|
||||
break;
|
||||
}
|
||||
|
||||
if (fp) {
|
||||
if (fp != NULL) {
|
||||
(*fp)(ahandle, msg);
|
||||
}
|
||||
}
|
||||
|
@ -85,7 +86,7 @@ static void *tWorkerThreadFp(SWorker *worker) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) {
|
||||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) {
|
||||
pthread_mutex_lock(&pool->mutex);
|
||||
STaosQueue *queue = taosOpenQueue();
|
||||
if (queue == NULL) {
|
||||
|
@ -99,61 +100,66 @@ STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp)
|
|||
// spawn a thread to process queue
|
||||
if (pool->num < pool->max) {
|
||||
do {
|
||||
SWorker *worker = pool->workers + pool->num;
|
||||
SQWorker *worker = pool->workers + pool->num;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker) != 0) {
|
||||
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
|
||||
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
|
||||
uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
|
||||
taosCloseQueue(queue);
|
||||
queue = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
pool->num++;
|
||||
uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
|
||||
uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
|
||||
} while (pool->num < pool->min);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) {
|
||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
||||
taosCloseQueue(queue);
|
||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
||||
uDebug("qworker:%s, queue:%p is freed", pool->name, queue);
|
||||
}
|
||||
|
||||
int32_t tMWorkerInit(SMWorkerPool *pool) {
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool) {
|
||||
pool->nextId = 0;
|
||||
pool->workers = calloc(sizeof(SMWorker), pool->max);
|
||||
pool->workers = calloc(sizeof(SWWorker), pool->max);
|
||||
if (pool->workers == NULL) return -1;
|
||||
|
||||
pthread_mutex_init(&pool->mutex, NULL);
|
||||
for (int32_t i = 0; i < pool->max; ++i) {
|
||||
SMWorker *worker = pool->workers + i;
|
||||
SWWorker *worker = pool->workers + i;
|
||||
worker->id = i;
|
||||
worker->qall = NULL;
|
||||
worker->qset = NULL;
|
||||
worker->pool = pool;
|
||||
}
|
||||
|
||||
uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
|
||||
uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tMWorkerCleanup(SMWorkerPool *pool) {
|
||||
void tWWorkerCleanup(SWWorkerPool *pool) {
|
||||
for (int32_t i = 0; i < pool->max; ++i) {
|
||||
SMWorker *worker = pool->workers + i;
|
||||
SWWorker *worker = pool->workers + i;
|
||||
if (taosCheckPthreadValid(worker->thread)) {
|
||||
if (worker->qset) taosQsetThreadResume(worker->qset);
|
||||
if (worker->qset) {
|
||||
taosQsetThreadResume(worker->qset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pool->max; ++i) {
|
||||
SMWorker *worker = pool->workers + i;
|
||||
SWWorker *worker = pool->workers + i;
|
||||
if (taosCheckPthreadValid(worker->thread)) {
|
||||
pthread_join(worker->thread, NULL);
|
||||
taosFreeQall(worker->qall);
|
||||
|
@ -164,30 +170,30 @@ void tMWorkerCleanup(SMWorkerPool *pool) {
|
|||
tfree(pool->workers);
|
||||
pthread_mutex_destroy(&pool->mutex);
|
||||
|
||||
uInfo("worker:%s is closed", pool->name);
|
||||
uInfo("wworker:%s is closed", pool->name);
|
||||
}
|
||||
|
||||
static void *tWriteWorkerThreadFp(SMWorker *worker) {
|
||||
SMWorkerPool *pool = worker->pool;
|
||||
static void *tWriteWorkerThreadFp(SWWorker *worker) {
|
||||
SWWorkerPool *pool = worker->pool;
|
||||
FProcessItems fp = NULL;
|
||||
|
||||
void *msg = NULL;
|
||||
void *ahandle = NULL;
|
||||
void * msg = NULL;
|
||||
void * ahandle = NULL;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t qtype = 0;
|
||||
|
||||
taosBlockSIGPIPE();
|
||||
setThreadName(pool->name);
|
||||
uDebug("worker:%s:%d is running", pool->name, worker->id);
|
||||
uDebug("wworker:%s:%d is running", pool->name, worker->id);
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp);
|
||||
if (numOfMsgs == 0) {
|
||||
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
|
||||
uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
|
||||
break;
|
||||
}
|
||||
|
||||
if (fp) {
|
||||
if (fp != NULL) {
|
||||
(*fp)(ahandle, worker->qall, numOfMsgs);
|
||||
}
|
||||
}
|
||||
|
@ -195,9 +201,9 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) {
|
||||
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) {
|
||||
pthread_mutex_lock(&pool->mutex);
|
||||
SMWorker *worker = pool->workers + pool->nextId;
|
||||
SWWorker *worker = pool->workers + pool->nextId;
|
||||
|
||||
STaosQueue *queue = taosOpenQueue();
|
||||
if (queue == NULL) {
|
||||
|
@ -228,13 +234,13 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems
|
|||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) {
|
||||
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
|
||||
uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
|
||||
taosFreeQall(worker->qall);
|
||||
taosCloseQset(worker->qset);
|
||||
taosCloseQueue(queue);
|
||||
queue = NULL;
|
||||
} else {
|
||||
uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
|
||||
uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
|
||||
pool->nextId = (pool->nextId + 1) % pool->max;
|
||||
}
|
||||
|
||||
|
@ -245,12 +251,12 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems
|
|||
}
|
||||
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) {
|
||||
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
|
||||
taosCloseQueue(queue);
|
||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
||||
uDebug("wworker:%s, queue:%p is freed", pool->name, queue);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue