Merge pull request #20094 from taosdata/feature/3_liaohj
fix(tmq): update the epset when the leader of vnode changed.
This commit is contained in:
commit
5b1c2fc314
|
@ -189,27 +189,46 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
|||
|
||||
tmq_t* build_consumer() {
|
||||
tmq_conf_res_t code;
|
||||
tmq_t* tmq = NULL;
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "client.id", "user defined name");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
if (TMQ_CONF_OK != code) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
_end:
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
|
|
|
@ -27,13 +27,14 @@ typedef struct SCorEpSet {
|
|||
SEpSet epSet;
|
||||
} SCorEpSet;
|
||||
|
||||
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
||||
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
||||
|
||||
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
|
||||
|
||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
|
||||
SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
|
||||
void epsetAssign(SEpSet* dst, const SEpSet* pSrc);
|
||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
|
||||
SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1830,10 +1830,10 @@ typedef struct {
|
|||
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
char clientId[256];
|
||||
SArray* topicNames; // SArray<char**>
|
||||
int64_t consumerId;
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
char clientId[256];
|
||||
SArray* topicNames; // SArray<char**>
|
||||
} SCMSubscribeReq;
|
||||
|
||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||
|
|
|
@ -58,15 +58,14 @@ struct tmq_list_t {
|
|||
};
|
||||
|
||||
struct tmq_conf_t {
|
||||
char clientId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t snapEnable;
|
||||
int32_t snapBatchSize;
|
||||
bool hbBgEnable;
|
||||
|
||||
char clientId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t snapEnable;
|
||||
int32_t snapBatchSize;
|
||||
bool hbBgEnable;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
|
@ -213,6 +212,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
SMqCommitCbParamSet* params;
|
||||
STqOffset* pOffset;
|
||||
SMqClientVg* pMqVg;
|
||||
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/
|
||||
/*int32_t vgId;*/
|
||||
} SMqCommitCbParam;
|
||||
|
@ -422,7 +422,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
|||
|
||||
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
ASSERT(waitingRspNum >= 0);
|
||||
if (waitingRspNum == 0) {
|
||||
tmqCommitDone(pParamSet);
|
||||
}
|
||||
|
@ -440,6 +439,17 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
}
|
||||
#endif
|
||||
|
||||
// there may be race condition. fix it
|
||||
if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) {
|
||||
SMqClientVg* pMqVg = pParam->pMqVg;
|
||||
|
||||
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
|
||||
SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet));
|
||||
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId,
|
||||
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
|
||||
pParam->pMqVg->epSet = *pBuf->pEpSet;
|
||||
}
|
||||
|
||||
taosMemoryFree(pParam->pOffset);
|
||||
taosMemoryFree(pBuf->pData);
|
||||
taosMemoryFree(pBuf->pEpSet);
|
||||
|
@ -448,7 +458,6 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
* pOffset->version);*/
|
||||
|
||||
tmqCommitRspCountDown(pParamSet);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -498,6 +507,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
|
||||
pParam->params = pParamSet;
|
||||
pParam->pOffset = pOffset;
|
||||
pParam->pMqVg = pVg; // there may be an race condition
|
||||
|
||||
// build send info
|
||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
|
@ -518,7 +528,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey,
|
||||
pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
|
||||
|
||||
// TODO: put into cb
|
||||
// TODO: put into cb, the commit offset should be move to the callback function
|
||||
pVg->committedOffset = pVg->currentOffset;
|
||||
|
||||
pMsgSendInfo->requestId = generateRequestId();
|
||||
|
@ -540,7 +550,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
|
||||
char* topic;
|
||||
int32_t vgId;
|
||||
ASSERT(msg != NULL);
|
||||
|
||||
if (TD_RES_TMQ(msg)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)msg;
|
||||
topic = pRspObj->topic;
|
||||
|
@ -983,14 +993,12 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
|
||||
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
|
||||
|
||||
ASSERT(conf->groupId[0]);
|
||||
|
||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||
pTmq->mqueue = taosOpenQueue();
|
||||
pTmq->qall = taosAllocateQall();
|
||||
pTmq->delayedTask = taosOpenQueue();
|
||||
|
||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
||||
pTmq->groupId);
|
||||
|
@ -1069,7 +1077,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
SCMSubscribeReq req = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
|
||||
tscDebug("consumer:0x%"PRIx64" subscribe %d topics", tmq->consumerId, sz);
|
||||
|
||||
req.consumerId = tmq->consumerId;
|
||||
tstrncpy(req.clientId, tmq->clientId, 256);
|
||||
|
@ -1092,7 +1100,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
}
|
||||
|
||||
tNameExtractFullName(&name, topicFName);
|
||||
tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName);
|
||||
tscDebug("consumer:0x%"PRIx64" subscribe topic:%s", tmq->consumerId, topicFName);
|
||||
|
||||
taosArrayPush(req.topicNames, &topicFName);
|
||||
}
|
||||
|
@ -1763,6 +1771,8 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
|||
}
|
||||
|
||||
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||
tscDebug("consumer:0x%"PRIx64" start to handle the rsp", tmq->consumerId);
|
||||
|
||||
while (1) {
|
||||
SMqRspWrapper* rspWrapper = NULL;
|
||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||
|
@ -1772,20 +1782,17 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||
|
||||
if (rspWrapper == NULL) {
|
||||
/*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);
|
||||
|
||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||
taosFreeQitem(rspWrapper);
|
||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||
return NULL;
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
||||
tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);
|
||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId);
|
||||
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
|
||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
|
||||
|
@ -1812,6 +1819,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
|
||||
|
||||
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
|
||||
|
@ -1876,7 +1886,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
void* rspObj;
|
||||
int64_t startTime = taosGetTimestampMs();
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);
|
||||
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
|
||||
|
||||
#if 0
|
||||
tmqHandleAllDelayedTask(tmq);
|
||||
|
@ -1889,7 +1899,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
|
||||
// in no topic status, delayed task also need to be processed
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId);
|
||||
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1915,25 +1925,25 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
|
||||
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
||||
if (rspObj) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
|
||||
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
|
||||
return (TAOS_RES*)rspObj;
|
||||
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
|
||||
tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (timeout != -1) {
|
||||
int64_t currentTime = taosGetTimestampMs();
|
||||
int64_t passedTime = currentTime - startTime;
|
||||
if (passedTime > timeout) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||
int64_t elapsedTime = currentTime - startTime;
|
||||
if (elapsedTime > timeout) {
|
||||
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
||||
return NULL;
|
||||
}
|
||||
/*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
|
||||
/*", left time %" PRId64,*/
|
||||
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
|
||||
tsem_timewait(&tmq->rspSem, (timeout - passedTime));
|
||||
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/
|
||||
tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
|
||||
} else {
|
||||
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
|
||||
tsem_timewait(&tmq->rspSem, 1000);
|
||||
|
|
|
@ -60,6 +60,19 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
|
||||
if (pSrc == NULL || pDst == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
pDst->inUse = pSrc->inUse;
|
||||
pDst->numOfEps = pSrc->numOfEps;
|
||||
for (int32_t i = 0; i < pSrc->numOfEps; ++i) {
|
||||
pDst->eps[i].port = pSrc->eps[i].port;
|
||||
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
||||
}
|
||||
}
|
||||
|
||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
||||
taosCorBeginWrite(&pEpSet->version);
|
||||
pEpSet->epSet = *pNewEpSet;
|
||||
|
|
|
@ -238,7 +238,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
// iterate all consumers, find all modification
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
@ -335,7 +337,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
int64_t consumerId = req.consumerId;
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer %" PRId64 " not exist", consumerId);
|
||||
mError("consumer:0x%"PRIx64 " not exist", consumerId);
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
@ -345,7 +347,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||
mInfo("try to recover consumer %" PRId64 "", consumerId);
|
||||
mInfo("try to recover consumer:0x%"PRIx64 "", consumerId);
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||
|
||||
pRecoverMsg->consumerId = consumerId;
|
||||
|
@ -390,7 +392,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
|
||||
#if 1
|
||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||
mInfo("try to recover consumer %" PRId64 "", consumerId);
|
||||
mInfo("try to recover consumer:0x%"PRIx64 "", consumerId);
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||
|
||||
pRecoverMsg->consumerId = consumerId;
|
||||
|
@ -404,14 +406,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
#endif
|
||||
|
||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||
mInfo("consumer %" PRId64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
|
||||
mInfo("consumer:0x%"PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
|
||||
// 2. check epoch, only send ep info when epoches do not match
|
||||
// 2. check epoch, only send ep info when epochs do not match
|
||||
if (epoch != serverEpoch) {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
|
||||
|
@ -526,12 +528,14 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
|
||||
SCMSubscribeReq subscribe = {0};
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
int64_t consumerId = subscribe.consumerId;
|
||||
|
||||
uint64_t consumerId = subscribe.consumerId;
|
||||
char *cgroup = subscribe.cgroup;
|
||||
SMqConsumerObj *pConsumerOld = NULL;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
|
@ -542,21 +546,23 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(newSub);
|
||||
|
||||
// check topic existance
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
||||
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
||||
if (pTrans == NULL) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
char *topic = taosArrayGetP(newSub, i);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) {
|
||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
goto SUBSCRIBE_OVER;
|
||||
if (pTopic == NULL) { // terrno has been set by callee function
|
||||
goto _over;
|
||||
}
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
goto SUBSCRIBE_OVER;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
@ -578,8 +584,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
||||
}
|
||||
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||
|
||||
} else {
|
||||
/*taosRLockLatch(&pConsumerOld->lock);*/
|
||||
|
@ -591,13 +597,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
|
||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto SUBSCRIBE_OVER;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||
if (pConsumerNew == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto SUBSCRIBE_OVER;
|
||||
goto _over;
|
||||
}
|
||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||
|
||||
|
@ -650,16 +656,16 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
|
||||
/*pConsumerNew->updateType = */
|
||||
/*}*/
|
||||
goto SUBSCRIBE_OVER;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||
}
|
||||
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
SUBSCRIBE_OVER:
|
||||
_over:
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (pConsumerOld) {
|
||||
|
@ -971,16 +977,19 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||
if (pShow->pIter == NULL) break;
|
||||
if (pShow->pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
||||
mDebug("showing consumer %" PRId64 " no assigned topic, skip", pConsumer->consumerId);
|
||||
mDebug("showing consumer:0x%"PRIx64 " no assigned topic, skip", pConsumer->consumerId);
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
mDebug("showing consumer %" PRId64, pConsumer->consumerId);
|
||||
mDebug("showing consumer:0x%"PRIx64, pConsumer->consumerId);
|
||||
|
||||
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
bool hasTopic = true;
|
||||
|
|
|
@ -523,7 +523,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
SSdb* pSdb = pMnode->pSdb;
|
||||
SVgObj* pVgroup = NULL;
|
||||
SQueryPlan* pPlan = NULL;
|
||||
SSubplan* plan = NULL;
|
||||
SSubplan* pSubplan = NULL;
|
||||
|
||||
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||
|
@ -539,24 +539,27 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
return -1;
|
||||
}
|
||||
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
|
||||
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
||||
int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
|
||||
if (opNum != 1) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||
return -1;
|
||||
}
|
||||
plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
|
||||
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
||||
}
|
||||
|
||||
ASSERT(pSub->unassignedVgs);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
|
@ -569,15 +572,15 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
pVgEp->vgId = pVgroup->vgId;
|
||||
taosArrayPush(pSub->unassignedVgs, &pVgEp);
|
||||
|
||||
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
||||
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
|
||||
|
||||
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
int32_t msgLen;
|
||||
|
||||
plan->execNode.epSet = pVgEp->epSet;
|
||||
plan->execNode.nodeId = pVgEp->vgId;
|
||||
pSubplan->execNode.epSet = pVgEp->epSet;
|
||||
pSubplan->execNode.nodeId = pVgEp->vgId;
|
||||
|
||||
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
|
||||
if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
|
@ -590,11 +593,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
ASSERT(pSub->unassignedVgs->size > 0);
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
ASSERT(taosArrayGetSize(pSub->unassignedVgs) > 0);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -39,12 +39,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
|||
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
||||
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
|
||||
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
|
||||
|
||||
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
|
||||
|
||||
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
|
||||
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
||||
|
@ -85,12 +83,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
|
||||
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
|
||||
SMqSubscribeObj *pSub = tNewSubscribeObj(subKey);
|
||||
if (pSub == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSub->dbUid = pTopic->dbUid;
|
||||
pSub->stbUid = pTopic->stbUid;
|
||||
pSub->subType = pTopic->subType;
|
||||
|
@ -205,7 +204,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
|||
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
||||
int32_t totalVgNum = pOutput->pSub->vgNum;
|
||||
const char *sub = pOutput->pSub->key;
|
||||
mInfo("sub:%s, mq rebalance vgNum:%d", sub, pOutput->pSub->vgNum);
|
||||
mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum);
|
||||
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
@ -214,7 +213,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
int32_t actualRemoved = 0;
|
||||
for (int32_t i = 0; i < removedNum; i++) {
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
||||
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
||||
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
|
||||
|
@ -229,7 +228,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
|
||||
}
|
||||
taosArrayDestroy(pConsumerEp->vgs);
|
||||
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
|
@ -239,7 +238,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
|
||||
if (removedNum != actualRemoved) {
|
||||
mError("sub:%s, mq rebalance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
|
||||
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
|
||||
}
|
||||
|
||||
// if previously no consumer, there are vgs not assigned
|
||||
|
@ -253,7 +252,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s, mq rebalance remove vgId:%d from unassigned", sub, pVgEp->vgId);
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,7 +266,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
minVgCnt = totalVgNum / afterRebConsumerNum;
|
||||
imbConsumerNum = totalVgNum % afterRebConsumerNum;
|
||||
}
|
||||
mInfo("sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg", sub,
|
||||
|
||||
mInfo("sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg", sub,
|
||||
afterRebConsumerNum, minVgCnt, imbConsumerNum);
|
||||
|
||||
// 4. first scan: remove consumer more than wanted, put to remove hash
|
||||
|
@ -275,7 +275,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
@ -297,7 +300,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
imbCnt++;
|
||||
|
@ -312,7 +315,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
@ -330,7 +333,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||
mInfo("sub:%s, mq rebalance add new consumer:%" PRId64, sub, consumerId);
|
||||
mInfo("sub:%s mq rebalance add new consumer:%" PRId64, sub, consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -349,7 +352,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
// iter hash and find one vg
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
mError("sub:%s, removed iter is null", sub);
|
||||
mError("sub:%s removed iter is null", sub);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -402,33 +405,36 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
SMqRebOutputVg *pRebOutput = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
pRebOutput = (SMqRebOutputVg *)pIter;
|
||||
|
||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
mInfo("sub:%s, mq rebalance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
|
||||
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
// 8. generate logs
|
||||
mInfo("sub:%s, mq rebalance calculation completed, rebalanced vg", sub);
|
||||
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||
mInfo("sub:%s, mq rebalance vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, sub,
|
||||
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub,
|
||||
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||
}
|
||||
{
|
||||
void *pIter = NULL;
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("sub:%s, mq rebalance final cfg: consumer %" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("sub:%s, mq rebalance final cfg: vg %d to consumer %" PRId64 "", sub, pVgEp->vgId,
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRId64, sub, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
@ -552,11 +558,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
SMqDoRebalanceMsg *pReq = pMsg->pCont;
|
||||
void *pIter = NULL;
|
||||
|
||||
mInfo("mq rebalance start");
|
||||
mInfo("mq re-balance start");
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pReq->rebSubHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqRebInputObj rebInput = {0};
|
||||
|
||||
SMqRebOutputObj rebOutput = {0};
|
||||
|
@ -577,12 +586,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) {
|
||||
mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
|
||||
mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pTopic->lock);
|
||||
|
||||
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
|
||||
rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
|
||||
|
||||
if (rebOutput.pSub == NULL) {
|
||||
mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr());
|
||||
|
@ -605,15 +615,16 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
|
||||
mError("mq rebalance internal error");
|
||||
mError("mq re-balance internal error");
|
||||
}
|
||||
|
||||
// if add more consumer to balanced subscribe,
|
||||
// possibly no vg is changed
|
||||
|
||||
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
||||
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
|
||||
mError("mq re-balance persist re-balance output error, possibly vnode splitted or dropped");
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRebInfo->lostConsumers);
|
||||
taosArrayDestroy(pRebInfo->newConsumers);
|
||||
taosArrayDestroy(pRebInfo->removedConsumers);
|
||||
|
@ -627,19 +638,18 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
// reset flag
|
||||
mInfo("mq rebalance completed successfully");
|
||||
mInfo("mq re-balance completed successfully");
|
||||
taosHashCleanup(pReq->rebSubHash);
|
||||
mndRebEnd();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMDropCgroupReq dropReq = {0};
|
||||
|
||||
if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
@ -663,7 +673,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "drop-cgroup");
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
|
@ -956,7 +966,7 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
|
@ -1090,7 +1100,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
|
||||
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
|
||||
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
|
||||
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
|
||||
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
|
||||
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
|
||||
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
|
||||
|
||||
|
|
|
@ -148,7 +148,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFi
|
|||
|
||||
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
int64_t size = 0;
|
||||
int64_t n;
|
||||
TdFilePtr pFD;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
@ -167,7 +167,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
|||
tPutSmaFile(hdr, pSet->pSmaF);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
goto _err; // make the coverity scan happy
|
||||
}
|
||||
|
||||
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
|
||||
|
|
|
@ -220,6 +220,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
|
|||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||
|
||||
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);
|
||||
|
||||
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
|
||||
|
||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||
|
@ -699,13 +701,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
|||
|
||||
if (pBlockIdx->uid == pList->tableUidList[j]) {
|
||||
// this block belongs to a table that is not queried.
|
||||
void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
|
||||
if (p == NULL) {
|
||||
tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%"PRIu64", %s", pBlockIdx->uid, pReader->idStr);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
|
||||
if (pScanInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
|
||||
}
|
||||
|
@ -753,9 +753,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
|
||||
|
||||
STableBlockScanInfo* pScanInfo =
|
||||
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
|
||||
if (pScanInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tMapDataReset(&pScanInfo->mapData);
|
||||
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||
|
@ -854,9 +855,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
|
|||
s = pos;
|
||||
|
||||
// check
|
||||
assert(pos >= 0 && pos < num);
|
||||
assert(num > 0);
|
||||
|
||||
ASSERT(pos >= 0 && pos < num && num > 0);
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
// find the first position which is smaller than the key
|
||||
e = num - 1;
|
||||
|
@ -1257,14 +1256,13 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
|
|||
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
if (pBlockInfo != NULL) {
|
||||
STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pBlockInfo->uid, idStr);
|
||||
if (pScanInfo == NULL) {
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
|
||||
tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
|
||||
SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
|
||||
tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -2507,16 +2505,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
if (pBlockInfo != NULL) {
|
||||
void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
if (p == NULL) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
|
||||
pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||
if (pBlockScanInfo == NULL) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pBlockScanInfo = *(STableBlockScanInfo**)p;
|
||||
|
||||
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
|
||||
|
||||
|
@ -2855,13 +2848,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||
|
||||
ASSERT(pBlockInfo != NULL);
|
||||
|
||||
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||
if (pScanInfo == NULL) {
|
||||
tsdbError("failed to get table scan-info, %s", pReader->idStr);
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
return code;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pBlock = getCurrentBlock(pBlockIter);
|
||||
|
@ -4202,7 +4191,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
|||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
if (pResBlock->pBlockAgg == NULL) {
|
||||
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
||||
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
|
||||
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
|
||||
}
|
||||
|
||||
// do fill all null column value SMA info
|
||||
|
@ -4232,6 +4221,18 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
|||
return code;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
|
||||
STableBlockScanInfo** p = taosHashGet(pTableMap, &uid, sizeof(uid));
|
||||
if (p == NULL || *p == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
int32_t size = taosHashGetSize(pTableMap);
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return *p;
|
||||
}
|
||||
|
||||
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
|
@ -4240,12 +4241,8 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo =
|
||||
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||
if (pBlockScanInfo == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -62,9 +62,6 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
|
|||
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
|
||||
pEntry->dataLen = sizeof(SDeleterRes);
|
||||
|
||||
// ASSERT(1 == pEntry->numOfRows);
|
||||
// ASSERT(3 == pEntry->numOfCols);
|
||||
|
||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||
|
||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
||||
|
|
|
@ -170,7 +170,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
|
|||
}
|
||||
|
||||
pGroupResInfo->index = 0;
|
||||
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
||||
|
@ -334,10 +333,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
|||
return code;
|
||||
}
|
||||
|
||||
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
||||
SValueNode* pValue = (SValueNode*)pNew;
|
||||
|
||||
ASSERT(pValue->node.resType.type == TSDB_DATA_TYPE_BOOL);
|
||||
*pQualified = pValue->datum.b;
|
||||
|
||||
nodesDestroyNode(pNew);
|
||||
|
@ -1056,7 +1052,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
}
|
||||
|
||||
if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table
|
||||
ASSERT(pTagIndexCond == NULL);
|
||||
vnodeGetCtbIdList(pVnode, pScanNode->suid, pUidList);
|
||||
} else {
|
||||
// failed to find the result in the cache, let try to calculate the results
|
||||
|
@ -1148,7 +1143,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(pNew);
|
||||
} else {
|
||||
taosMemoryFree(keyBuf);
|
||||
nodesDestroyList(groupNew);
|
||||
metaReaderClear(&mr);
|
||||
return code;
|
||||
|
@ -1166,7 +1160,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
|
|||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||
if (tTagIsJson(data)) {
|
||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||
taosMemoryFree(keyBuf);
|
||||
nodesDestroyList(groupNew);
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
|
@ -1368,7 +1361,6 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
|
||||
pExprNode->_function.functionName[len] == 0) {
|
||||
pFuncNode->pParameterList = nodesMakeList();
|
||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
if (NULL == res) { // todo handle error
|
||||
} else {
|
||||
|
@ -1808,7 +1800,6 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
|||
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
|
||||
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
||||
if (pTableList->map == NULL) {
|
||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
|
||||
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
}
|
||||
|
||||
|
@ -1958,7 +1949,6 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group,
|
||||
bool groupSort) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
ASSERT(pTableListInfo->map != NULL);
|
||||
|
||||
bool groupByTbname = groupbyTbname(group);
|
||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
|
@ -2015,7 +2005,6 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
}
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
ASSERT(pTableListInfo->numOfOuputGroups == 1);
|
||||
|
||||
int64_t st1 = taosGetTimestampUs();
|
||||
pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
|
||||
|
|
|
@ -35,7 +35,6 @@ static void initRefPool() {
|
|||
}
|
||||
|
||||
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||
ASSERT(pOperator != NULL);
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||
|
@ -75,27 +74,23 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
|
|||
}
|
||||
|
||||
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
|
||||
{
|
||||
ASSERT(pOperator != NULL);
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||
qError("join not supported for stream block scan, %s" PRIx64, id);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||
qError("join not supported for stream block scan, %s" PRIx64, id);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||
ASSERT(pOperator != NULL);
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||
|
@ -353,7 +348,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
return code;
|
||||
}
|
||||
|
||||
// todo refactor STableList
|
||||
bool assignUid = false;
|
||||
size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
|
||||
char* keyBuf = NULL;
|
||||
|
|
|
@ -240,7 +240,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
|
||||
// allocate a new buffer page
|
||||
if (pResult == NULL) {
|
||||
ASSERT(pSup->resultRowSize > 0);
|
||||
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
|
||||
if (pResult == NULL) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
|
@ -310,7 +309,6 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
|
|||
pWindowRes->offset = (int32_t)pData->num;
|
||||
|
||||
pData->num += size;
|
||||
assert(pWindowRes->pageId >= 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -488,7 +486,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
|
|||
// todo: refactor this
|
||||
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
|
||||
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
}
|
||||
ASSERT(pInput->pData[j] != NULL);
|
||||
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||
|
@ -1024,8 +1021,6 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
|||
|
||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
||||
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
||||
assert(pResultRow != NULL);
|
||||
|
||||
/*
|
||||
* not assign result buffer yet, add new result buffer
|
||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||
|
@ -1279,7 +1274,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
|
||||
//
|
||||
// assert(pQueryAttr->limit.offset == 0);
|
||||
// STimeWindow tw = *win;
|
||||
// getNextTimeWindow(pQueryAttr, &tw);
|
||||
//
|
||||
|
@ -1294,7 +1288,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
// tw = *win;
|
||||
// int32_t startPos =
|
||||
// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
||||
// assert(startPos >= 0);
|
||||
//
|
||||
// // set the abort info
|
||||
// pQueryAttr->pos = startPos;
|
||||
|
@ -1329,11 +1322,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
|
||||
// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
||||
// assert(*start <= pRuntimeEnv->current->lastKey);
|
||||
// } else {
|
||||
// assert(*start >= pRuntimeEnv->current->lastKey);
|
||||
// }
|
||||
//
|
||||
// // if queried with value filter, do NOT forward query start position
|
||||
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
|
||||
|
@ -1347,8 +1335,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
// value is
|
||||
// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
|
||||
// */
|
||||
// assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
|
||||
//
|
||||
// STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
//
|
||||
|
@ -1418,8 +1404,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
// tw = win;
|
||||
// int32_t startPos =
|
||||
// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
||||
// assert(startPos >= 0);
|
||||
//
|
||||
// // set the abort info
|
||||
// pQueryAttr->pos = startPos;
|
||||
// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
|
||||
|
@ -1441,10 +1425,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
// }
|
||||
|
||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
|
||||
if (p->pDownstream == NULL) {
|
||||
assert(p->numOfDownstream == 0);
|
||||
}
|
||||
|
||||
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
|
||||
if (p->pDownstream == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1800,7 +1780,10 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
|
|||
}
|
||||
|
||||
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
|
||||
ASSERT(numOfRows != 0);
|
||||
if (numOfRows == 0) {
|
||||
numOfRows = 4096;
|
||||
}
|
||||
|
||||
pResultInfo->capacity = numOfRows;
|
||||
pResultInfo->threshold = numOfRows * 0.75;
|
||||
|
||||
|
@ -1941,7 +1924,6 @@ _error:
|
|||
}
|
||||
|
||||
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
|
||||
assert(pInfo != NULL);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
}
|
||||
|
||||
|
@ -2022,7 +2004,12 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
|
|||
tDecoderClear(&mr.coder);
|
||||
|
||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||
metaGetTableEntryByUidCache(&mr, suid);
|
||||
code = metaGetTableEntryByUidCache(&mr, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
|
||||
} else {
|
||||
|
@ -2248,7 +2235,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
||||
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pOperator != NULL) {
|
||||
|
@ -2340,7 +2328,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
|
||||
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosMemoryFree(ops);
|
||||
|
@ -2578,7 +2567,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
*pResult = (SResultRow*)value;
|
||||
ASSERT(*pResult);
|
||||
// set time window for current result
|
||||
(*pResult)->win = (*win);
|
||||
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||
|
|
|
@ -193,8 +193,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
|
|
|
@ -204,7 +204,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
|||
}
|
||||
|
||||
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||
ASSERT(pKey != NULL);
|
||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||
|
||||
char* isNull = (char*)pKey;
|
||||
|
@ -570,7 +569,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
(*columnLen) += contentLen;
|
||||
ASSERT(*columnLen >= 0);
|
||||
}
|
||||
|
||||
(*rows) += 1;
|
||||
|
@ -681,7 +679,6 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
|
|||
const SDataGroupInfo* pGroupInfo2 = group2;
|
||||
|
||||
if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -3019,8 +3019,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun
|
|||
}
|
||||
}
|
||||
} else {
|
||||
strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
||||
strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
||||
tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
||||
tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -768,8 +768,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||
|
||||
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
|
||||
ASSERT(rowSize < 100 * 1024 * 1024);
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
||||
&pInfo->matchInfo);
|
||||
|
|
|
@ -213,6 +213,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||
if (pPage == NULL) {
|
||||
blockDataDestroy(p);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -2310,6 +2310,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
|||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
pCtx->epSet = *pEpSet;
|
||||
pCtx->origEpSet = *pEpSet;
|
||||
pCtx->ahandle = pReq->info.ahandle;
|
||||
pCtx->msgType = pReq->msgType;
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
|
|||
if ((*num) >= size) {
|
||||
size = (size << 1);
|
||||
split = taosMemoryRealloc(split, POINTER_BYTES * size);
|
||||
ASSERTS(NULL != split, "realloc memory failed. size=%d", POINTER_BYTES * size);
|
||||
ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t) POINTER_BYTES * size);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -400,7 +400,7 @@ TAOS* createNewTaosConnect() {
|
|||
int32_t retryCnt = 10;
|
||||
|
||||
while (retryCnt--) {
|
||||
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (NULL != taos) {
|
||||
return taos;
|
||||
}
|
||||
|
@ -780,7 +780,8 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
|
||||
if (pInfo->ifCheckData) {
|
||||
char filename[256] = {0};
|
||||
char tmpString[128];
|
||||
memset(tmpString, 0, tListLen(tmpString));
|
||||
|
||||
// sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
|
||||
// getCurrentTimeString(tmpString));
|
||||
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
|
||||
|
@ -834,12 +835,12 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
}
|
||||
|
||||
if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
|
||||
char tmpString[128];
|
||||
memset(tmpString, 0, tListLen(tmpString));
|
||||
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
char tmpString[128];
|
||||
memset(tmpString, 0, tListLen(tmpString));
|
||||
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||
break;
|
||||
}
|
||||
|
@ -1113,7 +1114,7 @@ void omb_loop_consume(SThreadInfo* pInfo) {
|
|||
lastTotalLenOfMsg = totalLenOfMsg;
|
||||
}
|
||||
} else {
|
||||
char tmpString[128];
|
||||
memset(tmpString, 0, tListLen(tmpString));
|
||||
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||
printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
|
@ -1381,7 +1382,7 @@ void startOmbConsume() {
|
|||
printf("SQL: %s\n", sql);
|
||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||
|
||||
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
|
||||
int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers);
|
||||
|
||||
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
|
||||
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
|
||||
|
|
Loading…
Reference in New Issue