refactor: add some logs.
This commit is contained in:
parent
edc64122cb
commit
6015729560
|
@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "consumer_group");
|
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
|
|
@ -100,8 +100,8 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("receive consumer lost msg, consumer id %" PRId64 ", status %s", pLostMsg->consumerId,
|
mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId,
|
||||||
mndConsumerStatusName(pConsumer->status));
|
pConsumer->status, mndConsumerStatusName(pConsumer->status));
|
||||||
|
|
||||||
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
|
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
@ -114,9 +114,17 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
|
||||||
if (pTrans == NULL) goto FAIL;
|
if (pTrans == NULL) {
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
goto FAIL;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
}
|
||||||
|
|
||||||
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
|
@ -135,8 +143,8 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
||||||
ASSERT(pConsumer);
|
ASSERT(pConsumer);
|
||||||
|
|
||||||
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
|
mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
|
||||||
mndConsumerStatusName(pConsumer->status));
|
pConsumer->status, mndConsumerStatusName(pConsumer->status));
|
||||||
|
|
||||||
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
|
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
@ -150,7 +158,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
|
||||||
if (pTrans == NULL) goto FAIL;
|
if (pTrans == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
|
||||||
|
@ -244,7 +255,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
|
||||||
|
mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64,
|
||||||
|
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime);
|
||||||
|
|
||||||
|
if (status == MQ_CONSUMER_STATUS__READY) {
|
||||||
|
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
||||||
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
||||||
|
|
||||||
pLostMsg->consumerId = pConsumer->consumerId;
|
pLostMsg->consumerId = pConsumer->consumerId;
|
||||||
|
@ -255,10 +271,8 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
};
|
};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == MQ_CONSUMER_STATUS__READY) {
|
|
||||||
// do nothing
|
|
||||||
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
|
||||||
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
|
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
|
||||||
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
||||||
|
|
||||||
|
@ -379,11 +393,18 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
|
mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0);
|
int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
|
||||||
|
if (ret != 0) {
|
||||||
|
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
|
||||||
|
pConsumer->cgroup);
|
||||||
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
|
|
||||||
|
@ -392,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
mInfo("try to recover consumer:0x%"PRIx64 "", consumerId);
|
mInfo("try to recover consumer:0x%"PRIx64, consumerId);
|
||||||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||||
|
|
||||||
pRecoverMsg->consumerId = consumerId;
|
pRecoverMsg->consumerId = consumerId;
|
||||||
|
@ -401,6 +422,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
.pCont = pRecoverMsg,
|
.pCont = pRecoverMsg,
|
||||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -416,7 +438,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
// 2. check epoch, only send ep info when epochs do not match
|
// 2. check epoch, only send ep info when epochs do not match
|
||||||
if (epoch != serverEpoch) {
|
if (epoch != serverEpoch) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
|
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch, serverEpoch);
|
||||||
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
|
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
|
|
||||||
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
|
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
|
||||||
|
@ -426,7 +448,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle all topic subscribed by the consumer
|
// handle all topics subscribed by this consumer
|
||||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||||
|
@ -455,6 +477,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||||
|
|
||||||
|
// this customer assigned vgroups
|
||||||
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
|
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
|
||||||
if (topicEp.vgs == NULL) {
|
if (topicEp.vgs == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -484,6 +507,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
// encode rsp
|
// encode rsp
|
||||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
|
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
|
||||||
void *buf = rpcMallocCont(tlen);
|
void *buf = rpcMallocCont(tlen);
|
||||||
|
@ -491,6 +515,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
|
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
|
||||||
((SMqRspHead *)buf)->epoch = serverEpoch;
|
((SMqRspHead *)buf)->epoch = serverEpoch;
|
||||||
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
|
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
|
||||||
|
@ -506,6 +531,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
pMsg->info.rsp = buf;
|
pMsg->info.rsp = buf;
|
||||||
pMsg->info.rspLen = tlen;
|
pMsg->info.rspLen = tlen;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
@ -547,7 +573,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t newTopicNum = taosArrayGetSize(newSub);
|
int32_t newTopicNum = taosArrayGetSize(newSub);
|
||||||
|
|
||||||
// check topic existance
|
// check topic existence
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
goto _over;
|
goto _over;
|
||||||
|
@ -718,13 +744,14 @@ CM_ENCODE_OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
SSdbRow *pRow = NULL;
|
SSdbRow *pRow = NULL;
|
||||||
SMqConsumerObj *pConsumer = NULL;
|
SMqConsumerObj *pConsumer = NULL;
|
||||||
void *buf = NULL;
|
void *buf = NULL;
|
||||||
|
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
|
||||||
|
goto CM_DECODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (sver != MND_CONSUMER_VER_NUMBER) {
|
if (sver != MND_CONSUMER_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
|
@ -732,52 +759,62 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRow = sdbAllocRow(sizeof(SMqConsumerObj));
|
pRow = sdbAllocRow(sizeof(SMqConsumerObj));
|
||||||
if (pRow == NULL) goto CM_DECODE_OVER;
|
if (pRow == NULL) {
|
||||||
|
goto CM_DECODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
pConsumer = sdbGetRowObj(pRow);
|
pConsumer = sdbGetRowObj(pRow);
|
||||||
if (pConsumer == NULL) goto CM_DECODE_OVER;
|
if (pConsumer == NULL) {
|
||||||
|
goto CM_DECODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
|
||||||
buf = taosMemoryMalloc(len);
|
buf = taosMemoryMalloc(len);
|
||||||
if (buf == NULL) goto CM_DECODE_OVER;
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto CM_DECODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
||||||
|
|
||||||
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
|
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO set correct error code
|
||||||
goto CM_DECODE_OVER;
|
goto CM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
tmsgUpdateDnodeEpSet(&pConsumer->ep);
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
tmsgUpdateDnodeEpSet(&pConsumer->ep);
|
||||||
|
|
||||||
CM_DECODE_OVER:
|
CM_DECODE_OVER:
|
||||||
taosMemoryFreeClear(buf);
|
taosMemoryFreeClear(buf);
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer == NULL ? 0 : pConsumer->consumerId,
|
mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
|
||||||
pRaw, terrstr());
|
pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
|
||||||
taosMemoryFreeClear(pRow);
|
taosMemoryFreeClear(pRow);
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId);
|
mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
|
||||||
|
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
|
||||||
|
pConsumer->epoch);
|
||||||
pConsumer->subscribeTime = pConsumer->upTime;
|
pConsumer->subscribeTime = pConsumer->upTime;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId);
|
mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, mndConsumerStatusName(pConsumer->status));
|
||||||
tDeleteSMqConsumerObj(pConsumer);
|
tDeleteSMqConsumerObj(pConsumer);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
|
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
|
||||||
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
|
mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%"PRId64", uptime:%"PRId64,
|
||||||
|
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
|
||||||
|
|
||||||
taosWLockLatch(&pOldConsumer->lock);
|
taosWLockLatch(&pOldConsumer->lock);
|
||||||
|
|
||||||
|
@ -817,7 +854,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
|
|
||||||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
|
int32_t status = pOldConsumer->status;
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
||||||
|
mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
|
||||||
|
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
|
||||||
|
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
|
ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
||||||
|
@ -844,7 +885,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
// not exist in current topic
|
// not exist in current topic
|
||||||
bool existing = false;
|
bool existing = false;
|
||||||
#if 1
|
#if 1
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
|
int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||||
|
for (int32_t i = 0; i < numOfExistedTopics; i++) {
|
||||||
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
|
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
|
||||||
if (strcmp(topic, addedTopic) == 0) {
|
if (strcmp(topic, addedTopic) == 0) {
|
||||||
existing = true;
|
existing = true;
|
||||||
|
@ -869,27 +911,28 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
}
|
}
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
|
int32_t status = pOldConsumer->status;
|
||||||
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||||
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
|
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||||
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
|
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
|
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
|
||||||
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
|
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the re-balance is triggered when the new consumer is launched.
|
||||||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
|
mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
|
||||||
|
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
|
||||||
|
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
|
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
|
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
|
||||||
|
@ -928,27 +971,27 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
ASSERT(i < sz);
|
ASSERT(i < sz);
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
|
int32_t status = pOldConsumer->status;
|
||||||
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||||
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
|
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||||
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
|
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
|
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
|
||||||
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
|
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
||||||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
|
|
||||||
|
mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
|
||||||
|
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
|
||||||
|
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pOldConsumer->lock);
|
taosWUnLockLatch(&pOldConsumer->lock);
|
||||||
|
@ -1036,8 +1079,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
if (hasTopic) {
|
if (hasTopic) {
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
|
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
|
||||||
tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN);
|
STR_TO_VARSTR(topic, topicName);
|
||||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
||||||
} else {
|
} else {
|
||||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||||
|
|
|
@ -561,17 +561,17 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
||||||
|
|
||||||
if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
|
if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
|
||||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
mError("topic:%s failed to create since %s", createTopicReq.name, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
||||||
if (pTopic != NULL) {
|
if (pTopic != NULL) {
|
||||||
if (createTopicReq.igExists) {
|
if (createTopicReq.igExists) {
|
||||||
mInfo("topic:%s, already exist, ignore exist is set", createTopicReq.name);
|
mInfo("topic:%s already exist, ignore exist is set", createTopicReq.name);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
|
@ -731,8 +731,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
if (pTopic->ntbUid != 0) {
|
if (pTopic->ntbUid != 0) {
|
||||||
// broadcast to all vnode
|
// broadcast to all vnode
|
||||||
void *pIter = NULL;
|
pIter = NULL;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
|
@ -746,7 +746,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pHandle, pIter);
|
pIter = taosHashIterate(pTq->pHandle, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
STqHandle* pExec = (STqHandle*)pIter;
|
STqHandle* pExec = (STqHandle*)pIter;
|
||||||
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
|
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
|
||||||
|
|
|
@ -279,11 +279,15 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
||||||
|
|
||||||
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
|
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
|
||||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||||
|
int32_t numOfUids = taosArrayGetSize(tableIdList);
|
||||||
|
if (numOfUids == 0) {
|
||||||
|
return qa;
|
||||||
|
}
|
||||||
|
|
||||||
// let's discard the tables those are not created according to the queried super table.
|
// let's discard the tables those are not created according to the queried super table.
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
for (int32_t i = 0; i < numOfUids; ++i) {
|
||||||
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
||||||
|
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
||||||
|
|
Loading…
Reference in New Issue