diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in.bak similarity index 100% rename from cmake/bdb_CMakeLists.txt.in rename to cmake/bdb_CMakeLists.txt.in.bak diff --git a/cmake/cmake.options b/cmake/cmake.options index 946eb5d258..b51f096185 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -55,17 +55,6 @@ option( OFF ) -IF(${TD_WINDOWS}) - MESSAGE("Not build BDB on Windows") -ELSE () - option( - BUILD_WITH_BDB - "If build with BerkleyDB" - ON - ) - -ENDIF () - option( BUILD_WITH_LUCENE "If build with lucene" diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 575a0e6274..377e3fbccc 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -63,9 +63,9 @@ if(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV}) # bdb -if(${BUILD_WITH_BDB}) - cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_BDB}) +#if(${BUILD_WITH_BDB}) + #cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +#endif(${BUILD_WITH_BDB}) # sqlite if(${BUILD_WITH_SQLITE}) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index eacaeb9524..740488b39b 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -7,9 +7,9 @@ if(${BUILD_WITH_LUCENE}) add_subdirectory(lucene) endif(${BUILD_WITH_LUCENE}) -if(${BUILD_WITH_BDB}) - add_subdirectory(bdb) -endif(${BUILD_WITH_BDB}) +#if(${BUILD_WITH_BDB}) + #add_subdirectory(bdb) +#endif(${BUILD_WITH_BDB}) if(${BUILD_WITH_SQLITE}) add_subdirectory(sqlite) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4efb68d490..bef323cdba 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2310,6 +2310,7 @@ typedef struct { char cgroup[TSDB_CGROUP_LEN]; int64_t currentOffset; + uint64_t reqId; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqPollReq; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a6e5fee2d1..5893a14bd5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1149,6 +1149,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; + pReq->reqId = generateRequestId(); pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqPollReq)); @@ -1279,7 +1280,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { .len = sizeof(SMqPollReq), .handle = NULL, }; - sendInfo->requestId = generateRequestId(); + sendInfo->requestId = pReq->reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqPollCb; @@ -1288,7 +1289,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - tscDebug("consumer %ld send poll: vg %d, epoch %d, req offset %ld", tmq->consumerId, pVg->vgId, tmq->epoch, pVg->currentOffset); + tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8078338238..dbd5e43b6d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -60,7 +60,7 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName); static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName); int32_t mndInitSubscribe(SMnode *pMnode) { @@ -102,12 +102,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { +static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* topicName) { SMqMVRebReq req = { .vgId = pConsumerEp->vgId, .oldConsumerId = pConsumerEp->oldConsumerId, .newConsumerId = pConsumerEp->consumerId, }; + req.topic = strdup(topicName); int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); @@ -122,6 +123,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqMVRebReq(&abuf, &req); + taosMemoryFree(req.topic); *pBuf = buf; *pLen = tlen; @@ -129,12 +131,12 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume return 0; } -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName) { ASSERT(pConsumerEp->oldConsumerId != -1); void *buf; int32_t tlen; - if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) { + if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) { return -1; } @@ -502,10 +504,10 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { pConsumerEp->epoch = 0; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup); if (pConsumerEp->oldConsumerId == -1) { - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId, @@ -517,7 +519,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic); } } } @@ -849,7 +851,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); } else { - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName); } // to trigger rebalance at once, do not set status active /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 28e19dc017..b5fe7d460f 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -68,7 +68,7 @@ target_link_libraries( PUBLIC executor PUBLIC scheduler PUBLIC tdb - PUBLIC bdb + #PUBLIC bdb PUBLIC transport PUBLIC stream ) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28d71fb842..f168695383 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } - vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch); + vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -334,12 +334,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); break; } - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -363,8 +361,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -393,8 +390,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, - pHead->msgType, consumerId, pReq->epoch); + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -425,8 +421,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, - pReq->epoch); + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch); /*}*/ return 0; @@ -437,7 +432,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; tDecodeSMqMVRebReq(msg, &req); - vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId); + vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); ASSERT(pConsumer->consumerId == req.oldConsumerId); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a18374015b..fee8bdf27e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -167,8 +167,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - if (colDataAppend(pColData, curRow, sVal.val, false) < 0) { - /*if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {*/ + /*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/ + if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d633284e13..dc7d33a5a9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -944,8 +944,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); - transSendAsync(thrd->asyncPool, &(cliMsg->q)); + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle); + ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 22d8c2b735..7372745cb8 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -274,17 +274,11 @@ int main(int32_t argc, char *argv[]) { loop_consume(tmq); - err = tmq_unsubscribe(tmq); - ASSERT(err == TMQ_RESP_ERR__SUCCESS); - - - #if 0 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - #endif return 0; }