commit
1d1167676f
|
@ -126,7 +126,9 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
|
||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
SSingleWorkerCfg qCfg = {
|
||||
|
|
|
@ -70,11 +70,10 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
vmSendRsp(pMsg, code);
|
||||
|
||||
}
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
|
@ -85,16 +84,15 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
vmSendRsp(pMsg, code);
|
||||
|
||||
}
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||
SArray * pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||
if (pArray == NULL) {
|
||||
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
|
||||
return;
|
||||
|
@ -216,16 +214,15 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
vmSendRsp(pMsg, code);
|
||||
|
||||
}
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||
SRpcMsg *pRpc = pMsg;
|
||||
SRpcMsg * pRpc = pMsg;
|
||||
SMsgHead *pHead = pRpc->pCont;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -304,7 +301,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
|
||||
SMsgHead *pHead = pRpc->pCont;
|
||||
SMsgHead * pHead = pRpc->pCont;
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) return -1;
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
|
||||
if (msgType != TDMT_VND_SUBMIT) return 0;
|
||||
void* pIter = NULL;
|
||||
STqExec* pExec = NULL;
|
||||
|
@ -239,10 +239,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
|
||||
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
tmsgSendRsp(&resp);
|
||||
|
||||
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
|
||||
taosWUnLockLatch(&pExec->pushHandle.lock);
|
||||
|
@ -663,10 +662,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
|
||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
tmsgSendRsp(&resp);
|
||||
|
||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
||||
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
|
||||
|
@ -845,12 +843,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
/*rsp.pBlockData = pRes;*/
|
||||
|
||||
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = msgLen;
|
||||
pMsg->code = 0;
|
||||
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
|
||||
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
||||
pHead->msgType, consumerId, pReq->epoch);
|
||||
tmsgSendRsp(pMsg);
|
||||
tmsgSendRsp(&resp);
|
||||
taosMemoryFree(pHead);
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -878,10 +874,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqPollRspV2(&abuf, &rspV2);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
|
||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
tmsgSendRsp(&resp);
|
||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
||||
pReq->epoch);
|
||||
/*}*/
|
||||
|
|
|
@ -162,7 +162,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("message in fetch queue is processing");
|
||||
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
char * msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_FETCH:
|
||||
|
@ -184,8 +184,12 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
case TDMT_VND_TASK_PIPE_EXEC:
|
||||
case TDMT_VND_TASK_MERGE_EXEC:
|
||||
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
|
||||
case TDMT_VND_STREAM_TRIGGER:
|
||||
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
||||
case TDMT_VND_STREAM_TRIGGER: {
|
||||
// refactor, avoid double free
|
||||
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
||||
pMsg->pCont = NULL;
|
||||
return code;
|
||||
}
|
||||
case TDMT_VND_QUERY_HEARTBEAT:
|
||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
default:
|
||||
|
@ -328,12 +332,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
|
|||
SDecoder decoder = {0};
|
||||
int rcode = 0;
|
||||
SVCreateTbBatchReq req = {0};
|
||||
SVCreateTbReq *pCreateReq;
|
||||
SVCreateTbReq * pCreateReq;
|
||||
SVCreateTbBatchRsp rsp = {0};
|
||||
SVCreateTbRsp cRsp = {0};
|
||||
char tbName[TSDB_TABLE_FNAME_LEN];
|
||||
STbUidStore *pStore = NULL;
|
||||
SArray *tbUids = NULL;
|
||||
STbUidStore * pStore = NULL;
|
||||
SArray * tbUids = NULL;
|
||||
|
||||
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
|
@ -517,7 +521,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
SDecoder decoder = {0};
|
||||
SEncoder encoder = {0};
|
||||
int ret;
|
||||
SArray *tbUids = NULL;
|
||||
SArray * tbUids = NULL;
|
||||
|
||||
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
|
||||
pRsp->pCont = NULL;
|
||||
|
@ -572,9 +576,9 @@ _exit:
|
|||
|
||||
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSchema *pSchema = NULL;
|
||||
STSchema * pSchema = NULL;
|
||||
tb_uid_t suid = 0;
|
||||
STSRow *row = NULL;
|
||||
STSRow * row = NULL;
|
||||
|
||||
tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
|
||||
if (blkIter.row == NULL) return 0;
|
||||
|
@ -605,8 +609,8 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
|
|||
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
|
||||
ASSERT(pMsg != NULL);
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SMeta *pMeta = pVnode->pMeta;
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
SMeta * pMeta = pVnode->pMeta;
|
||||
SSubmitBlk * pBlock = NULL;
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
|
@ -620,10 +624,10 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
|
|||
}
|
||||
|
||||
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
SSubmitReq * pSubmitReq = (SSubmitReq *)pReq;
|
||||
SSubmitRsp submitRsp = {0};
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk *pBlock;
|
||||
SSubmitBlk * pBlock;
|
||||
SSubmitRsp rsp = {0};
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
SDecoder decoder = {0};
|
||||
|
|
|
@ -47,7 +47,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
|
|||
SQueryTableRsp rsp = {.code = code};
|
||||
|
||||
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
||||
void *msg = rpcMallocCont(contLen);
|
||||
void * msg = rpcMallocCont(contLen);
|
||||
tSerializeSQueryTableRsp(msg, contLen, &rsp);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -85,7 +85,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
|
|||
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
|
||||
|
||||
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
void * pRsp = rpcMallocCont(contLen);
|
||||
tSerializeSExplainRsp(pRsp, contLen, &rsp);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -104,7 +104,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
|
|||
|
||||
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
|
||||
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
void * pRsp = rpcMallocCont(contLen);
|
||||
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -212,7 +212,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
|||
showRsp.tableMeta.numOfColumns = cols;
|
||||
|
||||
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
void * pBuf = rpcMallocCont(bufLen);
|
||||
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -341,7 +341,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -361,7 +361,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
int64_t rId = msg->refId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
char *sql = strndup(msg->msg, msg->sqlLen);
|
||||
char * sql = strndup(msg->msg, msg->sqlLen);
|
||||
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
|
||||
taosMemoryFreeClear(sql);
|
||||
|
||||
|
@ -378,8 +378,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
bool queryDone = false;
|
||||
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
|
||||
bool needStop = false;
|
||||
SQWTaskCtx *handles = NULL;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWTaskCtx * handles = NULL;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -407,7 +407,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SResReadyReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -467,7 +467,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
SResFetchReq *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -505,7 +505,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
int32_t code = 0;
|
||||
STaskCancelReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
|
@ -542,7 +542,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
STaskDropReq *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -581,7 +581,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
SSchedulerHbReq req = {0};
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == pMsg->pCont) {
|
||||
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
|
||||
|
|
|
@ -94,7 +94,9 @@ void rpcFreeCont(void* cont) {
|
|||
if (cont == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
|
||||
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
|
||||
}
|
||||
void* rpcReallocCont(void* ptr, int contLen) {
|
||||
if (ptr == NULL) {
|
||||
|
|
|
@ -133,6 +133,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
|||
} else {
|
||||
p->cap = p->total;
|
||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||
tTrace("internal malloc mem: %p, size: %d", p->buf, p->cap);
|
||||
|
||||
uvBuf->base = p->buf + p->len;
|
||||
uvBuf->len = p->cap - p->len;
|
||||
|
|
|
@ -469,6 +469,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
|||
|
||||
if (pConn->broken == true) {
|
||||
// persist by
|
||||
transFreeMsg(smsg->msg.pCont);
|
||||
taosMemoryFree(smsg);
|
||||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue