refactor: shm queue in multi process mode
This commit is contained in:
parent
6e1d926f36
commit
77575d1e70
|
@ -41,8 +41,8 @@ typedef enum {
|
|||
typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg);
|
||||
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
|
||||
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
typedef void (*SendRspFp)(const SRpcMsg* pMsg);
|
||||
typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||
typedef void (*SendRspFp)(SRpcMsg* pMsg);
|
||||
typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
|
||||
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
|
||||
typedef void (*ReportStartup)(const char* name, const char* desc);
|
||||
|
@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
|
|||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
||||
void tmsgSendRsp(const SRpcMsg* pMsg);
|
||||
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||
void tmsgSendRsp(SRpcMsg* pMsg);
|
||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
|
||||
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
|
||||
void tmsgReportStartup(const char* name, const char* desc);
|
||||
|
|
|
@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb;
|
|||
|
||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
|
||||
|
||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
|
||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
|
||||
PutToQueueFp fp = pMsgCb->queueFps[qtype];
|
||||
return (*fp)(pMsgCb->mgmt, pReq);
|
||||
return (*fp)(pMsgCb->mgmt, pMsg);
|
||||
}
|
||||
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
||||
|
@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
|||
return (*fp)(pMsgCb->mgmt, vgId, qtype);
|
||||
}
|
||||
|
||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) {
|
||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
|
||||
return (*fp)(epSet, pReq);
|
||||
return (*fp)(epSet, pMsg);
|
||||
}
|
||||
|
||||
void tmsgSendRsp(const SRpcMsg* pMsg) {
|
||||
void tmsgSendRsp(SRpcMsg* pMsg) {
|
||||
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
||||
return (*fp)(pMsg);
|
||||
}
|
||||
|
||||
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
||||
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
||||
(*fp)(pMsg, pNewEpSet);
|
||||
}
|
||||
|
|
|
@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
|||
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
|
||||
if (taosCheckPthreadValid(pMgmt->statusThread)) {
|
||||
taosThreadJoin(pMgmt->statusThread, NULL);
|
||||
taosThreadClear(&pMgmt->statusThread);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
|||
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
||||
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
|
||||
taosThreadJoin(pMgmt->monitorThread, NULL);
|
||||
taosThreadClear(&pMgmt->monitorThread);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
||||
taosThreadJoin(pThread->thread, NULL);
|
||||
taosThreadClear(&pThread->thread);
|
||||
}
|
||||
taosMemoryFree(pThread->pCfgs);
|
||||
}
|
||||
|
|
|
@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
|
|||
void dmCleanupProc(struct SMgmtWrapper *pWrapper);
|
||||
int32_t dmRunProc(SProc *proc);
|
||||
void dmStopProc(SProc *proc);
|
||||
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle);
|
||||
void dmRemoveProcRpcHandle(SProc *proc, void *handle);
|
||||
void dmCloseProcRpcHandles(SProc *proc);
|
||||
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||
void *handle, int64_t handleRef, EProcFuncType ftype);
|
||||
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||
EProcFuncType ftype);
|
||||
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
|
||||
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
|
||||
|
||||
// dmTransport.c
|
||||
int32_t dmInitServer(SDnode *pDnode);
|
||||
|
|
|
@ -16,10 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
|
||||
static inline int32_t CEIL8(int32_t v) {
|
||||
const int32_t c = ceil((float)(v) / 8) * 8;
|
||||
return c < 8 ? 8 : c;
|
||||
}
|
||||
static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
|
||||
|
||||
static int32_t dmInitProcMutex(SProcQueue *queue) {
|
||||
TdThreadMutexAttr mattr = {0};
|
||||
|
@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
|||
return queue;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static void dmDestroyProcQueue(SProcQueue *queue) {
|
||||
if (queue->mutex != NULL) {
|
||||
taosThreadMutexDestroy(queue->mutex);
|
||||
queue->mutex = NULL;
|
||||
}
|
||||
}
|
||||
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
||||
|
||||
static void dmDestroyProcSem(SProcQueue *queue) {
|
||||
if (queue->sem != NULL) {
|
||||
tsem_destroy(queue->sem);
|
||||
queue->sem = NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static void dmCleanupProcQueue(SProcQueue *queue) {
|
||||
#if 0
|
||||
if (queue != NULL) {
|
||||
dmDestroyProcQueue(queue);
|
||||
dmDestroyProcSem(queue);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
|
||||
const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
|
||||
EProcFuncType ftype) {
|
||||
if (rawHeadLen == 0 || pHead == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return -1;
|
||||
}
|
||||
|
||||
const int32_t headLen = CEIL8(rawHeadLen);
|
||||
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||
const void *pHead = pMsg;
|
||||
const void *pBody = pMsg->pCont;
|
||||
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
||||
const int32_t rawBodyLen = pMsg->contLen;
|
||||
const int16_t headLen = CEIL8(rawHeadLen);
|
||||
const int32_t bodyLen = CEIL8(rawBodyLen);
|
||||
const int32_t fullLen = headLen + bodyLen + 8;
|
||||
const int64_t handle = (int64_t)pMsg->info.handle;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
if (fullLen > queue->avail) {
|
||||
|
@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (handle != 0 && ftype == DND_FUNC_REQ) {
|
||||
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
|
||||
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
|
||||
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
|||
|
||||
if (queue->tail < queue->head) {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
|
||||
queue->tail = queue->tail + 8 + headLen + bodyLen;
|
||||
} else {
|
||||
int32_t remain = queue->total - queue->tail;
|
||||
if (remain == 0) {
|
||||
memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
|
||||
memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
|
||||
queue->tail = 8 + headLen + bodyLen;
|
||||
} else if (remain == 8) {
|
||||
memcpy(queue->pBuffer, pHead, rawHeadLen);
|
||||
memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
|
||||
queue->tail = headLen + bodyLen;
|
||||
} else if (remain < 8 + headLen) {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
|
||||
memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
|
||||
memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
||||
queue->tail = headLen - (remain - 8) + bodyLen;
|
||||
} else if (remain < 8 + headLen + bodyLen) {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
|
||||
memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
||||
queue->tail = bodyLen - (remain - 8 - headLen);
|
||||
} else {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||
memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
|
||||
queue->tail = queue->tail + headLen + bodyLen + 8;
|
||||
}
|
||||
}
|
||||
|
@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
|||
taosThreadMutexUnlock(&queue->mutex);
|
||||
tsem_post(&queue->sem);
|
||||
|
||||
dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
|
||||
pBody, bodyLen, pos, queue->items);
|
||||
dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
|
||||
pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
|
||||
EProcFuncType *pFuncType) {
|
||||
static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
|
||||
tsem_wait(&queue->sem);
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
|
@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
|
|||
int32_t bodyLen = CEIL8(rawBodyLen);
|
||||
|
||||
void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
|
||||
void *pBody = rpcMallocCont(bodyLen);
|
||||
if (pHead == NULL || pBody == NULL) {
|
||||
void *pBody = NULL;
|
||||
if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
|
||||
if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
tsem_post(&queue->sem);
|
||||
taosFreeQitem(pHead);
|
||||
|
@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
|
|||
const int32_t pos = queue->head;
|
||||
if (queue->head < queue->tail) {
|
||||
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
|
||||
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
|
||||
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
|
||||
queue->head = queue->head + 8 + headLen + bodyLen;
|
||||
} else {
|
||||
int32_t remain = queue->total - queue->head;
|
||||
if (remain == 0) {
|
||||
memcpy(pHead, queue->pBuffer + 8, headLen);
|
||||
memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
|
||||
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
|
||||
queue->head = 8 + headLen + bodyLen;
|
||||
} else if (remain == 8) {
|
||||
memcpy(pHead, queue->pBuffer, headLen);
|
||||
memcpy(pBody, queue->pBuffer + headLen, bodyLen);
|
||||
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
|
||||
queue->head = headLen + bodyLen;
|
||||
} else if (remain < 8 + headLen) {
|
||||
memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
|
||||
memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
|
||||
memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
|
||||
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
|
||||
queue->head = headLen - (remain - 8) + bodyLen;
|
||||
} else if (remain < 8 + headLen + bodyLen) {
|
||||
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
|
||||
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
|
||||
memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
|
||||
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
|
||||
if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
|
||||
queue->head = bodyLen - (remain - 8 - headLen);
|
||||
} else {
|
||||
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
|
||||
memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
|
||||
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
|
||||
queue->head = queue->head + headLen + bodyLen + 8;
|
||||
}
|
||||
}
|
||||
|
@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
|
|||
queue->items--;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
*ppHead = pHead;
|
||||
*ppBody = pBody;
|
||||
*pHeadLen = rawHeadLen;
|
||||
*pBodyLen = rawBodyLen;
|
||||
*ppMsg = pHead;
|
||||
(*ppMsg)->pCont = pBody;
|
||||
*pFuncType = (EProcFuncType)ftype;
|
||||
|
||||
dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody,
|
||||
bodyLen, pos, queue->items);
|
||||
dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
|
||||
(*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) {
|
|||
SProc *proc = param;
|
||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||
SProcQueue *queue = proc->cqueue;
|
||||
void *pHead = NULL;
|
||||
void *pBody = NULL;
|
||||
int16_t headLen = 0;
|
||||
int32_t bodyLen = 0;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t code = 0;
|
||||
EProcFuncType ftype = DND_FUNC_REQ;
|
||||
SRpcMsg *pReq = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||
do {
|
||||
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
||||
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
|
||||
if (numOfMsgs == 0) {
|
||||
dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
|
||||
break;
|
||||
|
@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) {
|
|||
}
|
||||
|
||||
if (ftype != DND_FUNC_REQ) {
|
||||
dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||
taosFreeQitem(pHead);
|
||||
rpcFreeCont(pBody);
|
||||
} else {
|
||||
pReq = pHead;
|
||||
pReq->pCont = pBody;
|
||||
code = dmProcessNodeMsg(pWrapper, pReq);
|
||||
if (code != 0) {
|
||||
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr());
|
||||
SRpcMsg rspMsg = {
|
||||
.info = pReq->info,
|
||||
.pCont = pReq->info.rsp,
|
||||
.contLen = pReq->info.rspLen,
|
||||
};
|
||||
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP);
|
||||
taosFreeQitem(pHead);
|
||||
rpcFreeCont(pBody);
|
||||
rpcFreeCont(rspMsg.pCont);
|
||||
dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = dmProcessNodeMsg(pWrapper, pMsg);
|
||||
if (code != 0) {
|
||||
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
|
||||
SRpcMsg rsp = {
|
||||
.code = (terrno != 0 ? terrno : code),
|
||||
.pCont = pMsg->info.rsp,
|
||||
.contLen = pMsg->info.rspLen,
|
||||
.info = pMsg->info,
|
||||
};
|
||||
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
} while (1);
|
||||
|
||||
|
@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) {
|
|||
SProc *proc = param;
|
||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||
SProcQueue *queue = proc->pqueue;
|
||||
void *pHead = NULL;
|
||||
void *pBody = NULL;
|
||||
int16_t headLen = 0;
|
||||
int32_t bodyLen = 0;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t code = 0;
|
||||
EProcFuncType ftype = DND_FUNC_REQ;
|
||||
SRpcMsg *pRsp = NULL;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||
do {
|
||||
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
||||
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
|
||||
if (numOfMsgs == 0) {
|
||||
dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
|
||||
break;
|
||||
|
@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) {
|
|||
}
|
||||
|
||||
if (ftype == DND_FUNC_RSP) {
|
||||
pRsp = pHead;
|
||||
pRsp->pCont = pBody;
|
||||
dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
|
||||
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
|
||||
rpcSendResponse(pRsp);
|
||||
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
||||
rpcSendResponse(pMsg);
|
||||
} else if (ftype == DND_FUNC_REGIST) {
|
||||
pRsp = pHead;
|
||||
pRsp->pCont = pBody;
|
||||
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
|
||||
pRsp->info.handle);
|
||||
rpcRegisterBrokenLinkArg(pRsp);
|
||||
rpcRegisterBrokenLinkArg(pMsg);
|
||||
} else if (ftype == DND_FUNC_RELEASE) {
|
||||
pRsp = pHead;
|
||||
pRsp->pCont = NULL;
|
||||
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
|
||||
pRsp->info.handle);
|
||||
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
|
||||
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
|
||||
rpcFreeCont(pBody);
|
||||
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
||||
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
|
||||
} else {
|
||||
dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||
rpcFreeCont(pBody);
|
||||
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
taosFreeQitem(pHead);
|
||||
taosFreeQitem(pMsg);
|
||||
} while (1);
|
||||
|
||||
return NULL;
|
||||
|
@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
|
|||
dmCleanupProcQueue(proc->cqueue);
|
||||
dmCleanupProcQueue(proc->pqueue);
|
||||
taosHashCleanup(proc->hash);
|
||||
proc->hash = NULL;
|
||||
dDebug("node:%s, proc is cleaned up", pWrapper->name);
|
||||
}
|
||||
|
||||
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) {
|
||||
void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
|
||||
int64_t h = (int64_t)handle;
|
||||
taosThreadMutexLock(&proc->cqueue->mutex);
|
||||
|
||||
int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t));
|
||||
int64_t ref = 0;
|
||||
if (pRef != NULL) {
|
||||
ref = *pRef;
|
||||
}
|
||||
|
||||
taosHashRemove(proc->hash, &h, sizeof(int64_t));
|
||||
taosThreadMutexUnlock(&proc->cqueue->mutex);
|
||||
|
||||
return ref;
|
||||
}
|
||||
|
||||
void dmCloseProcRpcHandles(SProc *proc) {
|
||||
taosThreadMutexLock(&proc->cqueue->mutex);
|
||||
void *h = taosHashIterate(proc->hash, NULL);
|
||||
while (h != NULL) {
|
||||
void *handle = *((void **)h);
|
||||
h = taosHashIterate(proc->hash, h);
|
||||
|
||||
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
|
||||
SRpcMsg rpcMsg = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
|
||||
SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
|
||||
while (pInfo != NULL) {
|
||||
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
|
||||
SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE};
|
||||
rpcSendResponse(&rpcMsg);
|
||||
pInfo = taosHashIterate(proc->hash, pInfo);
|
||||
}
|
||||
taosHashClear(proc->hash);
|
||||
taosThreadMutexUnlock(&proc->cqueue->mutex);
|
||||
}
|
||||
|
||||
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||
EProcFuncType ftype) {
|
||||
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||
int32_t retry = 0;
|
||||
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
|
||||
dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
|
||||
while (1) {
|
||||
if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (retry == 10) {
|
||||
pMsg->code = terrno;
|
||||
if (pMsg->contLen > 0) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
}
|
||||
dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
|
||||
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
|
||||
} else {
|
||||
dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
|
||||
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
|
||||
retry++;
|
||||
taosMsleep(retry);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||
void *handle, int64_t ref, EProcFuncType ftype) {
|
||||
return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
|
||||
}
|
||||
|
||||
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
|
||||
}
|
||||
|
|
|
@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
if (InParentProc(pWrapper)) {
|
||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
|
||||
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
|
||||
DND_FUNC_REQ);
|
||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
|
||||
} else {
|
||||
code = dmProcessNodeMsg(pWrapper, pMsg);
|
||||
}
|
||||
|
@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
static inline void dmSendRsp(const SRpcMsg *pMsg) {
|
||||
static inline void dmSendRsp(SRpcMsg *pMsg) {
|
||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||
if (InChildProc(pWrapper)) {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
|
||||
} else {
|
||||
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
|
||||
dmSendRpcRedirectRsp(pMsg);
|
||||
} else {
|
||||
if (InChildProc(pWrapper)) {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
||||
} else {
|
||||
rpcSendResponse(pMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
||||
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
|
||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||
if (InChildProc(pWrapper)) {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
||||
} else {
|
||||
SRpcMsg rsp = {0};
|
||||
SMEpSet msg = {.epSet = *pNewEpSet};
|
||||
|
@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
|
|||
tSerializeSMEpSet(rsp.pCont, len, &msg);
|
||||
|
||||
rsp.code = TSDB_CODE_RPC_REDIRECT;
|
||||
rsp.info = pRsp->info;
|
||||
rsp.info = pMsg->info;
|
||||
rpcSendResponse(&rsp);
|
||||
}
|
||||
}
|
||||
|
@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
|
|||
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||
if (InChildProc(pWrapper)) {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
|
||||
} else {
|
||||
rpcRegisterBrokenLinkArg(pMsg);
|
||||
}
|
||||
|
@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
|||
SMgmtWrapper *pWrapper = pHandle->wrapper;
|
||||
if (InChildProc(pWrapper)) {
|
||||
SRpcMsg msg = {.code = type, .info = *pHandle};
|
||||
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
|
||||
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
|
||||
} else {
|
||||
rpcReleaseHandle(pHandle->handle, type);
|
||||
}
|
||||
|
|
|
@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) {
|
|||
pMnode->stopped = true;
|
||||
if (taosCheckPthreadValid(pMnode->thread)) {
|
||||
taosThreadJoin(pMnode->thread, NULL);
|
||||
memset(&pMnode->thread, 0, sizeof(pMnode->thread));
|
||||
taosThreadClear(&pMnode->thread);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,10 +16,9 @@
|
|||
#include "tcache.h"
|
||||
|
||||
void reportStartup(const char *name, const char *desc) {}
|
||||
void sendRsp(const SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
|
||||
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
|
||||
|
||||
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
// rpcFreeCont(pMsg->pCont);
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
|
|||
int32_t ret = 0;
|
||||
atomic_store_8(&io->isStart, 0);
|
||||
taosThreadJoin(io->consumerTid, NULL);
|
||||
taosThreadClear(&io->consumerTid);
|
||||
taosTmrCleanUp(io->timerMgr);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -244,6 +244,7 @@ static void walStopThread() {
|
|||
|
||||
if (taosCheckPthreadValid(tsWal.thread)) {
|
||||
taosThreadJoin(tsWal.thread, NULL);
|
||||
taosThreadClear(&tsWal.thread);
|
||||
}
|
||||
|
||||
wDebug("wal thread is stopped");
|
||||
|
|
|
@ -145,6 +145,7 @@ void taosCloseLog() {
|
|||
taosStopLog();
|
||||
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
|
||||
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
|
||||
taosThreadClear(&tsLogObj.logHandle->asyncThread);
|
||||
}
|
||||
tsLogInited = 0;
|
||||
|
||||
|
|
|
@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) {
|
|||
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
||||
if (taosCheckPthreadValid(pSched->qthread[i])) {
|
||||
taosThreadJoin(pSched->qthread[i], NULL);
|
||||
taosThreadClear(&pSched->qthread[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
|
|||
if (worker == NULL) continue;
|
||||
if (taosCheckPthreadValid(worker->thread)) {
|
||||
taosThreadJoin(worker->thread, NULL);
|
||||
taosThreadClear(&worker->thread);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
|
|||
SWWorker *worker = pool->workers + i;
|
||||
if (taosCheckPthreadValid(worker->thread)) {
|
||||
taosThreadJoin(worker->thread, NULL);
|
||||
taosThreadClear(&worker->thread);
|
||||
taosFreeQall(worker->qall);
|
||||
taosCloseQset(worker->qset);
|
||||
}
|
||||
|
|
|
@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
taosMsleep(300);
|
||||
for (int32_t i = 0; i < numOfThreads; i++) {
|
||||
taosThreadJoin(pInfo[i].thread, NULL);
|
||||
taosThreadClear(&pInfo[i].thread);
|
||||
}
|
||||
|
||||
int64_t maxDelay = 0;
|
||||
|
|
|
@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) {
|
|||
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
|
||||
taosThreadClear(&g_stConfInfo.stThreads[i].thread);
|
||||
}
|
||||
|
||||
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
|
||||
|
|
|
@ -56,6 +56,7 @@ void simFreeScript(SScript *script) {
|
|||
bgScript->killed = true;
|
||||
if (taosCheckPthreadValid(bgScript->bgPid)) {
|
||||
taosThreadJoin(bgScript->bgPid, NULL);
|
||||
taosThreadClear(&bgScript->bgPid);
|
||||
}
|
||||
|
||||
simDebug("script:%s, background thread joined", bgScript->fileName);
|
||||
|
|
|
@ -985,6 +985,7 @@ int32_t shellExecute() {
|
|||
while (1) {
|
||||
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
|
||||
taosThreadJoin(shell.pid, NULL);
|
||||
taosThreadClear(&shell.pid);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue