fix: heap-buffer-overflow in auto qworker
This commit is contained in:
parent
ed98fddf74
commit
367b6512e9
|
@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
||||||
|
|
||||||
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
|
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
|
||||||
pool->qset = taosOpenQset();
|
pool->qset = taosOpenQset();
|
||||||
pool->workers = taosArrayInit(2, sizeof(SQWorker));
|
pool->workers = taosArrayInit(2, sizeof(SQWorker *));
|
||||||
if (pool->workers == NULL) {
|
if (pool->workers == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -153,20 +153,21 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
|
||||||
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
|
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
|
||||||
int32_t size = taosArrayGetSize(pool->workers);
|
int32_t size = taosArrayGetSize(pool->workers);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SQWorker *worker = taosArrayGet(pool->workers, i);
|
SQWorker *worker = taosArrayGetP(pool->workers, i);
|
||||||
if (taosCheckPthreadValid(worker->thread)) {
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
taosQsetThreadResume(pool->qset);
|
taosQsetThreadResume(pool->qset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SQWorker *worker = taosArrayGet(pool->workers, i);
|
SQWorker *worker = taosArrayGetP(pool->workers, i);
|
||||||
if (taosCheckPthreadValid(worker->thread)) {
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
taosThreadJoin(worker->thread, NULL);
|
taosThreadJoin(worker->thread, NULL);
|
||||||
taosThreadClear(&worker->thread);
|
taosThreadClear(&worker->thread);
|
||||||
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pool->workers);
|
taosArrayDestroy(pool->workers);
|
||||||
|
@ -218,27 +219,28 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
||||||
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
||||||
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
|
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
|
||||||
if (dstWorkerNum < 1) dstWorkerNum = 1;
|
if (dstWorkerNum < 1) dstWorkerNum = 1;
|
||||||
// spawn a thread to process queue
|
|
||||||
|
|
||||||
|
// spawn a thread to process queue
|
||||||
while (curWorkerNum < dstWorkerNum) {
|
while (curWorkerNum < dstWorkerNum) {
|
||||||
SQWorker wobj = {
|
SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
|
||||||
.id = (int32_t)taosArrayGetSize(pool->workers),
|
if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
|
||||||
.pool = pool,
|
uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
|
||||||
};
|
taosMemoryFree(worker);
|
||||||
SQWorker *worker = taosArrayPush(pool->workers, &wobj);
|
|
||||||
if (worker == NULL) {
|
|
||||||
uError("worker:%s:%d failed to create, total:%d", pool->name, wobj.id, (int32_t)taosArrayGetSize(pool->workers));
|
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
worker->id = curWorkerNum;
|
||||||
|
worker->pool = pool;
|
||||||
|
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
|
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
|
||||||
uError("worker:%s:%d failed to create thread, total:%d", pool->name, wobj.id,
|
uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
|
||||||
(int32_t)taosArrayGetSize(pool->workers));
|
(void)taosArrayPop(pool->workers);
|
||||||
|
taosMemoryFree(worker);
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue