fix(tmq): fix memory error and adjust some logs.
This commit is contained in:
parent
b4787cf53e
commit
da550029d8
|
@ -1291,7 +1291,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req offset:%" PRId64 ", rsp offset:%" PRId64 " type %d, reqId:0x%"PRIx64,
|
||||
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
|
||||
rspType, pParam->requestId);
|
||||
rspType, requestId);
|
||||
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
||||
|
|
|
@ -35,9 +35,9 @@ static const char *mndConsumerStatusName(int status);
|
|||
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
|
||||
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
||||
|
@ -712,7 +712,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
|
||||
// no topics need to be rebalanced
|
||||
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||
// mInfo();
|
||||
goto _over;
|
||||
}
|
||||
|
||||
|
|
|
@ -444,7 +444,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
|
||||
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
|
||||
if (pTrans == NULL) return -1;
|
||||
if (pTrans == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
||||
|
@ -616,9 +618,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
|
||||
// if add more consumer to balanced subscribe,
|
||||
// possibly no vg is changed
|
||||
|
||||
// when each topic is re-balanced, issue an trans to save the results in sdb.
|
||||
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
||||
mError("mq re-balance persist re-balance output error, possibly vnode splitted or dropped");
|
||||
mError("mq re-balance persist output error, possibly vnode splitted or dropped");
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRebInfo->lostConsumers);
|
||||
|
|
|
@ -875,6 +875,7 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (mndCheckTransConflict(pMnode, pTrans)) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
|
|
@ -79,15 +79,15 @@ CFG_DIR=$PRG_DIR/cfg
|
|||
LOG_DIR=$PRG_DIR/log
|
||||
|
||||
echo "------------------------------------------------------------------------"
|
||||
echo "TOP_DIR: $TOP_DIR"
|
||||
echo "BUILD_DIR: $BUILD_DIR"
|
||||
echo "SIM_DIR : $SIM_DIR"
|
||||
echo "CFG_DIR : $CFG_DIR"
|
||||
|
||||
|
||||
echo "PROGRAM: $PROGRAM
|
||||
echo "CFG_DIR: $CFG_DIR
|
||||
echo "POLL_DELAY: $POLL_DELAY
|
||||
echo "DB_NAME: $DB_NAME
|
||||
echo "PROGRAM: $PROGRAM"
|
||||
echo "CFG_DIR: $CFG_DIR"
|
||||
echo "POLL_DELAY: $POLL_DELAY"
|
||||
echo "DB_NAME: $DB_NAME"
|
||||
|
||||
echo "------------------------------------------------------------------------"
|
||||
if [ "$EXEC_OPTON" = "start" ]; then
|
||||
|
|
Loading…
Reference in New Issue