Merge pull request #2077 from taosdata/hotfix/queue
set pNode->next to null, since item may be write into queue multiple …
This commit is contained in:
commit
75f50850a7
|
@ -18,24 +18,24 @@
|
|||
#include "taoserror.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
typedef struct _taos_qnode {
|
||||
typedef struct STaosQnode {
|
||||
int type;
|
||||
struct _taos_qnode *next;
|
||||
struct STaosQnode *next;
|
||||
char item[];
|
||||
} STaosQnode;
|
||||
|
||||
typedef struct _taos_q {
|
||||
typedef struct STaosQueue {
|
||||
int32_t itemSize;
|
||||
int32_t numOfItems;
|
||||
struct _taos_qnode *head;
|
||||
struct _taos_qnode *tail;
|
||||
struct _taos_q *next; // for queue set
|
||||
struct _taos_qset *qset; // for queue set
|
||||
struct STaosQnode *head;
|
||||
struct STaosQnode *tail;
|
||||
struct STaosQueue *next; // for queue set
|
||||
struct STaosQset *qset; // for queue set
|
||||
void *ahandle; // for queue set
|
||||
pthread_mutex_t mutex;
|
||||
} STaosQueue;
|
||||
|
||||
typedef struct _taos_qset {
|
||||
typedef struct STaosQset {
|
||||
STaosQueue *head;
|
||||
STaosQueue *current;
|
||||
pthread_mutex_t mutex;
|
||||
|
@ -44,7 +44,7 @@ typedef struct _taos_qset {
|
|||
tsem_t sem;
|
||||
} STaosQset;
|
||||
|
||||
typedef struct _taos_qall {
|
||||
typedef struct STaosQall {
|
||||
STaosQnode *current;
|
||||
STaosQnode *start;
|
||||
int32_t itemSize;
|
||||
|
@ -95,6 +95,7 @@ void *taosAllocateQitem(int size) {
|
|||
void taosFreeQitem(void *param) {
|
||||
if (param == NULL) return;
|
||||
|
||||
uTrace("item:%p is freed", param);
|
||||
char *temp = (char *)param;
|
||||
temp -= sizeof(STaosQnode);
|
||||
free(temp);
|
||||
|
@ -104,6 +105,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
|
|||
STaosQueue *queue = (STaosQueue *)param;
|
||||
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
|
||||
pNode->type = type;
|
||||
pNode->next = NULL;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
||||
|
@ -143,7 +145,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
|
|||
queue->numOfItems--;
|
||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
|
||||
uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, *type, queue->numOfItems);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
@ -337,6 +339,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
queue->numOfItems--;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
|
Loading…
Reference in New Issue