enh: adjusting the operation mode of the stream thread pool
This commit is contained in:
parent
e47bf7ba9f
commit
ed98fddf74
|
@ -733,7 +733,7 @@ To prevent system resource from being exhausted by multiple concurrent streams,
|
||||||
| 42 | numOfCommitThreads | Yes | Yes |
|
| 42 | numOfCommitThreads | Yes | Yes |
|
||||||
| 43 | numOfMnodeReadThreads | No | Yes |
|
| 43 | numOfMnodeReadThreads | No | Yes |
|
||||||
| 44 | numOfVnodeQueryThreads | No | Yes |
|
| 44 | numOfVnodeQueryThreads | No | Yes |
|
||||||
| 45 | numOfVnodeStreamThreads | No | Yes |
|
| 45 | ratioOfVnodeStreamThreads | No | Yes |
|
||||||
| 46 | numOfVnodeFetchThreads | No | Yes |
|
| 46 | numOfVnodeFetchThreads | No | Yes |
|
||||||
| 47 | numOfVnodeRsmaThreads | No | Yes |
|
| 47 | numOfVnodeRsmaThreads | No | Yes |
|
||||||
| 48 | numOfQnodeQueryThreads | No | Yes |
|
| 48 | numOfQnodeQueryThreads | No | Yes |
|
||||||
|
|
|
@ -709,7 +709,7 @@ charset 的有效值是 UTF-8。
|
||||||
| 42 | numOfCommitThreads | 是 | 是 | |
|
| 42 | numOfCommitThreads | 是 | 是 | |
|
||||||
| 43 | numOfMnodeReadThreads | 否 | 是 | |
|
| 43 | numOfMnodeReadThreads | 否 | 是 | |
|
||||||
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
|
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
|
||||||
| 45 | numOfVnodeStreamThreads | 否 | 是 | |
|
| 45 | ratioOfVnodeStreamThreads | 否 | 是 | |
|
||||||
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
|
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
|
||||||
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
|
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
|
||||||
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
|
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
|
||||||
|
|
|
@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
|
||||||
extern int32_t tsNumOfMnodeFetchThreads;
|
extern int32_t tsNumOfMnodeFetchThreads;
|
||||||
extern int32_t tsNumOfMnodeReadThreads;
|
extern int32_t tsNumOfMnodeReadThreads;
|
||||||
extern int32_t tsNumOfVnodeQueryThreads;
|
extern int32_t tsNumOfVnodeQueryThreads;
|
||||||
extern int32_t tsNumOfVnodeStreamThreads;
|
extern float tsRatioOfVnodeStreamThreads;
|
||||||
extern int32_t tsNumOfVnodeFetchThreads;
|
extern int32_t tsNumOfVnodeFetchThreads;
|
||||||
extern int32_t tsNumOfVnodeRsmaThreads;
|
extern int32_t tsNumOfVnodeRsmaThreads;
|
||||||
extern int32_t tsNumOfQnodeQueryThreads;
|
extern int32_t tsNumOfQnodeQueryThreads;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_UTIL_WORKER_H_
|
#define _TD_UTIL_WORKER_H_
|
||||||
|
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -29,7 +30,7 @@ typedef struct SQWorker {
|
||||||
int32_t id; // worker id
|
int32_t id; // worker id
|
||||||
int64_t pid; // thread pid
|
int64_t pid; // thread pid
|
||||||
TdThread thread; // thread id
|
TdThread thread; // thread id
|
||||||
SQWorkerPool *pool;
|
void *pool;
|
||||||
} SQWorker;
|
} SQWorker;
|
||||||
|
|
||||||
typedef struct SQWorkerPool {
|
typedef struct SQWorkerPool {
|
||||||
|
@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} SQWorkerPool;
|
} SQWorkerPool;
|
||||||
|
|
||||||
|
typedef struct SAutoQWorkerPool {
|
||||||
|
float ratio;
|
||||||
|
STaosQset *qset;
|
||||||
|
const char *name;
|
||||||
|
SArray *workers;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
} SAutoQWorkerPool;
|
||||||
|
|
||||||
typedef struct SWWorker {
|
typedef struct SWWorker {
|
||||||
int32_t id; // worker id
|
int32_t id; // worker id
|
||||||
int64_t pid; // thread pid
|
int64_t pid; // thread pid
|
||||||
|
@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
|
||||||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
|
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
|
||||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
||||||
|
|
||||||
|
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
|
||||||
|
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
|
||||||
|
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp);
|
||||||
|
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
|
||||||
|
|
||||||
int32_t tWWorkerInit(SWWorkerPool *pool);
|
int32_t tWWorkerInit(SWWorkerPool *pool);
|
||||||
void tWWorkerCleanup(SWWorkerPool *pool);
|
void tWWorkerCleanup(SWWorkerPool *pool);
|
||||||
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
|
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
|
||||||
|
|
|
@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
int32_t tsNumOfMnodeReadThreads = 1;
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfVnodeStreamThreads = 2;
|
float tsRatioOfVnodeStreamThreads = 1.0;
|
||||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||||
int32_t tsNumOfQnodeQueryThreads = 4;
|
int32_t tsNumOfQnodeQueryThreads = 4;
|
||||||
|
@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
|
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
|
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, 0) != 0) return -1;
|
||||||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
|
|
||||||
|
|
||||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
||||||
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
||||||
|
@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
}
|
}
|
||||||
|
|
||||||
pItem = cfgGetItem(tsCfg, "numOfVnodeStreamThreads");
|
pItem = cfgGetItem(tsCfg, "ratioOfVnodeStreamThreads");
|
||||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||||
tsNumOfVnodeStreamThreads = numOfCores / 4;
|
pItem->fval = tsRatioOfVnodeStreamThreads;
|
||||||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
|
||||||
pItem->i32 = tsNumOfVnodeStreamThreads;
|
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||||
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||||
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||||
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
|
tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval;
|
||||||
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||||
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
|
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
|
||||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||||
|
|
|
@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
|
||||||
const char *path;
|
const char *path;
|
||||||
const char *name;
|
const char *name;
|
||||||
SQWorkerPool queryPool;
|
SQWorkerPool queryPool;
|
||||||
SQWorkerPool streamPool;
|
SAutoQWorkerPool streamPool;
|
||||||
SWWorkerPool fetchPool;
|
SWWorkerPool fetchPool;
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
SHashObj *hash;
|
SHashObj *hash;
|
||||||
|
|
|
@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
||||||
|
|
||||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||||
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||||
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||||
|
|
||||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
|
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
|
||||||
|
@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
|
|
||||||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||||
pVnode->pQueryQ = NULL;
|
pVnode->pQueryQ = NULL;
|
||||||
pVnode->pStreamQ = NULL;
|
pVnode->pStreamQ = NULL;
|
||||||
|
@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
pQPool->max = tsNumOfVnodeQueryThreads;
|
pQPool->max = tsNumOfVnodeQueryThreads;
|
||||||
if (tQWorkerInit(pQPool) != 0) return -1;
|
if (tQWorkerInit(pQPool) != 0) return -1;
|
||||||
|
|
||||||
SQWorkerPool *pStreamPool = &pMgmt->streamPool;
|
SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool;
|
||||||
pStreamPool->name = "vnode-stream";
|
pStreamPool->name = "vnode-stream";
|
||||||
pStreamPool->min = tsNumOfVnodeStreamThreads;
|
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
|
||||||
pStreamPool->max = tsNumOfVnodeStreamThreads;
|
if (tAutoQWorkerInit(pStreamPool) != 0) return -1;
|
||||||
if (tQWorkerInit(pStreamPool) != 0) return -1;
|
|
||||||
|
|
||||||
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||||
pFPool->name = "vnode-fetch";
|
pFPool->name = "vnode-fetch";
|
||||||
|
@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
void vmStopWorker(SVnodeMgmt *pMgmt) {
|
void vmStopWorker(SVnodeMgmt *pMgmt) {
|
||||||
tQWorkerCleanup(&pMgmt->queryPool);
|
tQWorkerCleanup(&pMgmt->queryPool);
|
||||||
tQWorkerCleanup(&pMgmt->streamPool);
|
tAutoQWorkerCleanup(&pMgmt->streamPool);
|
||||||
tWWorkerCleanup(&pMgmt->fetchPool);
|
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||||
dDebug("vnode workers are closed");
|
dDebug("vnode workers are closed");
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
|
||||||
worker->pool = pool;
|
worker->pool = pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
|
||||||
for (int32_t i = 0; i < pool->max; ++i) {
|
for (int32_t i = 0; i < pool->max; ++i) {
|
||||||
SQWorker *worker = pool->workers + i;
|
SQWorker *worker = pool->workers + i;
|
||||||
if (taosCheckPthreadValid(worker->thread)) {
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
taosThreadJoin(worker->thread, NULL);
|
taosThreadJoin(worker->thread, NULL);
|
||||||
taosThreadClear(&worker->thread);
|
taosThreadClear(&worker->thread);
|
||||||
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
|
||||||
taosCloseQset(pool->qset);
|
taosCloseQset(pool->qset);
|
||||||
taosThreadMutexDestroy(&pool->mutex);
|
taosThreadMutexDestroy(&pool->mutex);
|
||||||
|
|
||||||
uDebug("worker:%s is closed", pool->name);
|
uInfo("worker:%s is closed", pool->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tQWorkerThreadFp(SQWorker *worker) {
|
static void *tQWorkerThreadFp(SQWorker *worker) {
|
||||||
|
@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||||
|
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
pool->num++;
|
pool->num++;
|
||||||
uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
|
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
|
||||||
} while (pool->num < pool->min);
|
} while (pool->num < pool->min);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,7 +132,132 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
||||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
|
||||||
|
pool->qset = taosOpenQset();
|
||||||
|
pool->workers = taosArrayInit(2, sizeof(SQWorker));
|
||||||
|
if (pool->workers == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)taosThreadMutexInit(&pool->mutex, NULL);
|
||||||
|
|
||||||
|
uInfo("worker:%s is initialized as auto", pool->name);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
|
||||||
|
int32_t size = taosArrayGetSize(pool->workers);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SQWorker *worker = taosArrayGet(pool->workers, i);
|
||||||
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
taosQsetThreadResume(pool->qset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SQWorker *worker = taosArrayGet(pool->workers, i);
|
||||||
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
|
taosThreadJoin(worker->thread, NULL);
|
||||||
|
taosThreadClear(&worker->thread);
|
||||||
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pool->workers);
|
||||||
|
taosCloseQset(pool->qset);
|
||||||
|
taosThreadMutexDestroy(&pool->mutex);
|
||||||
|
|
||||||
|
uInfo("worker:%s is closed", pool->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tAutoQWorkerThreadFp(SQWorker *worker) {
|
||||||
|
SAutoQWorkerPool *pool = worker->pool;
|
||||||
|
SQueueInfo qinfo = {0};
|
||||||
|
void *msg = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosBlockSIGPIPE();
|
||||||
|
setThreadName(pool->name);
|
||||||
|
worker->pid = taosGetSelfPthreadId();
|
||||||
|
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
|
||||||
|
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
|
||||||
|
worker->pid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (qinfo.fp != NULL) {
|
||||||
|
qinfo.workerId = worker->id;
|
||||||
|
qinfo.threadNum = taosArrayGetSize(pool->workers);
|
||||||
|
(*((FItem)qinfo.fp))(&qinfo, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosUpdateItemSize(qinfo.queue, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||||
|
STaosQueue *queue = taosOpenQueue();
|
||||||
|
if (queue == NULL) return NULL;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pool->mutex);
|
||||||
|
taosSetQueueFp(queue, fp, NULL);
|
||||||
|
taosAddIntoQset(pool->qset, queue, ahandle);
|
||||||
|
|
||||||
|
int32_t queueNum = taosGetQueueNumber(pool->qset);
|
||||||
|
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
||||||
|
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
|
||||||
|
if (dstWorkerNum < 1) dstWorkerNum = 1;
|
||||||
|
// spawn a thread to process queue
|
||||||
|
|
||||||
|
while (curWorkerNum < dstWorkerNum) {
|
||||||
|
SQWorker wobj = {
|
||||||
|
.id = (int32_t)taosArrayGetSize(pool->workers),
|
||||||
|
.pool = pool,
|
||||||
|
};
|
||||||
|
SQWorker *worker = taosArrayPush(pool->workers, &wobj);
|
||||||
|
if (worker == NULL) {
|
||||||
|
uError("worker:%s:%d failed to create, total:%d", pool->name, wobj.id, (int32_t)taosArrayGetSize(pool->workers));
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
|
||||||
|
uError("worker:%s:%d failed to create thread, total:%d", pool->name, wobj.id,
|
||||||
|
(int32_t)taosArrayGetSize(pool->workers));
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers));
|
||||||
|
|
||||||
|
curWorkerNum++;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pool->mutex);
|
||||||
|
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
|
||||||
|
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +279,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
|
||||||
worker->pool = pool;
|
worker->pool = pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("worker:%s is initialized, max:%d", pool->name, pool->max);
|
uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,17 +296,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
|
||||||
for (int32_t i = 0; i < pool->max; ++i) {
|
for (int32_t i = 0; i < pool->max; ++i) {
|
||||||
SWWorker *worker = pool->workers + i;
|
SWWorker *worker = pool->workers + i;
|
||||||
if (taosCheckPthreadValid(worker->thread)) {
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
taosThreadJoin(worker->thread, NULL);
|
taosThreadJoin(worker->thread, NULL);
|
||||||
taosThreadClear(&worker->thread);
|
taosThreadClear(&worker->thread);
|
||||||
taosFreeQall(worker->qall);
|
taosFreeQall(worker->qall);
|
||||||
taosCloseQset(worker->qset);
|
taosCloseQset(worker->qset);
|
||||||
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pool->workers);
|
taosMemoryFreeClear(pool->workers);
|
||||||
taosThreadMutexDestroy(&pool->mutex);
|
taosThreadMutexDestroy(&pool->mutex);
|
||||||
|
|
||||||
uDebug("worker:%s is closed", pool->name);
|
uInfo("worker:%s is closed", pool->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tWWorkerThreadFp(SWWorker *worker) {
|
static void *tWWorkerThreadFp(SWWorker *worker) {
|
||||||
|
@ -235,7 +364,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
|
||||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
|
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
|
||||||
|
|
||||||
uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
|
uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
|
||||||
pool->nextId = (pool->nextId + 1) % pool->max;
|
pool->nextId = (pool->nextId + 1) % pool->max;
|
||||||
|
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
@ -259,13 +388,14 @@ _OVER:
|
||||||
} else {
|
} else {
|
||||||
while (worker->pid <= 0) taosMsleep(10);
|
while (worker->pid <= 0) taosMsleep(10);
|
||||||
queue->threadId = worker->pid;
|
queue->threadId = worker->pid;
|
||||||
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
|
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
|
||||||
|
queue->threadId);
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
|
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
|
||||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue