refactor: control the memory of the rpc queue
This commit is contained in:
parent
53b031f262
commit
65e9e97252
|
@ -51,6 +51,7 @@ extern int32_t tsVnodeShmSize;
|
||||||
extern int32_t tsQnodeShmSize;
|
extern int32_t tsQnodeShmSize;
|
||||||
extern int32_t tsSnodeShmSize;
|
extern int32_t tsSnodeShmSize;
|
||||||
extern int32_t tsBnodeShmSize;
|
extern int32_t tsBnodeShmSize;
|
||||||
|
extern int32_t tsNumOfShmThreads;
|
||||||
|
|
||||||
// queue & threads
|
// queue & threads
|
||||||
extern int32_t tsNumOfRpcThreads;
|
extern int32_t tsNumOfRpcThreads;
|
||||||
|
@ -67,6 +68,8 @@ extern int32_t tsNumOfQnodeQueryThreads;
|
||||||
extern int32_t tsNumOfQnodeFetchThreads;
|
extern int32_t tsNumOfQnodeFetchThreads;
|
||||||
extern int32_t tsNumOfSnodeSharedThreads;
|
extern int32_t tsNumOfSnodeSharedThreads;
|
||||||
extern int32_t tsNumOfSnodeUniqueThreads;
|
extern int32_t tsNumOfSnodeUniqueThreads;
|
||||||
|
extern int64_t tsRpcQueueMemoryAllowed;
|
||||||
|
extern int64_t tsRpcQueueMemoryUsed;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
extern bool tsEnableMonitor;
|
extern bool tsEnableMonitor;
|
||||||
|
|
|
@ -89,6 +89,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
||||||
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
||||||
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
|
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
|
||||||
|
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0118)
|
||||||
|
|
||||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
||||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
||||||
|
|
|
@ -59,7 +59,7 @@ void taosFreeQitem(void *pItem);
|
||||||
void taosWriteQitem(STaosQueue *queue, void *pItem);
|
void taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||||
bool taosQueueEmpty(STaosQueue *queue);
|
bool taosQueueEmpty(STaosQueue *queue);
|
||||||
int32_t taosQueueSize(STaosQueue *queue);
|
int32_t taosQueueItemSize(STaosQueue *queue);
|
||||||
|
|
||||||
STaosQall *taosAllocateQall();
|
STaosQall *taosAllocateQall();
|
||||||
void taosFreeQall(STaosQall *qall);
|
void taosFreeQall(STaosQall *qall);
|
||||||
|
|
|
@ -44,6 +44,7 @@ int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
|
||||||
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
|
int32_t tsNumOfShmThreads = 1;
|
||||||
|
|
||||||
// queue & threads
|
// queue & threads
|
||||||
int32_t tsNumOfRpcThreads = 1;
|
int32_t tsNumOfRpcThreads = 1;
|
||||||
|
@ -60,6 +61,8 @@ int32_t tsNumOfQnodeQueryThreads = 2;
|
||||||
int32_t tsNumOfQnodeFetchThreads = 2;
|
int32_t tsNumOfQnodeFetchThreads = 2;
|
||||||
int32_t tsNumOfSnodeSharedThreads = 2;
|
int32_t tsNumOfSnodeSharedThreads = 2;
|
||||||
int32_t tsNumOfSnodeUniqueThreads = 2;
|
int32_t tsNumOfSnodeUniqueThreads = 2;
|
||||||
|
int64_t tsRpcQueueMemoryAllowed = 0;
|
||||||
|
int64_t tsRpcQueueMemoryUsed = 0;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
bool tsEnableMonitor = true;
|
bool tsEnableMonitor = true;
|
||||||
|
@ -374,6 +377,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
||||||
|
@ -427,6 +431,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||||
|
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L);
|
||||||
|
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, 1, INT64_MAX, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
||||||
|
@ -566,6 +574,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||||
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||||
|
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
|
||||||
|
|
||||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||||
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||||
|
|
|
@ -126,10 +126,10 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
|
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
size = taosQueueSize(pMgmt->queryWorker.queue);
|
size = taosQueueItemSize(pMgmt->queryWorker.queue);
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
size = taosQueueSize(pMgmt->fetchWorker.queue);
|
size = taosQueueItemSize(pMgmt->fetchWorker.queue);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -397,22 +397,22 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pWriteQ);
|
size = taosQueueItemSize(pVnode->pWriteQ);
|
||||||
break;
|
break;
|
||||||
case SYNC_QUEUE:
|
case SYNC_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pSyncQ);
|
size = taosQueueItemSize(pVnode->pSyncQ);
|
||||||
break;
|
break;
|
||||||
case APPLY_QUEUE:
|
case APPLY_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pApplyQ);
|
size = taosQueueItemSize(pVnode->pApplyQ);
|
||||||
break;
|
break;
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pQueryQ);
|
size = taosQueueItemSize(pVnode->pQueryQ);
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pFetchQ);
|
size = taosQueueItemSize(pVnode->pFetchQ);
|
||||||
break;
|
break;
|
||||||
case MERGE_QUEUE:
|
case MERGE_QUEUE:
|
||||||
size = taosQueueSize(pVnode->pMergeQ);
|
size = taosQueueItemSize(pVnode->pMergeQ);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -88,9 +88,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
|
||||||
|
|
||||||
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
||||||
if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) {
|
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
|
||||||
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
||||||
taosQueueSize(pDispatcher->pDataBlocks));
|
taosQueueItemSize(pDispatcher->pDataBlocks));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
||||||
|
|
||||||
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
taosThreadMutexLock(&pDispatcher->mutex);
|
taosThreadMutexLock(&pDispatcher->mutex);
|
||||||
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks);
|
||||||
int32_t status =
|
int32_t status =
|
||||||
(0 == blockNums ? DS_BUF_EMPTY
|
(0 == blockNums ? DS_BUF_EMPTY
|
||||||
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
||||||
|
|
|
@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||||
|
|
|
@ -23,36 +23,36 @@ typedef struct STaosQnode STaosQnode;
|
||||||
typedef struct STaosQnode {
|
typedef struct STaosQnode {
|
||||||
STaosQnode *next;
|
STaosQnode *next;
|
||||||
STaosQueue *queue;
|
STaosQueue *queue;
|
||||||
|
int32_t size;
|
||||||
char item[];
|
char item[];
|
||||||
} STaosQnode;
|
} STaosQnode;
|
||||||
|
|
||||||
typedef struct STaosQueue {
|
typedef struct STaosQueue {
|
||||||
int32_t itemSize;
|
int32_t memOfItems;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
int32_t threadId;
|
int32_t threadId;
|
||||||
STaosQnode *head;
|
STaosQnode *head;
|
||||||
STaosQnode *tail;
|
STaosQnode *tail;
|
||||||
STaosQueue *next; // for queue set
|
STaosQueue *next; // for queue set
|
||||||
STaosQset *qset; // for queue set
|
STaosQset *qset; // for queue set
|
||||||
void *ahandle; // for queue set
|
void *ahandle; // for queue set
|
||||||
FItem itemFp;
|
FItem itemFp;
|
||||||
FItems itemsFp;
|
FItems itemsFp;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} STaosQueue;
|
} STaosQueue;
|
||||||
|
|
||||||
typedef struct STaosQset {
|
typedef struct STaosQset {
|
||||||
STaosQueue *head;
|
STaosQueue *head;
|
||||||
STaosQueue *current;
|
STaosQueue *current;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
int32_t numOfQueues;
|
int32_t numOfQueues;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
tsem_t sem;
|
tsem_t sem;
|
||||||
} STaosQset;
|
} STaosQset;
|
||||||
|
|
||||||
typedef struct STaosQall {
|
typedef struct STaosQall {
|
||||||
STaosQnode *current;
|
STaosQnode *current;
|
||||||
STaosQnode *start;
|
STaosQnode *start;
|
||||||
int32_t itemSize;
|
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
} STaosQall;
|
} STaosQall;
|
||||||
|
|
||||||
|
@ -118,15 +118,23 @@ bool taosQueueEmpty(STaosQueue *queue) {
|
||||||
return empty;
|
return empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosQueueSize(STaosQueue *queue) {
|
int32_t taosQueueItemSize(STaosQueue *queue) {
|
||||||
taosThreadMutexLock(&queue->mutex);
|
taosThreadMutexLock(&queue->mutex);
|
||||||
int32_t numOfItems = queue->numOfItems;
|
int32_t numOfItems = queue->numOfItems;
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
return numOfItems;
|
return numOfItems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t taosQueueMemorySize(STaosQueue *queue) {
|
||||||
|
taosThreadMutexLock(&queue->mutex);
|
||||||
|
int32_t memOfItems = queue->memOfItems;
|
||||||
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
return memOfItems;
|
||||||
|
}
|
||||||
|
|
||||||
void *taosAllocateQitem(int32_t size) {
|
void *taosAllocateQitem(int32_t size) {
|
||||||
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
||||||
|
pNode->size = size;
|
||||||
|
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -161,8 +169,9 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||||
}
|
}
|
||||||
|
|
||||||
queue->numOfItems++;
|
queue->numOfItems++;
|
||||||
|
queue->memOfItems += pNode->size;
|
||||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
|
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -181,9 +190,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||||
queue->head = pNode->next;
|
queue->head = pNode->next;
|
||||||
if (queue->head == NULL) queue->tail = NULL;
|
if (queue->head == NULL) queue->tail = NULL;
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
|
queue->memOfItems -= pNode->size;
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||||
|
queue->memOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
@ -207,12 +218,12 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
||||||
qall->current = queue->head;
|
qall->current = queue->head;
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
qall->itemSize = queue->itemSize;
|
|
||||||
code = qall->numOfItems;
|
code = qall->numOfItems;
|
||||||
|
|
||||||
queue->head = NULL;
|
queue->head = NULL;
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
|
queue->memOfItems = 0;
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,9 +388,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI
|
||||||
queue->head = pNode->next;
|
queue->head = pNode->next;
|
||||||
if (queue->head == NULL) queue->tail = NULL;
|
if (queue->head == NULL) queue->tail = NULL;
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
|
queue->memOfItems -= pNode->size;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||||
|
queue->memOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
@ -411,7 +424,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
||||||
qall->current = queue->head;
|
qall->current = queue->head;
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
qall->itemSize = queue->itemSize;
|
|
||||||
code = qall->numOfItems;
|
code = qall->numOfItems;
|
||||||
if (ahandle) *ahandle = queue->ahandle;
|
if (ahandle) *ahandle = queue->ahandle;
|
||||||
if (itemsFp) *itemsFp = queue->itemsFp;
|
if (itemsFp) *itemsFp = queue->itemsFp;
|
||||||
|
@ -419,6 +431,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
||||||
queue->head = NULL;
|
queue->head = NULL;
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
|
queue->memOfItems = 0;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||||
for (int32_t j = 1; j < qall->numOfItems; ++j) {
|
for (int32_t j = 1; j < qall->numOfItems; ++j) {
|
||||||
tsem_wait(&qset->sem);
|
tsem_wait(&qset->sem);
|
||||||
|
|
Loading…
Reference in New Issue