Merge pull request #12465 from taosdata/fix/mnode
enh: control the memory of the rpc queue
This commit is contained in:
commit
1f5941ebde
|
@ -51,6 +51,7 @@ extern int32_t tsVnodeShmSize;
|
|||
extern int32_t tsQnodeShmSize;
|
||||
extern int32_t tsSnodeShmSize;
|
||||
extern int32_t tsBnodeShmSize;
|
||||
extern int32_t tsNumOfShmThreads;
|
||||
|
||||
// queue & threads
|
||||
extern int32_t tsNumOfRpcThreads;
|
||||
|
@ -67,6 +68,7 @@ extern int32_t tsNumOfQnodeQueryThreads;
|
|||
extern int32_t tsNumOfQnodeFetchThreads;
|
||||
extern int32_t tsNumOfSnodeSharedThreads;
|
||||
extern int32_t tsNumOfSnodeUniqueThreads;
|
||||
extern int64_t tsRpcQueueMemoryAllowed;
|
||||
|
||||
// monitor
|
||||
extern bool tsEnableMonitor;
|
||||
|
|
|
@ -89,6 +89,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
||||
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
||||
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
|
||||
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0118)
|
||||
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#define _TD_UTIL_PROCESS_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -25,7 +26,7 @@ extern "C" {
|
|||
typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType;
|
||||
|
||||
typedef struct SProcObj SProcObj;
|
||||
typedef void *(*ProcMallocFp)(int32_t contLen);
|
||||
typedef void *(*ProcMallocFp)(int32_t contLen, EQItype itype);
|
||||
typedef void *(*ProcFreeFp)(void *pCont);
|
||||
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||
EProcFuncType ftype);
|
||||
|
|
|
@ -48,18 +48,24 @@ typedef struct {
|
|||
int32_t threadNum;
|
||||
} SQueueInfo;
|
||||
|
||||
typedef enum {
|
||||
DEF_QITEM = 0,
|
||||
RPC_QITEM = 1,
|
||||
} EQItype;
|
||||
|
||||
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
|
||||
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
|
||||
|
||||
STaosQueue *taosOpenQueue();
|
||||
void taosCloseQueue(STaosQueue *queue);
|
||||
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
||||
void *taosAllocateQitem(int32_t size);
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype);
|
||||
void taosFreeQitem(void *pItem);
|
||||
void taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||
bool taosQueueEmpty(STaosQueue *queue);
|
||||
int32_t taosQueueSize(STaosQueue *queue);
|
||||
int32_t taosQueueItemSize(STaosQueue *queue);
|
||||
int64_t taosQueueMemorySize(STaosQueue *queue);
|
||||
|
||||
STaosQall *taosAllocateQall();
|
||||
void taosFreeQall(STaosQall *qall);
|
||||
|
@ -77,8 +83,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
|
|||
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
|
||||
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
|
||||
void taosResetQsetThread(STaosQset *qset, void *pItem);
|
||||
int32_t taosGetQueueItemsNumber(STaosQueue *queue);
|
||||
int32_t taosGetQsetItemsNumber(STaosQset *qset);
|
||||
|
||||
extern int64_t tsRpcQueueMemoryAllowed;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -316,7 +316,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
|
|||
|
||||
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
||||
tmq_t* tmq = (tmq_t*)param;
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||
*pTaskType = TMQ_DELAYED_TASK__HB;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -324,7 +324,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
|||
|
||||
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||
tmq_t* tmq = (tmq_t*)param;
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -332,7 +332,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
|||
|
||||
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
||||
tmq_t* tmq = (tmq_t*)param;
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||
*pTaskType = TMQ_DELAYED_TASK__REPORT;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -848,7 +848,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
|
||||
}
|
||||
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||
if (pRspWrapper == NULL) {
|
||||
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
|
||||
goto CREATE_MSG_FAIL;
|
||||
|
@ -987,7 +987,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
tmqUpdateEp(tmq, head->epoch, &rsp);
|
||||
tDeleteSMqAskEpRsp(&rsp);
|
||||
} else {
|
||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
|
||||
if (pWrapper == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
|
|
|
@ -44,6 +44,7 @@ int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
|
|||
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||
int32_t tsNumOfShmThreads = 1;
|
||||
|
||||
// queue & threads
|
||||
int32_t tsNumOfRpcThreads = 1;
|
||||
|
@ -375,6 +376,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
|
||||
|
@ -428,6 +430,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L);
|
||||
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, 1, INT64_MAX, 0) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
||||
|
@ -568,6 +574,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
|
||||
|
||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||
|
|
|
@ -104,7 +104,7 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||
if (pMsg == NULL) return -1;
|
||||
|
||||
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
|
||||
|
|
|
@ -102,7 +102,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||
if (pMsg == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -126,10 +126,10 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
size = taosQueueSize(pMgmt->queryWorker.queue);
|
||||
size = taosQueueItemSize(pMgmt->queryWorker.queue);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
size = taosQueueSize(pMgmt->fetchWorker.queue);
|
||||
size = taosQueueItemSize(pMgmt->fetchWorker.queue);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -326,7 +326,7 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
|
|||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) return -1;
|
||||
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg != NULL) {
|
||||
|
@ -397,22 +397,22 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
if (pVnode != NULL) {
|
||||
switch (qtype) {
|
||||
case WRITE_QUEUE:
|
||||
size = taosQueueSize(pVnode->pWriteQ);
|
||||
size = taosQueueItemSize(pVnode->pWriteQ);
|
||||
break;
|
||||
case SYNC_QUEUE:
|
||||
size = taosQueueSize(pVnode->pSyncQ);
|
||||
size = taosQueueItemSize(pVnode->pSyncQ);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
size = taosQueueSize(pVnode->pApplyQ);
|
||||
size = taosQueueItemSize(pVnode->pApplyQ);
|
||||
break;
|
||||
case QUERY_QUEUE:
|
||||
size = taosQueueSize(pVnode->pQueryQ);
|
||||
size = taosQueueItemSize(pVnode->pQueryQ);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
size = taosQueueSize(pVnode->pFetchQ);
|
||||
size = taosQueueItemSize(pVnode->pFetchQ);
|
||||
break;
|
||||
case MERGE_QUEUE:
|
||||
size = taosQueueSize(pVnode->pMergeQ);
|
||||
size = taosQueueItemSize(pVnode->pMergeQ);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -79,7 +79,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
|
|||
needRelease = true;
|
||||
|
||||
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
|
||||
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
|
||||
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER;
|
||||
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
||||
|
||||
if (pWrapper->procType != DND_PROC_PARENT) {
|
||||
|
|
|
@ -66,11 +66,7 @@ static void mndPullupTrans(SMnode *pMnode) {
|
|||
static void mndCalMqRebalance(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_MQ_TIMER,
|
||||
.pCont = pReq,
|
||||
.contLen = contLen,
|
||||
};
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -88,9 +88,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
|
|||
|
||||
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
||||
if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) {
|
||||
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
|
||||
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
||||
taosQueueSize(pDispatcher->pDataBlocks));
|
||||
taosQueueItemSize(pDispatcher->pDataBlocks));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -106,7 +106,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
|
|||
|
||||
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
|
||||
taosThreadMutexLock(&pDispatcher->mutex);
|
||||
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
|
||||
int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks);
|
||||
int32_t status =
|
||||
(0 == blockNums ? DS_BUF_EMPTY
|
||||
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
|
||||
|
@ -124,7 +124,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
|
|||
|
||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
|
||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
|
||||
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
|
|||
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
|
||||
|
||||
SRpcMsg *pTemp;
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
STaosQueue *pMsgQ = queue;
|
||||
|
@ -360,7 +360,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
|
||||
SSyncIO *io = pParent;
|
||||
SRpcMsg *pTemp;
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
taosWriteQitem(io->pMsgQ, pTemp);
|
||||
}
|
||||
|
@ -420,7 +420,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pMsg, &rpcMsg);
|
||||
SRpcMsg *pTemp;
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
|
||||
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
|
||||
taosWriteQitem(io->pMsgQ, pTemp);
|
||||
|
|
|
@ -114,7 +114,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
|
|||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SRpcMsg *pTemp;
|
||||
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
||||
|
|
|
@ -103,7 +103,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
|
|||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SRpcMsg *pTemp;
|
||||
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
||||
|
|
|
@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||
|
|
|
@ -258,8 +258,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
|||
int16_t headLen = CEIL8(rawHeadLen);
|
||||
int32_t bodyLen = CEIL8(rawBodyLen);
|
||||
|
||||
void *pHead = (*mallocHeadFp)(headLen);
|
||||
void *pBody = (*mallocBodyFp)(bodyLen);
|
||||
void *pHead = (*mallocHeadFp)(headLen, RPC_QITEM);
|
||||
void *pBody = (*mallocBodyFp)(bodyLen, RPC_QITEM);
|
||||
if (pHead == NULL || pBody == NULL) {
|
||||
taosThreadMutexUnlock(&pQueue->mutex);
|
||||
tsem_post(&pQueue->sem);
|
||||
|
|
|
@ -18,41 +18,45 @@
|
|||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
|
||||
int64_t tsRpcQueueMemoryAllowed = 0;
|
||||
int64_t tsRpcQueueMemoryUsed = 0;
|
||||
|
||||
typedef struct STaosQnode STaosQnode;
|
||||
|
||||
typedef struct STaosQnode {
|
||||
STaosQnode *next;
|
||||
STaosQueue *queue;
|
||||
int32_t size;
|
||||
int8_t itype;
|
||||
int8_t reserved[3];
|
||||
char item[];
|
||||
} STaosQnode;
|
||||
|
||||
typedef struct STaosQueue {
|
||||
int32_t itemSize;
|
||||
int32_t numOfItems;
|
||||
int32_t threadId;
|
||||
STaosQnode *head;
|
||||
STaosQnode *tail;
|
||||
STaosQueue *next; // for queue set
|
||||
STaosQset *qset; // for queue set
|
||||
void *ahandle; // for queue set
|
||||
FItem itemFp;
|
||||
FItems itemsFp;
|
||||
STaosQnode *head;
|
||||
STaosQnode *tail;
|
||||
STaosQueue *next; // for queue set
|
||||
STaosQset *qset; // for queue set
|
||||
void *ahandle; // for queue set
|
||||
FItem itemFp;
|
||||
FItems itemsFp;
|
||||
TdThreadMutex mutex;
|
||||
int64_t memOfItems;
|
||||
int32_t numOfItems;
|
||||
} STaosQueue;
|
||||
|
||||
typedef struct STaosQset {
|
||||
STaosQueue *head;
|
||||
STaosQueue *current;
|
||||
STaosQueue *head;
|
||||
STaosQueue *current;
|
||||
TdThreadMutex mutex;
|
||||
int32_t numOfQueues;
|
||||
int32_t numOfItems;
|
||||
tsem_t sem;
|
||||
tsem_t sem;
|
||||
int32_t numOfQueues;
|
||||
int32_t numOfItems;
|
||||
} STaosQset;
|
||||
|
||||
typedef struct STaosQall {
|
||||
STaosQnode *current;
|
||||
STaosQnode *start;
|
||||
int32_t itemSize;
|
||||
int32_t numOfItems;
|
||||
} STaosQall;
|
||||
|
||||
|
@ -118,32 +122,61 @@ bool taosQueueEmpty(STaosQueue *queue) {
|
|||
return empty;
|
||||
}
|
||||
|
||||
int32_t taosQueueSize(STaosQueue *queue) {
|
||||
int32_t taosQueueItemSize(STaosQueue *queue) {
|
||||
if (queue == NULL) return 0;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
int32_t numOfItems = queue->numOfItems;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
return numOfItems;
|
||||
}
|
||||
|
||||
void *taosAllocateQitem(int32_t size) {
|
||||
int64_t taosQueueMemorySize(STaosQueue *queue) {
|
||||
if (queue == NULL) return 0;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
int64_t memOfItems = queue->memOfItems;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
return memOfItems;
|
||||
}
|
||||
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype) {
|
||||
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
||||
pNode->size = size;
|
||||
pNode->itype = itype;
|
||||
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
|
||||
if (itype == RPC_QITEM) {
|
||||
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
|
||||
if (alloced > tsRpcQueueMemoryUsed) {
|
||||
taosMemoryFree(pNode);
|
||||
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
|
||||
return NULL;
|
||||
}
|
||||
uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
|
||||
} else {
|
||||
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
|
||||
}
|
||||
|
||||
return (void *)pNode->item;
|
||||
}
|
||||
|
||||
void taosFreeQitem(void *pItem) {
|
||||
if (pItem == NULL) return;
|
||||
|
||||
char *temp = pItem;
|
||||
temp -= sizeof(STaosQnode);
|
||||
uTrace("item:%p, node:%p is freed", pItem, temp);
|
||||
taosMemoryFree(temp);
|
||||
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
|
||||
if (pNode->itype > 0) {
|
||||
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
|
||||
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
|
||||
} else {
|
||||
uTrace("item:%p, node:%p is freed", pItem, pNode);
|
||||
}
|
||||
|
||||
taosMemoryFree(pNode);
|
||||
}
|
||||
|
||||
void taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||
|
@ -161,8 +194,9 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) {
|
|||
}
|
||||
|
||||
queue->numOfItems++;
|
||||
queue->memOfItems += pNode->size;
|
||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||
uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
|
||||
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
|
@ -181,9 +215,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
|||
queue->head = pNode->next;
|
||||
if (queue->head == NULL) queue->tail = NULL;
|
||||
queue->numOfItems--;
|
||||
queue->memOfItems -= pNode->size;
|
||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
||||
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||
queue->memOfItems);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
|
@ -191,7 +227,13 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
|||
return code;
|
||||
}
|
||||
|
||||
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); }
|
||||
STaosQall *taosAllocateQall() {
|
||||
STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall));
|
||||
if (qall != NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return qall;
|
||||
}
|
||||
|
||||
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
|
||||
|
||||
|
@ -207,12 +249,12 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
|||
qall->current = queue->head;
|
||||
qall->start = queue->head;
|
||||
qall->numOfItems = queue->numOfItems;
|
||||
qall->itemSize = queue->itemSize;
|
||||
code = qall->numOfItems;
|
||||
|
||||
queue->head = NULL;
|
||||
queue->tail = NULL;
|
||||
queue->numOfItems = 0;
|
||||
queue->memOfItems = 0;
|
||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
||||
}
|
||||
|
||||
|
@ -377,9 +419,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI
|
|||
queue->head = pNode->next;
|
||||
if (queue->head == NULL) queue->tail = NULL;
|
||||
queue->numOfItems--;
|
||||
queue->memOfItems -= pNode->size;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
||||
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||
queue->memOfItems);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
|
@ -411,7 +455,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
|||
qall->current = queue->head;
|
||||
qall->start = queue->head;
|
||||
qall->numOfItems = queue->numOfItems;
|
||||
qall->itemSize = queue->itemSize;
|
||||
code = qall->numOfItems;
|
||||
if (ahandle) *ahandle = queue->ahandle;
|
||||
if (itemsFp) *itemsFp = queue->itemsFp;
|
||||
|
@ -419,6 +462,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
|||
queue->head = NULL;
|
||||
queue->tail = NULL;
|
||||
queue->numOfItems = 0;
|
||||
queue->memOfItems = 0;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||
for (int32_t j = 1; j < qall->numOfItems; ++j) {
|
||||
tsem_wait(&qset->sem);
|
||||
|
@ -444,23 +488,3 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
|
|||
}
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
}
|
||||
|
||||
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
|
||||
if (!queue) return 0;
|
||||
|
||||
int32_t num;
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
num = queue->numOfItems;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
return num;
|
||||
}
|
||||
|
||||
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
|
||||
if (!qset) return 0;
|
||||
|
||||
int32_t num = 0;
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
num = qset->numOfItems;
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
return num;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue