feat(tmq): add with meta config
This commit is contained in:
parent
2af4a4d250
commit
71bda7e658
|
@ -26,6 +26,10 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
printf("topic: %s\n", tmq_get_topic_name(msg));
|
printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||||
printf("db: %s\n", tmq_get_db_name(msg));
|
printf("db: %s\n", tmq_get_db_name(msg));
|
||||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||||
|
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
||||||
|
printf("meta, skip\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
if (row == NULL) break;
|
if (row == NULL) break;
|
||||||
|
@ -129,8 +133,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -191,7 +195,7 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
|
|
||||||
tmq_conf_set(conf, "experiment.use.snapshot", "true");
|
tmq_conf_set(conf, "experiment.use.snapshot", "false");
|
||||||
|
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
|
|
@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
pResultInfo->current += 1;
|
pResultInfo->current += 1;
|
||||||
return pResultInfo->row;
|
return pResultInfo->row;
|
||||||
}
|
}
|
||||||
|
} else if (TD_RES_TMQ_META(res)) {
|
||||||
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
// assert to avoid un-initialization error
|
// assert to avoid un-initialization error
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
// handle meta rsp
|
// handle meta rsp
|
||||||
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
|
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
|
||||||
if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||||
if (pRspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
|
@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pRspWrapper->vgHandle = pVg;
|
pRspWrapper->vgHandle = pVg;
|
||||||
pRspWrapper->topicHandle = pTopic;
|
pRspWrapper->topicHandle = pTopic;
|
||||||
|
|
||||||
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
|
||||||
|
|
||||||
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
|
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
|
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
|
||||||
|
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
|
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
|
|
||||||
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
|
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId,
|
||||||
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset);
|
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
|
@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
||||||
|
|
||||||
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
|
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
|
||||||
pRspObj->resType = RES_TYPE__TMQ;
|
pRspObj->resType = RES_TYPE__TMQ_META;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||||
|
@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqRspWrapper* rspWrapper = NULL;
|
SMqRspWrapper* rspWrapper = NULL;
|
||||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||||
|
@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
|
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
||||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||||
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
|
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
|
||||||
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
|
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
// build rsp
|
// build rsp
|
||||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
||||||
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
SMqRspObj* rspObj;
|
void* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
tmqHandleAllDelayedTask(tmq);
|
tmqHandleAllDelayedTask(tmq);
|
||||||
|
|
|
@ -387,6 +387,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pHandle->epoch = -1;
|
pHandle->epoch = -1;
|
||||||
|
|
||||||
pHandle->execHandle.subType = req.subType;
|
pHandle->execHandle.subType = req.subType;
|
||||||
|
pHandle->fetchMeta = req.withMeta;
|
||||||
|
|
||||||
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
|
|
@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo {
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
uint64_t tableUid; // queried super table uid
|
uint64_t tableUid; // queried super table uid
|
||||||
EStreamScanMode scanMode;
|
EStreamScanMode scanMode;
|
||||||
SOperatorInfo* pOperatorDumy;
|
SOperatorInfo* pSnapshotReadOp;
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||||
SArray* childIds;
|
SArray* childIds;
|
||||||
SessionWindowSupporter sessionSup;
|
SessionWindowSupporter sessionSup;
|
||||||
|
|
|
@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
colDataAppend(pColInfoData, i, data,
|
||||||
|
(data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
|
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
|
||||||
|
@ -776,7 +777,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
||||||
if (!needRead) {
|
if (!needRead) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
|
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
|
||||||
pTableScanInfo->cond.twindows[0] = win;
|
pTableScanInfo->cond.twindows[0] = win;
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
|
@ -821,11 +822,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
|
||||||
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
|
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pResult = NULL;
|
SSDataBlock* pResult = NULL;
|
||||||
pResult = doTableScan(pInfo->pOperatorDumy);
|
pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||||
if (pResult == NULL) {
|
if (pResult == NULL) {
|
||||||
if (prepareDataScan(pInfo)) {
|
if (prepareDataScan(pInfo)) {
|
||||||
// scan next window data
|
// scan next window data
|
||||||
pResult = doTableScan(pInfo->pOperatorDumy);
|
pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
|
@ -860,11 +861,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
||||||
ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols);
|
ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols);
|
||||||
|
|
||||||
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
||||||
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
for (; i < size; i++) {
|
for (; i < size; i++) {
|
||||||
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
||||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
|
||||||
if (pInfo->groupId != id) {
|
if (pInfo->groupId != id) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1063,7 +1064,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||||
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
|
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy);
|
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||||
if (pResult) {
|
if (pResult) {
|
||||||
return pResult->info.rows > 0 ? pResult : NULL;
|
return pResult->info.rows > 0 ? pResult : NULL;
|
||||||
}
|
}
|
||||||
|
@ -1135,7 +1136,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
} else {
|
} else {
|
||||||
pInfo->pUpdateInfo = NULL;
|
pInfo->pUpdateInfo = NULL;
|
||||||
}
|
}
|
||||||
pInfo->pOperatorDumy = pTableScanDummy;
|
pInfo->pSnapshotReadOp = pTableScanDummy;
|
||||||
pInfo->interval = pSTInfo->interval;
|
pInfo->interval = pSTInfo->interval;
|
||||||
|
|
||||||
pInfo->readHandle = *pHandle;
|
pInfo->readHandle = *pHandle;
|
||||||
|
@ -1557,7 +1558,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE;
|
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
|
||||||
|
: TDMT_MND_SYSTABLE_RETRIEVE;
|
||||||
|
|
||||||
pMsgSendInfo->param = pOperator;
|
pMsgSendInfo->param = pOperator;
|
||||||
pMsgSendInfo->msgInfo.pData = buf1;
|
pMsgSendInfo->msgInfo.pData = buf1;
|
||||||
|
@ -1843,7 +1845,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
data = (char*)p;
|
data = (char*)p;
|
||||||
}
|
}
|
||||||
colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
colDataAppend(pDst, count, data,
|
||||||
|
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
||||||
|
|
||||||
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
||||||
data != NULL) {
|
data != NULL) {
|
||||||
|
@ -1896,11 +1899,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pTableList = pTableListInfo;
|
pInfo->pTableList = pTableListInfo;
|
||||||
pInfo->pColMatchInfo = colList;
|
pInfo->pColMatchInfo = colList;
|
||||||
pInfo->pRes = createResDataBlock(pDescNode);
|
pInfo->pRes = createResDataBlock(pDescNode);
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
pOperator->name = "TagScanOperator";
|
pOperator->name = "TagScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue