commit
40e86b367f
|
@ -32,27 +32,51 @@ typedef struct {
|
|||
SRpcMsg rpcMsg;
|
||||
} SReadMsg;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread; // thread
|
||||
int32_t workerId; // worker ID
|
||||
} SReadWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t max; // max number of workers
|
||||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
SReadWorker *readWorker;
|
||||
} SReadWorkerPool;
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param);
|
||||
static void dnodeHandleIdleReadWorker();
|
||||
static void dnodeHandleIdleReadWorker(SReadWorker *);
|
||||
|
||||
// module global variable
|
||||
static taos_qset readQset;
|
||||
static int32_t threads; // number of query threads
|
||||
static int32_t maxThreads;
|
||||
static int32_t minThreads;
|
||||
static SReadWorkerPool readPool;
|
||||
static taos_qset readQset;
|
||||
|
||||
int32_t dnodeInitRead() {
|
||||
readQset = taosOpenQset();
|
||||
|
||||
minThreads = 3;
|
||||
maxThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||
if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads;
|
||||
readPool.min = 2;
|
||||
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
|
||||
readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max);
|
||||
|
||||
if (readPool.readWorker == NULL) return -1;
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
pWorker->workerId = i;
|
||||
}
|
||||
|
||||
dPrint("dnode read is opened");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupRead() {
|
||||
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
if (pWorker->thread)
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
|
||||
taosCloseQset(readQset);
|
||||
dPrint("dnode read is closed");
|
||||
}
|
||||
|
@ -116,18 +140,25 @@ void *dnodeAllocateRqueue(void *pVnode) {
|
|||
taosAddIntoQset(readQset, queue, pVnode);
|
||||
|
||||
// spawn a thread to process queue
|
||||
if (threads < maxThreads) {
|
||||
pthread_t thread;
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (readPool.num < readPool.max) {
|
||||
do {
|
||||
SReadWorker *pWorker = readPool.readWorker + readPool.num;
|
||||
|
||||
if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
}
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
readPool.num++;
|
||||
dTrace("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num);
|
||||
} while (readPool.num < readPool.min);
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
|
||||
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
@ -167,14 +198,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
|
|||
}
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param) {
|
||||
taos_qset qset = (taos_qset)param;
|
||||
SReadMsg *pReadMsg;
|
||||
int type;
|
||||
void *pVnode;
|
||||
SReadWorker *pWorker = param;
|
||||
SReadMsg *pReadMsg;
|
||||
int type;
|
||||
void *pVnode;
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) {
|
||||
dnodeHandleIdleReadWorker();
|
||||
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
|
||||
dnodeHandleIdleReadWorker(pWorker);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -186,11 +217,12 @@ static void *dnodeProcessReadQueue(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void dnodeHandleIdleReadWorker() {
|
||||
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
|
||||
int32_t num = taosGetQueueNumber(readQset);
|
||||
|
||||
if (num == 0 || (num <= minThreads && threads > minThreads)) {
|
||||
threads--;
|
||||
if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
|
||||
readPool.num--;
|
||||
dTrace("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
|
||||
pthread_exit(NULL);
|
||||
} else {
|
||||
usleep(100);
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "vnode.h"
|
||||
|
||||
typedef struct {
|
||||
taos_qall qall;
|
||||
taos_qset qset; // queue set
|
||||
pthread_t thread; // thread
|
||||
int32_t workerId; // worker ID
|
||||
|
@ -65,6 +66,14 @@ int32_t dnodeInitWrite() {
|
|||
}
|
||||
|
||||
void dnodeCleanupWrite() {
|
||||
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
free(wWorkerPool.writeWorker);
|
||||
dPrint("dnode write is closed");
|
||||
}
|
||||
|
@ -113,6 +122,7 @@ void *dnodeAllocateWqueue(void *pVnode) {
|
|||
if (pWorker->qset == NULL) return NULL;
|
||||
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
pWorker->qall = taosAllocateQall();
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
|
@ -122,13 +132,17 @@ void *dnodeAllocateWqueue(void *pVnode) {
|
|||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
taosCloseQset(pWorker->qset);
|
||||
} else {
|
||||
dTrace("write worker:%d is launched", pWorker->workerId);
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
} else {
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
|
||||
dTrace("pVnode:%p, write queue:%p is allocated", pVnode, queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
@ -160,17 +174,14 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
|
|||
|
||||
static void *dnodeProcessWriteQueue(void *param) {
|
||||
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||
taos_qall qall;
|
||||
SWriteMsg *pWrite;
|
||||
SWalHead *pHead;
|
||||
int32_t numOfMsgs;
|
||||
int type;
|
||||
void *pVnode, *item;
|
||||
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
||||
if (numOfMsgs <=0) {
|
||||
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
|
||||
continue;
|
||||
|
@ -178,7 +189,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
pWrite = NULL;
|
||||
taosGetQitem(qall, &type, &item);
|
||||
taosGetQitem(pWorker->qall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pWrite = (SWriteMsg *)item;
|
||||
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
|
||||
|
@ -196,9 +207,9 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
walFsync(vnodeGetWal(pVnode));
|
||||
|
||||
// browse all items, and process them one by one
|
||||
taosResetQitems(qall);
|
||||
taosResetQitems(pWorker->qall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, &type, &item);
|
||||
taosGetQitem(pWorker->qall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pWrite = (SWriteMsg *)item;
|
||||
dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code);
|
||||
|
@ -209,8 +220,6 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -221,8 +230,10 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
|||
usleep(1000);
|
||||
sched_yield();
|
||||
} else {
|
||||
taosFreeQall(pWorker->qall);
|
||||
taosCloseQset(pWorker->qset);
|
||||
pWorker->qset = NULL;
|
||||
dTrace("write worker:%d is released", pWorker->workerId);
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -103,7 +103,8 @@ void rpcCloseConnCache(void *handle) {
|
|||
if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);
|
||||
|
||||
tfree(pCache->connHashList);
|
||||
tfree(pCache->count)
|
||||
tfree(pCache->count);
|
||||
tfree(pCache->lockedBy);
|
||||
|
||||
pthread_mutex_unlock(&pCache->mutex);
|
||||
|
||||
|
|
|
@ -84,7 +84,9 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
|
|||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
|
||||
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
|
||||
pthread_attr_destroy(&thattr);
|
||||
if (code != 0) {
|
||||
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -83,6 +83,9 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
|||
}
|
||||
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
pThreadObj = pServerObj->pThreadObj;
|
||||
for (i = 0; i < numOfThreads; ++i) {
|
||||
pThreadObj->processData = fp;
|
||||
|
@ -105,8 +108,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
|
||||
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
|
||||
return NULL;
|
||||
|
@ -116,8 +117,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
|||
pThreadObj++;
|
||||
}
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
|
||||
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
|
||||
return NULL;
|
||||
|
|
|
@ -146,10 +146,12 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
|
|||
pConn->tmrCtrl = pSet->tmrCtrl;
|
||||
}
|
||||
|
||||
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
|
||||
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
||||
if (code != 0) {
|
||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
||||
taosCloseSocket(pConn->fd);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
pthread_attr_destroy(&thAttr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -298,7 +298,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
|||
*/
|
||||
static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize,
|
||||
uint32_t hashVal) {
|
||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen;
|
||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null
|
||||
|
||||
SHashNode *pNewNode = calloc(1, totalSize);
|
||||
if (pNewNode == NULL) {
|
||||
|
|
|
@ -189,7 +189,6 @@ void taosCleanUpIntHash(void *handle) {
|
|||
free(pObj->hashList);
|
||||
}
|
||||
|
||||
memset(pObj, 0, sizeof(IHashObj));
|
||||
free(pObj->lockedBy);
|
||||
free(pObj);
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
|
|||
queue->numOfItems++;
|
||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||
|
||||
pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
|
||||
pTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
|
@ -297,14 +297,16 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
STaosQset *qset = (STaosQset *)param;
|
||||
STaosQnode *pNode = NULL;
|
||||
int code = 0;
|
||||
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
|
||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
//pthread_mutex_lock(&qset->mutex);
|
||||
if (qset->current == NULL)
|
||||
qset->current = qset->head;
|
||||
STaosQueue *queue = qset->current;
|
||||
if (queue) qset->current = queue->next;
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
//pthread_mutex_unlock(&qset->mutex);
|
||||
if (queue == NULL) break;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
@ -326,6 +328,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
if (pNode) break;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -335,13 +339,15 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
|||
STaosQall *qall = (STaosQall *)p2;
|
||||
int code = 0;
|
||||
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
|
||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
// pthread_mutex_lock(&qset->mutex);
|
||||
if (qset->current == NULL)
|
||||
qset->current = qset->head;
|
||||
queue = qset->current;
|
||||
if (queue) qset->current = queue->next;
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
// pthread_mutex_unlock(&qset->mutex);
|
||||
if (queue == NULL) break;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
@ -365,6 +371,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
|||
if (code != 0) break;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -40,66 +40,68 @@ static void *taosProcessSchedQueue(void *param);
|
|||
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
||||
|
||||
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
||||
pthread_attr_t attr;
|
||||
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
|
||||
SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1);
|
||||
if (pSched == NULL) {
|
||||
pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno));
|
||||
goto _error;
|
||||
pError("%s: no enough memory for pSched", label);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSched->queue = (SSchedMsg *)calloc(sizeof(SSchedMsg), queueSize);
|
||||
if (pSched->queue == NULL) {
|
||||
pError("%s: no enough memory for queue", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSched->qthread = calloc(sizeof(pthread_t), numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(pSched, 0, sizeof(SSchedQueue));
|
||||
pSched->queueSize = queueSize;
|
||||
strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
|
||||
pSched->label[sizeof(pSched->label)-1] = '\0';
|
||||
|
||||
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
|
||||
pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
|
||||
pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
|
||||
pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
|
||||
pSched->fullSlot = 0;
|
||||
pSched->emptySlot = 0;
|
||||
|
||||
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
|
||||
pError("init %s:queueMutex failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
|
||||
pError("init %s:empty semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
pError("init %s:full semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
|
||||
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
pthread_attr_t attr;
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
|
||||
pthread_attr_destroy(&attr);
|
||||
if (code != 0) {
|
||||
pError("%s: failed to create rpc thread(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
++pSched->numOfThreads;
|
||||
}
|
||||
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads);
|
||||
|
||||
return (void *)pSched;
|
||||
|
||||
_error:
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
|
||||
|
@ -124,21 +126,21 @@ void *taosProcessSchedQueue(void *param) {
|
|||
pTrace("wait %s fullSem was interrupted", pSched->label);
|
||||
continue;
|
||||
}
|
||||
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
|
||||
pError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
msg = pSched->queue[pSched->fullSlot];
|
||||
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
|
||||
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
|
||||
|
||||
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
|
||||
pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));
|
||||
pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (tsem_post(&pSched->emptySem) != 0)
|
||||
pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));
|
||||
pError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (msg.fp)
|
||||
(*(msg.fp))(&msg);
|
||||
|
@ -158,22 +160,23 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
|
|||
|
||||
while (tsem_wait(&pSched->emptySem) != 0) {
|
||||
if (errno != EINTR) {
|
||||
pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||
break;
|
||||
}
|
||||
pTrace("wait %s emptySem was interrupted", pSched->label);
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
pSched->queue[pSched->emptySlot] = *pMsg;
|
||||
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
|
||||
|
||||
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
|
||||
pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (tsem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
|
||||
if (tsem_post(&pSched->fullSem) != 0)
|
||||
pError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -183,10 +186,12 @@ void taosCleanUpScheduler(void *param) {
|
|||
if (pSched == NULL) return;
|
||||
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
if (pSched->qthread[i])
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
}
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
if (pSched->qthread[i])
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
}
|
||||
|
||||
tsem_destroy(&pSched->emptySem);
|
||||
|
@ -197,8 +202,8 @@ void taosCleanUpScheduler(void *param) {
|
|||
taosTmrStopA(&pSched->pTimer);
|
||||
}
|
||||
|
||||
free(pSched->queue);
|
||||
free(pSched->qthread);
|
||||
if (pSched->queue) free(pSched->queue);
|
||||
if (pSched->qthread) free(pSched->qthread);
|
||||
free(pSched); // fix memory leak
|
||||
}
|
||||
|
||||
|
|
|
@ -224,10 +224,11 @@ void vnodeRelease(void *pVnodeRaw) {
|
|||
// remove the whole directory
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
|
||||
free(pVnode);
|
||||
|
||||
int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
|
||||
dTrace("pVnode:%p vgId:%d, vnode is released, vnodes:%d", pVnode, vgId, count);
|
||||
|
||||
if (count <= 0) {
|
||||
taosCleanUpIntHash(tsDnodeVnodesHash);
|
||||
vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||
|
|
Loading…
Reference in New Issue