use semphore instead of polling for tqeueu
This commit is contained in:
parent
aa296b8ca6
commit
19836961ce
|
@ -74,8 +74,10 @@ void dnodeCleanupRead() {
|
||||||
|
|
||||||
for (int i=0; i < readPool.max; ++i) {
|
for (int i=0; i < readPool.max; ++i) {
|
||||||
SReadWorker *pWorker = readPool.readWorker + i;
|
SReadWorker *pWorker = readPool.readWorker + i;
|
||||||
if (pWorker->thread)
|
if (pWorker->thread) {
|
||||||
|
pthread_cancel(pWorker->thread);
|
||||||
pthread_join(pWorker->thread, NULL);
|
pthread_join(pWorker->thread, NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseQset(readQset);
|
taosCloseQset(readQset);
|
||||||
|
@ -114,12 +116,12 @@ void dnodeRead(SRpcMsg *pMsg) {
|
||||||
pRead->pCont = pCont;
|
pRead->pCont = pCont;
|
||||||
pRead->contLen = pHead->contLen;
|
pRead->contLen = pHead->contLen;
|
||||||
|
|
||||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
|
||||||
|
|
||||||
// next vnode
|
// next vnode
|
||||||
leftLen -= pHead->contLen;
|
leftLen -= pHead->contLen;
|
||||||
pCont -= pHead->contLen;
|
pCont -= pHead->contLen;
|
||||||
queuedMsgNum++;
|
queuedMsgNum++;
|
||||||
|
|
||||||
|
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queuedMsgNum == 0) {
|
if (queuedMsgNum == 0) {
|
||||||
|
|
|
@ -71,7 +71,10 @@ void dnodeCleanupWrite() {
|
||||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||||
if (pWorker->thread) {
|
if (pWorker->thread) {
|
||||||
|
pthread_cancel(pWorker->thread);
|
||||||
pthread_join(pWorker->thread, NULL);
|
pthread_join(pWorker->thread, NULL);
|
||||||
|
taosFreeQall(pWorker->qall);
|
||||||
|
taosCloseQset(pWorker->qset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1364,7 +1364,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s %p, auth spi not matched, msg discarded", pRpc->label, pConn);
|
tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi);
|
||||||
code = TSDB_CODE_AUTH_FAILURE;
|
code = TSDB_CODE_AUTH_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ typedef struct _taos_qset {
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
int32_t numOfQueues;
|
int32_t numOfQueues;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
|
tsem_t sem;
|
||||||
} STaosQset;
|
} STaosQset;
|
||||||
|
|
||||||
typedef struct _taos_qall {
|
typedef struct _taos_qall {
|
||||||
|
@ -59,6 +60,7 @@ taos_queue taosOpenQueue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_init(&queue->mutex, NULL);
|
pthread_mutex_init(&queue->mutex, NULL);
|
||||||
|
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +81,7 @@ void taosCloseQueue(taos_queue param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
pthread_mutex_destroy(&queue->mutex);
|
||||||
free(queue);
|
free(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,11 +118,12 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
|
||||||
|
|
||||||
queue->numOfItems++;
|
queue->numOfItems++;
|
||||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
|
|
||||||
uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
|
uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
|
if (queue->qset) tsem_post(&queue->qset->sem);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,12 +220,15 @@ taos_qset taosOpenQset() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_init(&qset->mutex, NULL);
|
pthread_mutex_init(&qset->mutex, NULL);
|
||||||
|
tsem_init(&qset->sem, 0, 0);
|
||||||
|
|
||||||
return qset;
|
return qset;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseQset(taos_qset param) {
|
void taosCloseQset(taos_qset param) {
|
||||||
STaosQset *qset = (STaosQset *)param;
|
STaosQset *qset = (STaosQset *)param;
|
||||||
|
pthread_mutex_destroy(&qset->mutex);
|
||||||
|
tsem_destroy(&qset->sem);
|
||||||
free(qset);
|
free(qset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -298,6 +304,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
||||||
STaosQnode *pNode = NULL;
|
STaosQnode *pNode = NULL;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
|
tsem_wait(&qset->sem);
|
||||||
|
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
|
||||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||||
|
@ -339,6 +347,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
||||||
STaosQall *qall = (STaosQall *)p2;
|
STaosQall *qall = (STaosQall *)p2;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
|
tsem_wait(&qset->sem);
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
|
||||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||||
|
@ -364,6 +373,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||||
|
for (int j=1; j<qall->numOfItems; ++j) tsem_wait(&qset->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
Loading…
Reference in New Issue