diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 980c86b877..25e4b5c193 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -26,6 +26,10 @@ static void msg_process(TAOS_RES* msg) { printf("topic: %s\n", tmq_get_topic_name(msg)); printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + printf("meta, skip\n"); + return; + } while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; @@ -129,8 +133,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -191,7 +195,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experiment.use.snapshot", "true"); + tmq_conf_set(conf, "experiment.use.snapshot", "false"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ab4d14b1cc..22747425f0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pResultInfo->current += 1; return pResultInfo->row; } + } else if (TD_RES_TMQ_META(res)) { + return NULL; } else { // assert to avoid un-initialization error ASSERT(0); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 949965296f..6892a4d1a7 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { - } SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { @@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { pRspWrapper->vgHandle = pVg; pRspWrapper->topicHandle = pTopic; - memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - if (rspType == TMQ_MSG_TYPE__POLL_RSP) { + memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); + memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); } taosMemoryFree(pMsg->pData); - tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, - pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset); + tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId, + pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); - pRspObj->resType = RES_TYPE__TMQ; + pRspObj->resType = RES_TYPE__TMQ_META; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; @@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) return 0; } -SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { +void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { + if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); return pRsp; } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", - pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + pollRspWrapper->metaRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } } else { @@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - SMqRspObj* rspObj; - int64_t startTime = taosGetTimestampMs(); + void* rspObj; + int64_t startTime = taosGetTimestampMs(); #if 0 tmqHandleAllDelayedTask(tmq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b23ec13105..b428e9aa9f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -387,6 +387,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->epoch = -1; pHandle->execHandle.subType = req.subType; + pHandle->fetchMeta = req.withMeta; pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); for (int32_t i = 0; i < 5; i++) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 286bcea820..91931c2fd8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo { SReadHandle readHandle; uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; - SOperatorInfo* pOperatorDumy; + SOperatorInfo* pSnapshotReadOp; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SArray* childIds; SessionWindowSupporter sessionSup; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a703be86e4..80dd01cf4c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_ } for (int32_t i = 0; i < pBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + colDataAppend(pColInfoData, i, data, + (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); } if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && @@ -776,7 +777,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { if (!needRead) { return false; } - STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; + STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->curTWinIdx = 0; tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); @@ -821,11 +822,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { while (1) { SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pOperatorDumy); + pResult = doTableScan(pInfo->pSnapshotReadOp); if (pResult == NULL) { if (prepareDataScan(pInfo)) { // scan next window data - pResult = doTableScan(pInfo->pOperatorDumy); + pResult = doTableScan(pInfo->pSnapshotReadOp); } } if (!pResult) { @@ -860,11 +861,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); - pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); int32_t i = 0; for (; i < size; i++) { rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); - uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); if (pInfo->groupId != id) { break; } @@ -1063,7 +1064,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { - SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy); + SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); if (pResult) { return pResult->info.rows > 0 ? pResult : NULL; } @@ -1135,7 +1136,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan } else { pInfo->pUpdateInfo = NULL; } - pInfo->pOperatorDumy = pTableScanDummy; + pInfo->pSnapshotReadOp = pTableScanDummy; pInfo->interval = pSTInfo->interval; pInfo->readHandle = *pHandle; @@ -1557,7 +1558,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return NULL; } - int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE; + int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE + : TDMT_MND_SYSTABLE_RETRIEVE; pMsgSendInfo->param = pOperator; pMsgSendInfo->msgInfo.pData = buf1; @@ -1843,7 +1845,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } else { data = (char*)p; } - colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + colDataAppend(pDst, count, data, + (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) { @@ -1896,11 +1899,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } - pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; - pInfo->pRes = createResDataBlock(pDescNode); - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; + pInfo->pTableList = pTableListInfo; + pInfo->pColMatchInfo = colList; + pInfo->pRes = createResDataBlock(pDescNode); + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;