diff --git a/examples/c/tmq.c b/examples/c/tmq.c index eb41ad039a..8a2112fbcc 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -189,27 +189,46 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { tmq_t* build_consumer() { tmq_conf_res_t code; + tmq_t* tmq = NULL; + tmq_conf_t* conf = tmq_conf_new(); code = tmq_conf_set(conf, "enable.auto.commit", "true"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "group.id", "cgrpName"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "client.id", "user defined name"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "td.connect.user", "root"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } code = tmq_conf_set(conf, "experimental.snapshot.enable", "false"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + goto _end; + } tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq = tmq_consumer_new(conf, NULL, 0); - tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + _end: tmq_conf_destroy(conf); return tmq; } diff --git a/include/common/tmisce.h b/include/common/tmisce.h index b9f5cf5b91..bc6558900c 100644 --- a/include/common/tmisce.h +++ b/include/common/tmisce.h @@ -27,13 +27,14 @@ typedef struct SCorEpSet { SEpSet epSet; } SCorEpSet; +#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse])) int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); -bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2); - -void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet); -SEpSet getEpSet_s(SCorEpSet* pEpSet); +bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2); +void epsetAssign(SEpSet* dst, const SEpSet* pSrc); +void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet); +SEpSet getEpSet_s(SCorEpSet* pEpSet); #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8dcadf47b6..335c57afd3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1830,10 +1830,10 @@ typedef struct { } SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { - int64_t consumerId; - char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; - SArray* topicNames; // SArray + int64_t consumerId; + char cgroup[TSDB_CGROUP_LEN]; + char clientId[256]; + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fb8cdcfde3..69e10bd649 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -58,15 +58,14 @@ struct tmq_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[TSDB_CGROUP_LEN]; - int8_t autoCommit; - int8_t resetOffset; - int8_t withTbName; - int8_t snapEnable; - int32_t snapBatchSize; - bool hbBgEnable; - + char clientId[256]; + char groupId[TSDB_CGROUP_LEN]; + int8_t autoCommit; + int8_t resetOffset; + int8_t withTbName; + int8_t snapEnable; + int32_t snapBatchSize; + bool hbBgEnable; uint16_t port; int32_t autoCommitInterval; char* ip; @@ -213,6 +212,7 @@ typedef struct { typedef struct { SMqCommitCbParamSet* params; STqOffset* pOffset; + SMqClientVg* pMqVg; /*char topicName[TSDB_TOPIC_FNAME_LEN];*/ /*int32_t vgId;*/ } SMqCommitCbParam; @@ -422,7 +422,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); - ASSERT(waitingRspNum >= 0); if (waitingRspNum == 0) { tmqCommitDone(pParamSet); } @@ -440,6 +439,17 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } #endif + // there may be race condition. fix it + if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) { + SMqClientVg* pMqVg = pParam->pMqVg; + + SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); + SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet)); + uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId, + pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); + pParam->pMqVg->epSet = *pBuf->pEpSet; + } + taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -448,7 +458,6 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { * pOffset->version);*/ tmqCommitRspCountDown(pParamSet); - return 0; } @@ -498,6 +507,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pParam->params = pParamSet; pParam->pOffset = pOffset; + pParam->pMqVg = pVg; // there may be an race condition // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -518,7 +528,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port); - // TODO: put into cb + // TODO: put into cb, the commit offset should be move to the callback function pVg->committedOffset = pVg->currentOffset; pMsgSendInfo->requestId = generateRequestId(); @@ -540,7 +550,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) { char* topic; int32_t vgId; - ASSERT(msg != NULL); + if (TD_RES_TMQ(msg)) { SMqRspObj* pRspObj = (SMqRspObj*)msg; topic = pRspObj->topic; @@ -983,14 +993,12 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user; const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; - ASSERT(conf->groupId[0]); - pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->mqueue = taosOpenQueue(); pTmq->qall = taosAllocateQall(); pTmq->delayedTask = taosOpenQueue(); - if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) { + if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); @@ -1069,7 +1077,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz); + tscDebug("consumer:0x%"PRIx64" subscribe %d topics", tmq->consumerId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1092,7 +1100,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } tNameExtractFullName(&name, topicFName); - tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName); + tscDebug("consumer:0x%"PRIx64" subscribe topic:%s", tmq->consumerId, topicFName); taosArrayPush(req.topicNames, &topicFName); } @@ -1763,6 +1771,8 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) } void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { + tscDebug("consumer:0x%"PRIx64" start to handle the rsp", tmq->consumerId); + while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1772,20 +1782,17 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosGetQitem(tmq->qall, (void**)&rspWrapper); if (rspWrapper == NULL) { - /*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/ return NULL; } } - tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper); - if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return NULL; } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; - tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId); /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { @@ -1812,6 +1819,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); + if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, @@ -1876,7 +1886,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); #if 0 tmqHandleAllDelayedTask(tmq); @@ -1889,7 +1899,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); return NULL; } @@ -1915,25 +1925,25 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { - tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj); + tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); return NULL; } if (timeout != -1) { int64_t currentTime = taosGetTimestampMs(); - int64_t passedTime = currentTime - startTime; - if (passedTime > timeout) { - tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + int64_t elapsedTime = currentTime - startTime; + if (elapsedTime > timeout) { + tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } /*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ /*", left time %" PRId64,*/ - /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/ - tsem_timewait(&tmq->rspSem, (timeout - passedTime)); + /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/ + tsem_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck tsem_timewait(&tmq->rspSem, 1000); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index dfb1917fcf..59afce1bbb 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -60,6 +60,19 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) { return true; } +void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) { + if (pSrc == NULL || pDst == NULL) { + return; + } + + pDst->inUse = pSrc->inUse; + pDst->numOfEps = pSrc->numOfEps; + for (int32_t i = 0; i < pSrc->numOfEps; ++i) { + pDst->eps[i].port = pSrc->eps[i].port; + tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn)); + } +} + void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) { taosCorBeginWrite(&pEpSet->version); pEpSet->epSet = *pNewEpSet; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1aa2fa997b..d167c9c09a 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -238,7 +238,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { // iterate all consumers, find all modification while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); @@ -335,7 +337,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int64_t consumerId = req.consumerId; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { - mError("consumer %" PRId64 " not exist", consumerId); + mError("consumer:0x%"PRIx64 " not exist", consumerId); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } @@ -345,7 +347,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS__LOST_REBD) { - mInfo("try to recover consumer %" PRId64 "", consumerId); + mInfo("try to recover consumer:0x%"PRIx64 "", consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; @@ -390,7 +392,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { #if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { - mInfo("try to recover consumer %" PRId64 "", consumerId); + mInfo("try to recover consumer:0x%"PRIx64 "", consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; @@ -404,14 +406,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { #endif if (status != MQ_CONSUMER_STATUS__READY) { - mInfo("consumer %" PRId64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); + mInfo("consumer:0x%"PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; return -1; } int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); - // 2. check epoch, only send ep info when epoches do not match + // 2. check epoch, only send ep info when epochs do not match if (epoch != serverEpoch) { taosRLockLatch(&pConsumer->lock); mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch); @@ -526,12 +528,14 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - char *msgStr = pMsg->pCont; +int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; + char *msgStr = pMsg->pCont; + SCMSubscribeReq subscribe = {0}; tDeserializeSCMSubscribeReq(msgStr, &subscribe); - int64_t consumerId = subscribe.consumerId; + + uint64_t consumerId = subscribe.consumerId; char *cgroup = subscribe.cgroup; SMqConsumerObj *pConsumerOld = NULL; SMqConsumerObj *pConsumerNew = NULL; @@ -542,21 +546,23 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree); int32_t newTopicNum = taosArrayGetSize(newSub); + // check topic existance STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); - if (pTrans == NULL) goto SUBSCRIBE_OVER; + if (pTrans == NULL) { + goto _over; + } for (int32_t i = 0; i < newTopicNum; i++) { char *topic = taosArrayGetP(newSub, i); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { - terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - goto SUBSCRIBE_OVER; + if (pTopic == NULL) { // terrno has been set by callee function + goto _over; } if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { mndReleaseTopic(pMnode, pTopic); - goto SUBSCRIBE_OVER; + goto _over; } mndReleaseTopic(pMnode, pTopic); @@ -578,8 +584,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; + if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; } else { /*taosRLockLatch(&pConsumerOld->lock);*/ @@ -591,13 +597,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - goto SUBSCRIBE_OVER; + goto _over; } pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); if (pConsumerNew == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto SUBSCRIBE_OVER; + goto _over; } pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; @@ -650,16 +656,16 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ /*pConsumerNew->updateType = */ /*}*/ - goto SUBSCRIBE_OVER; + goto _over; } - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; + if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; } code = TSDB_CODE_ACTION_IN_PROGRESS; -SUBSCRIBE_OVER: +_over: mndTransDrop(pTrans); if (pConsumerOld) { @@ -971,16 +977,19 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); - if (pShow->pIter == NULL) break; + if (pShow->pIter == NULL) { + break; + } + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { - mDebug("showing consumer %" PRId64 " no assigned topic, skip", pConsumer->consumerId); + mDebug("showing consumer:0x%"PRIx64 " no assigned topic, skip", pConsumer->consumerId); sdbRelease(pSdb, pConsumer); continue; } taosRLockLatch(&pConsumer->lock); - mDebug("showing consumer %" PRId64, pConsumer->consumerId); + mDebug("showing consumer:0x%"PRIx64, pConsumer->consumerId); int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); bool hasTopic = true; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index af1a29def0..ca79b8e122 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -523,7 +523,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib SSdb* pSdb = pMnode->pSdb; SVgObj* pVgroup = NULL; SQueryPlan* pPlan = NULL; - SSubplan* plan = NULL; + SSubplan* pSubplan = NULL; if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) { pPlan = qStringToQueryPlan(pTopic->physicalPlan); @@ -539,24 +539,27 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib return -1; } - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); + SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - int32_t opNum = LIST_LENGTH(inner->pNodeList); + int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList); if (opNum != 1) { qDestroyQueryPlan(pPlan); terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY; return -1; } - plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); + + pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0); } ASSERT(pSub->unassignedVgs); - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { sdbRelease(pSdb, pVgroup); continue; @@ -569,15 +572,15 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib pVgEp->vgId = pVgroup->vgId; taosArrayPush(pSub->unassignedVgs, &pVgEp); - mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId); + mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) { int32_t msgLen; - plan->execNode.epSet = pVgEp->epSet; - plan->execNode.nodeId = pVgEp->vgId; + pSubplan->execNode.epSet = pVgEp->epSet; + pSubplan->execNode.nodeId = pVgEp->vgId; - if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) { + if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -590,11 +593,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib sdbRelease(pSdb, pVgroup); } - ASSERT(pSub->unassignedVgs->size > 0); - - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); - + ASSERT(taosArrayGetSize(pSub->unassignedVgs) > 0); qDestroyQueryPlan(pPlan); - return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d127ceacf5..0d805b04fc 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -39,12 +39,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); - -static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); -static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); - -static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); -static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); +static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); +static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); +static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); @@ -85,12 +83,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) { +static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) { SMqSubscribeObj *pSub = tNewSubscribeObj(subKey); if (pSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pSub->dbUid = pTopic->dbUid; pSub->stbUid = pTopic->stbUid; pSub->subType = pTopic->subType; @@ -205,7 +204,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { int32_t totalVgNum = pOutput->pSub->vgNum; const char *sub = pOutput->pSub->key; - mInfo("sub:%s, mq rebalance vgNum:%d", sub, pOutput->pSub->vgNum); + mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum); // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -214,7 +213,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; for (int32_t i = 0; i < removedNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); + uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); @@ -229,7 +228,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId); + mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId); } taosArrayDestroy(pConsumerEp->vgs); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); @@ -239,7 +238,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } if (removedNum != actualRemoved) { - mError("sub:%s, mq rebalance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved); + mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved); } // if previously no consumer, there are vgs not assigned @@ -253,7 +252,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("sub:%s, mq rebalance remove vgId:%d from unassigned", sub, pVgEp->vgId); + mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId); } } @@ -267,7 +266,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR minVgCnt = totalVgNum / afterRebConsumerNum; imbConsumerNum = totalVgNum % afterRebConsumerNum; } - mInfo("sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg", sub, + + mInfo("sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg", sub, afterRebConsumerNum, minVgCnt, imbConsumerNum); // 4. first scan: remove consumer more than wanted, put to remove hash @@ -275,7 +275,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR void *pIter = NULL; while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); @@ -297,7 +300,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId, + mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId, pConsumerEp->consumerId); } imbCnt++; @@ -312,7 +315,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId, + mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId, pConsumerEp->consumerId); } } @@ -330,7 +333,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s, mq rebalance add new consumer:%" PRId64, sub, consumerId); + mInfo("sub:%s mq rebalance add new consumer:%" PRId64, sub, consumerId); } } @@ -349,7 +352,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { - mError("sub:%s, removed iter is null", sub); + mError("sub:%s removed iter is null", sub); continue; } @@ -402,33 +405,36 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SMqRebOutputVg *pRebOutput = NULL; while (1) { pIter = taosHashIterate(pHash, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + pRebOutput = (SMqRebOutputVg *)pIter; taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); - mInfo("sub:%s, mq rebalance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId); + mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId); } } // 8. generate logs - mInfo("sub:%s, mq rebalance calculation completed, rebalanced vg", sub); + mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub); for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); - mInfo("sub:%s, mq rebalance vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, sub, + mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub, pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } { - void *pIter = NULL; + pIter = NULL; while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("sub:%s, mq rebalance final cfg: consumer %" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz); + mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("sub:%s, mq rebalance final cfg: vg %d to consumer %" PRId64 "", sub, pVgEp->vgId, + mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRId64, sub, pVgEp->vgId, pConsumerEp->consumerId); } } @@ -552,11 +558,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqDoRebalanceMsg *pReq = pMsg->pCont; void *pIter = NULL; - mInfo("mq rebalance start"); + mInfo("mq re-balance start"); while (1) { pIter = taosHashIterate(pReq->rebSubHash, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; @@ -577,12 +586,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if (pTopic == NULL) { - mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic); + mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic); continue; } + taosRLockLatch(&pTopic->lock); - rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key); + rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key); if (rebOutput.pSub == NULL) { mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr()); @@ -605,15 +615,16 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { } if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) { - mError("mq rebalance internal error"); + mError("mq re-balance internal error"); } // if add more consumer to balanced subscribe, // possibly no vg is changed if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { - mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped"); + mError("mq re-balance persist re-balance output error, possibly vnode splitted or dropped"); } + taosArrayDestroy(pRebInfo->lostConsumers); taosArrayDestroy(pRebInfo->newConsumers); taosArrayDestroy(pRebInfo->removedConsumers); @@ -627,19 +638,18 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { } // reset flag - mInfo("mq rebalance completed successfully"); + mInfo("mq re-balance completed successfully"); taosHashCleanup(pReq->rebSubHash); mndRebEnd(); return 0; } -static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; +static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; SMDropCgroupReq dropReq = {0}; - if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -663,7 +673,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "drop-cgroup"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mndReleaseSubscribe(pMnode, pSub); @@ -956,7 +966,7 @@ END: return code; } -static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { +int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; @@ -1090,7 +1100,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock return numOfRows; } -static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { +void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 48c35f3f07..f712825b22 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -33,7 +33,7 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); -static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); +static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic); static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq); static int32_t mndProcessDropTopicReq(SRpcMsg *pReq); diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 42728be657..9b3dbcd8ea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -148,7 +148,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFi int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) { int32_t code = 0; - int64_t size; + int64_t size = 0; int64_t n; TdFilePtr pFD; char fname[TSDB_FILENAME_LEN]; @@ -167,7 +167,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) { tPutSmaFile(hdr, pSet->pSmaF); break; default: - ASSERT(0); + goto _err; // make the coverity scan happy } taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4e1b24750a..78793e6a95 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -220,6 +220,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); +static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id); + static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -699,13 +701,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, if (pBlockIdx->uid == pList->tableUidList[j]) { // this block belongs to a table that is not queried. - void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t)); - if (p == NULL) { - tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%"PRIu64", %s", pBlockIdx->uid, pReader->idStr); - return TSDB_CODE_APP_ERROR; + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr); + if (pScanInfo == NULL) { + return terrno; } - STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p; if (pScanInfo->pBlockList == NULL) { pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex)); } @@ -753,9 +753,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN for (int32_t i = 0; i < numOfTables; ++i) { SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i); - - STableBlockScanInfo* pScanInfo = - *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr); + if (pScanInfo == NULL) { + return terrno; + } tMapDataReset(&pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); @@ -854,9 +855,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or s = pos; // check - assert(pos >= 0 && pos < num); - assert(num > 0); - + ASSERT(pos >= 0 && pos < num && num > 0); if (order == TSDB_ORDER_ASC) { // find the first position which is smaller than the key e = num - 1; @@ -1257,14 +1256,13 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); if (pBlockInfo != NULL) { - STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pBlockInfo->uid, idStr); if (pScanInfo == NULL) { - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr); - return TSDB_CODE_INVALID_PARA; + return terrno; } - SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx); - tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); + SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); + tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); } #if 0 @@ -2507,16 +2505,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { - void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); - if (p == NULL) { - code = TSDB_CODE_INVALID_PARA; - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, - taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + if (pBlockScanInfo == NULL) { goto _end; } - pBlockScanInfo = *(STableBlockScanInfo**)p; - SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); @@ -2855,13 +2848,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - ASSERT(pBlockInfo != NULL); - - pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); if (pScanInfo == NULL) { - tsdbError("failed to get table scan-info, %s", pReader->idStr); - code = TSDB_CODE_INVALID_PARA; - return code; + return terrno; } pBlock = getCurrentBlock(pBlockIter); @@ -4202,7 +4191,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, SSDataBlock* pResBlock = pReader->pResBlock; if (pResBlock->pBlockAgg == NULL) { size_t num = taosArrayGetSize(pResBlock->pDataBlock); - pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); + pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); } // do fill all null column value SMA info @@ -4232,6 +4221,18 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, return code; } +STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) { + STableBlockScanInfo** p = taosHashGet(pTableMap, &uid, sizeof(uid)); + if (p == NULL || *p == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + int32_t size = taosHashGetSize(pTableMap); + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id); + return NULL; + } + + return *p; +} + static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; @@ -4240,12 +4241,8 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { } SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); - STableBlockScanInfo* pBlockScanInfo = - *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, - taosHashGetSize(pReader->status.pTableMap), pReader->idStr); return NULL; } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index a8051ea7c3..2366d0c2f1 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -62,9 +62,6 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock); pEntry->dataLen = sizeof(SDeleterRes); -// ASSERT(1 == pEntry->numOfRows); -// ASSERT(3 == pEntry->numOfCols); - pBuf->useSize = sizeof(SDataCacheEntry); SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b2d2a06953..de6cd1a2b3 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -170,7 +170,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in } pGroupResInfo->index = 0; - assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { @@ -334,10 +333,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, return code; } - ASSERT(nodeType(pNew) == QUERY_NODE_VALUE); SValueNode* pValue = (SValueNode*)pNew; - - ASSERT(pValue->node.resType.type == TSDB_DATA_TYPE_BOOL); *pQualified = pValue->datum.b; nodesDestroyNode(pNew); @@ -1056,7 +1052,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, } if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table - ASSERT(pTagIndexCond == NULL); vnodeGetCtbIdList(pVnode, pScanNode->suid, pUidList); } else { // failed to find the result in the cache, let try to calculate the results @@ -1148,7 +1143,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, if (TSDB_CODE_SUCCESS == code) { REPLACE_NODE(pNew); } else { - taosMemoryFree(keyBuf); nodesDestroyList(groupNew); metaReaderClear(&mr); return code; @@ -1166,7 +1160,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) { if (tTagIsJson(data)) { terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; - taosMemoryFree(keyBuf); nodesDestroyList(groupNew); metaReaderClear(&mr); return terrno; @@ -1368,7 +1361,6 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) && pExprNode->_function.functionName[len] == 0) { pFuncNode->pParameterList = nodesMakeList(); - ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { // todo handle error } else { @@ -1808,7 +1800,6 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { // TODO handle the group offset info, fix it, the rule of group output will be broken by this function int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { if (pTableList->map == NULL) { - ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); } @@ -1958,7 +1949,6 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { int32_t code = TSDB_CODE_SUCCESS; - ASSERT(pTableListInfo->map != NULL); bool groupByTbname = groupbyTbname(group); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); @@ -2015,7 +2005,6 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - ASSERT(pTableListInfo->numOfOuputGroups == 1); int64_t st1 = taosGetTimestampUs(); pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6c354c3d61..8f8b32ff07 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -35,7 +35,6 @@ static void initRefPool() { } static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { - ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); @@ -75,27 +74,23 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf } static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) { - { - ASSERT(pOperator != NULL); - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - if (pOperator->numOfDownstream == 0) { - qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); - return TSDB_CODE_APP_ERROR; - } - - if (pOperator->numOfDownstream > 1) { // not handle this in join query - qError("join not supported for stream block scan, %s" PRIx64, id); - return TSDB_CODE_APP_ERROR; - } - pOperator->status = OP_NOT_OPENED; - return doSetStreamOpOpen(pOperator->pDownstream[0], id); + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); + return TSDB_CODE_APP_ERROR; } + + if (pOperator->numOfDownstream > 1) { // not handle this in join query + qError("join not supported for stream block scan, %s" PRIx64, id); + return TSDB_CODE_APP_ERROR; + } + pOperator->status = OP_NOT_OPENED; + return doSetStreamOpOpen(pOperator->pDownstream[0], id); } return 0; } static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { - ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); @@ -353,7 +348,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo return code; } - // todo refactor STableList bool assignUid = false; size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0; char* keyBuf = NULL; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d28c3cfe58..35c884c1a1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -240,7 +240,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // allocate a new buffer page if (pResult == NULL) { - ASSERT(pSup->resultRowSize > 0); pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); if (pResult == NULL) { T_LONG_JMP(pTaskInfo->env, terrno); @@ -310,7 +309,6 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes pWindowRes->offset = (int32_t)pData->num; pData->num += size; - assert(pWindowRes->pageId >= 0); } return 0; @@ -488,7 +486,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int // todo: refactor this if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. - // ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); } ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { @@ -1024,8 +1021,6 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup); - assert(pResultRow != NULL); - /* * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one @@ -1279,7 +1274,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; // -// assert(pQueryAttr->limit.offset == 0); // STimeWindow tw = *win; // getNextTimeWindow(pQueryAttr, &tw); // @@ -1294,7 +1288,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // tw = *win; // int32_t startPos = // getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1); -// assert(startPos >= 0); // // // set the abort info // pQueryAttr->pos = startPos; @@ -1329,11 +1322,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) { // STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { -// assert(*start <= pRuntimeEnv->current->lastKey); -// } else { -// assert(*start >= pRuntimeEnv->current->lastKey); -// } // // // if queried with value filter, do NOT forward query start position // if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || @@ -1347,8 +1335,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // value is // * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points // */ -// assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL); -// // STimeWindow w = TSWINDOW_INITIALIZER; // bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); // @@ -1418,8 +1404,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // tw = win; // int32_t startPos = // getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); -// assert(startPos >= 0); -// // // set the abort info // pQueryAttr->pos = startPos; // pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; @@ -1441,10 +1425,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // } int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) { - if (p->pDownstream == NULL) { - assert(p->numOfDownstream == 0); - } - p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES); if (p->pDownstream == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1800,7 +1780,10 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo } void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) { - ASSERT(numOfRows != 0); + if (numOfRows == 0) { + numOfRows = 4096; + } + pResultInfo->capacity = numOfRows; pResultInfo->threshold = numOfRows * 0.75; @@ -1941,7 +1924,6 @@ _error: } void cleanupBasicInfo(SOptrBasicInfo* pInfo) { - assert(pInfo != NULL); pInfo->pRes = blockDataDestroy(pInfo->pRes); } @@ -2022,7 +2004,12 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; - metaGetTableEntryByUidCache(&mr, suid); + code = metaGetTableEntryByUidCache(&mr, suid); + if (code != TSDB_CODE_SUCCESS) { + metaReaderClear(&mr); + return terrno; + } + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else { @@ -2248,7 +2235,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { - ASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; } if (pOperator != NULL) { @@ -2340,7 +2328,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else { - ASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; } taosMemoryFree(ops); @@ -2578,7 +2567,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul return TSDB_CODE_OUT_OF_MEMORY; } *pResult = (SResultRow*)value; - ASSERT(*pResult); // set time window for current result (*pResult)->win = (*win); setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 16983cb507..f7de84d085 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -193,8 +193,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group - assert(pBlock != NULL); - blockDataCleanup(pResBlock); doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9d88126220..4e17e1a3f2 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -204,7 +204,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { - ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); char* isNull = (char*)pKey; @@ -570,7 +569,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } (*columnLen) += contentLen; - ASSERT(*columnLen >= 0); } (*rows) += 1; @@ -681,7 +679,6 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { const SDataGroupInfo* pGroupInfo2 = group2; if (pGroupInfo1->groupId == pGroupInfo2->groupId) { - ASSERT(0); return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c06fc40b9b..1c3dafd0e1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3019,8 +3019,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun } } } else { - strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN); - strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN); + tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN); + tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN); } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 98ef6b8a36..3db29f40fd 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -768,8 +768,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); int32_t rowSize = pInfo->binfo.pRes->info.rowSize; - ASSERT(rowSize < 100 * 1024 * 1024); - int32_t numOfOutputCols = 0; code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 90f20f40b8..a656b8e3fd 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -213,6 +213,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { void* pPage = getNewBufPage(pHandle->pBuf, &pageId); if (pPage == NULL) { blockDataDestroy(p); + taosArrayDestroy(pPageIdList); return terrno; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9e74825a1f..829db74d3f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2310,6 +2310,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; + pCtx->origEpSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 55d7d4f6e7..1f9ca7407e 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -118,7 +118,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) { if ((*num) >= size) { size = (size << 1); split = taosMemoryRealloc(split, POINTER_BYTES * size); - ASSERTS(NULL != split, "realloc memory failed. size=%d", POINTER_BYTES * size); + ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t) POINTER_BYTES * size); } } diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 9c1dc2e063..1acf50a7d8 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -400,7 +400,7 @@ TAOS* createNewTaosConnect() { int32_t retryCnt = 10; while (retryCnt--) { - TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0); + taos = taos_connect(NULL, "root", "taosdata", NULL, 0); if (NULL != taos) { return taos; } @@ -780,7 +780,8 @@ void loop_consume(SThreadInfo* pInfo) { if (pInfo->ifCheckData) { char filename[256] = {0}; - char tmpString[128]; + memset(tmpString, 0, tListLen(tmpString)); + // sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, // getCurrentTimeString(tmpString)); sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId); @@ -834,12 +835,12 @@ void loop_consume(SThreadInfo* pInfo) { } if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) { - char tmpString[128]; + memset(tmpString, 0, tListLen(tmpString)); taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString)); break; } } else { - char tmpString[128]; + memset(tmpString, 0, tListLen(tmpString)); taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); break; } @@ -1113,7 +1114,7 @@ void omb_loop_consume(SThreadInfo* pInfo) { lastTotalLenOfMsg = totalLenOfMsg; } } else { - char tmpString[128]; + memset(tmpString, 0, tListLen(tmpString)); taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); int64_t currentPrintTime = taosGetTimestampMs(); @@ -1381,7 +1382,7 @@ void startOmbConsume() { printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers); + int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers); printf("==== create %d produce thread ====\n", g_stConfInfo.producers); for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {