Merge pull request #14166 from taosdata/feature/stream

feat(tmq): add with meta config
This commit is contained in:
Liu Jicong 2022-06-23 16:54:09 +08:00 committed by GitHub
commit 1d6682c535
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 41 additions and 33 deletions

View File

@ -26,6 +26,10 @@ static void msg_process(TAOS_RES* msg) {
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
printf("meta, skip\n");
return;
}
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
@ -129,8 +133,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
@ -191,7 +195,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experiment.use.snapshot", "true");
tmq_conf_set(conf, "experiment.use.snapshot", "false");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);

View File

@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo->current += 1;
return pResultInfo->row;
}
} else if (TD_RES_TMQ_META(res)) {
return NULL;
} else {
// assert to avoid un-initialization error
ASSERT(0);

View File

@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
} else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
}
taosMemoryFree(pMsg->pData);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId,
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType);
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
pRspObj->resType = RES_TYPE__TMQ;
pRspObj->resType = RES_TYPE__TMQ_META;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return 0;
}
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else {
@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs();
void* rspObj;
int64_t startTime = taosGetTimestampMs();
#if 0
tmqHandleAllDelayedTask(tmq);

View File

@ -387,6 +387,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->epoch = -1;
pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) {

View File

@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo {
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy;
SOperatorInfo* pSnapshotReadOp;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* childIds;
SessionWindowSupporter sessionSup;

View File

@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
colDataAppend(pColInfoData, i, data,
(data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
}
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
@ -776,7 +777,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
if (!needRead) {
return false;
}
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
@ -821,11 +822,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pOperatorDumy);
pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult == NULL) {
if (prepareDataScan(pInfo)) {
// scan next window data
pResult = doTableScan(pInfo->pOperatorDumy);
pResult = doTableScan(pInfo->pSnapshotReadOp);
}
}
if (!pResult) {
@ -860,11 +861,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
int32_t i = 0;
for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
if (pInfo->groupId != id) {
break;
}
@ -1063,7 +1064,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy);
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult) {
return pResult->info.rows > 0 ? pResult : NULL;
}
@ -1135,7 +1136,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
} else {
pInfo->pUpdateInfo = NULL;
}
pInfo->pOperatorDumy = pTableScanDummy;
pInfo->pSnapshotReadOp = pTableScanDummy;
pInfo->interval = pSTInfo->interval;
pInfo->readHandle = *pHandle;
@ -1557,7 +1558,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return NULL;
}
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE;
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
: TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1;
@ -1843,7 +1845,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} else {
data = (char*)p;
}
colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
colDataAppend(pDst, count, data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) {
@ -1896,11 +1899,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error;
}
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;