Merge pull request #29885 from taosdata/feat/TD-33798

feat:[TD-33798]send batch metadata & remove data if subscribe only meta
This commit is contained in:
Shengliang Guan 2025-02-24 19:32:26 +08:00 committed by GitHub
commit 8a324a45d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 474 additions and 112 deletions

View File

@ -188,7 +188,7 @@ char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/";
// tmq
int32_t tmqMaxTopicNum = 20;
int32_t tmqRowSize = 4096;
int32_t tmqRowSize = 1000;
// query
int32_t tsQueryPolicy = 1;
bool tsQueryTbNotExistAsEmpty = false;

View File

@ -1098,6 +1098,22 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
*pSubmitTbDataRet = pSubmitTbData;
}
if (fetchMeta == ONLY_META) {
if (pSubmitTbData->pCreateTbReq != NULL) {
if (pRsp->createTableReq == NULL){
pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
if (pRsp->createTableReq == NULL){
return terrno;
}
}
if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL){
return terrno;
}
pSubmitTbData->pCreateTbReq = NULL;
}
return 0;
}
int32_t sversion = pSubmitTbData->sver;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;

View File

@ -339,6 +339,9 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
if (pHandle->fetchMeta == ONLY_META){
goto END;
}
int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
if (pRsp->withTbName) {
@ -347,7 +350,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
TSDB_CHECK_CODE(code, lino, END);
}
tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL);
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
for (int32_t i = 0; i < blockNum; i++) {
if (taosArrayGetSize(pBlocks) == 0){
@ -403,14 +405,16 @@ static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest,
}
int64_t uid = pSubmitTbData->uid;
if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
return;
} else {
int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
if (code != 0){
tqError("failed to add table uid to hash, code:%d, uid:%"PRId64, code, uid);
if (pRequest->rawData) {
if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
return;
} else {
int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
if (code != 0) {
tqError("failed to add table uid to hash, code:%d, uid:%" PRId64, code, uid);
}
}
}
@ -453,9 +457,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
}
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList);
TSDB_CHECK_CODE(code, lino, END);
if (pRequest->rawData) {
preProcessSubmitMsg(pHandle, pRequest, &rawList);
}
preProcessSubmitMsg(pHandle, pRequest, &rawList);
// data could not contains same uid data in rawdata mode
if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
goto END;

View File

@ -226,6 +226,82 @@ static void tDeleteCommon(void* parm) {}
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
int32_t code = 0;
if (!btMetaRsp->batchMetaReq) {
btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
}
SMqMetaRsp tmpMetaRsp = {0};
tmpMetaRsp.resMsgType = type;
tmpMetaRsp.metaRspLen = bodyLen;
tmpMetaRsp.metaRsp = body;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
goto END;
}
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
TQ_NULL_GO_TO_END(tBuf);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
tEncoderClear(&encoder);
if (code < 0) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
goto END;
}
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));
END:
return code;
}
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
int32_t code = 0;
SVCreateTbBatchReq pReq = {0};
pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
TQ_NULL_GO_TO_END(pReq.pArray);
for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
void *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
}
tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
if (code < 0) {
goto END;
}
*len += sizeof(SMsgHead);
*pBuf = taosMemoryMalloc(*len);
TQ_NULL_GO_TO_END(pBuf);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *len);
code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
tEncoderClear(&coder);
END:
taosArrayDestroy(pReq.pArray);
return code;
}
#define SEND_BATCH_META_RSP \
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
goto END;
#define SEND_DATA_RSP \
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
goto END;
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int32_t vgId = TD_VID(pTq->pVnode);
@ -272,14 +348,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
SEND_BATCH_META_RSP
}
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END;
SEND_DATA_RSP
}
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
@ -289,10 +360,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END;
SEND_DATA_RSP
}
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
@ -318,53 +386,20 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
goto END;
}
if (!btMetaRsp.batchMetaReq) {
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq);
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen);
}
code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
fetchVer++;
SMqMetaRsp tmpMetaRsp = {0};
tmpMetaRsp.resMsgType = pHead->msgType;
tmpMetaRsp.metaRspLen = pHead->bodyLen;
tmpMetaRsp.metaRsp = pHead->body;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue;
if (code != 0){
goto END;
}
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
TQ_NULL_GO_TO_END(tBuf);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
tEncoderClear(&encoder);
if (code < 0) {
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue;
}
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
SEND_BATCH_META_RSP
}
continue;
}
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
SEND_BATCH_META_RSP
}
// process data
@ -376,17 +411,39 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
int32_t len = 0;
void *pBuf = NULL;
code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
if (code == 0){
code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
}
taosMemoryFree(pBuf);
taosArrayDestroyP(taosxRsp.createTableReq, NULL);
taosxRsp.createTableReq = NULL;
fetchVer++;
if (code != 0){
goto END;
}
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
(taosGetTimestampMs() - st > pRequest->timeout) ||
(!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
SEND_BATCH_META_RSP
}
continue;
}
if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
(taosGetTimestampMs() - st > pRequest->timeout) ||
(pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
// tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
// (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;}
goto END;
if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
terrno = 0;
} else{
fetchVer++;
}
SEND_DATA_RSP
} else {
fetchVer++;
}

View File

@ -470,8 +470,6 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5776.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py

View File

@ -298,7 +298,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_c_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py

View File

@ -29,6 +29,14 @@ class TDTestCase:
tdLog.info(cmdStr)
os.system(cmdStr)
cmdStr = '%s/build/bin/tmq_ts5776'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
cmdStr = '%s/build/bin/tmq_td33798'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
return
def stop(self):

View File

@ -1,39 +0,0 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_ts5776'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
return
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -8,6 +8,7 @@ add_executable(tmq_td32526 tmq_td32526.c)
add_executable(tmq_td32187 tmq_td32187.c)
add_executable(tmq_ts5776 tmq_ts5776.c)
add_executable(tmq_td32471 tmq_td32471.c)
add_executable(tmq_td33798 tmq_td33798.c)
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
@ -81,6 +82,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_td33798
PUBLIC ${TAOS_LIB}
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_td32526
PUBLIC ${TAOS_LIB}

312
utils/test/c/tmq_td33798.c Normal file
View File

@ -0,0 +1,312 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "cJSON.h"
#include "taos.h"
#include "tmsg.h"
#include "types.h"
bool batchMeta = false;
int32_t consumeIndex = 0;
static TAOS* use_db() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return NULL;
}
TAOS_RES* pRes = taos_query(pConn, "use db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes));
return NULL;
}
taos_free_result(pRes);
return pConn;
}
void checkBatchMeta(TAOS_RES* msg){
char* result = tmq_get_json_meta(msg);
printf("meta result: %s\n", result);
switch (consumeIndex) {
case 0:
ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"zstd\",\"level\":\"medium\"}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct11\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct10\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}]}") == 0);
break;
default:
ASSERT(0);
break;
}
tmq_free_json_meta(result);
}
void checkNonBatchMeta(TAOS_RES* msg){
char* result = tmq_get_json_meta(msg);
printf("meta result: %s\n", result);
switch (consumeIndex) {
case 0:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"zstd\",\"level\":\"medium\"}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}") == 0);
break;
case 1:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}") == 0);
break;
case 2:
ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}]}") == 0);
break;
case 3:
ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct11\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}]}") == 0);
break;
case 4:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct10\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}") == 0);
break;
case 5:
ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}]}") == 0);
break;
default:
ASSERT(0);
break;
}
tmq_free_json_meta(result);
}
static void msg_process(TAOS_RES* msg) {
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
TAOS* pConn = use_db();
ASSERT (tmq_get_res_type(msg) == TMQ_RES_TABLE_META);
if (batchMeta){
checkBatchMeta(msg);
} else {
checkNonBatchMeta(msg);
}
taos_close(pConn);
}
int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct11 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct10 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taosSsleep(1);
pRes = taos_query(pConn, "insert into ct1 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a') ct2 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a') ct3 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
buildDatabase(pConn, pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "create topic topic_db only meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", batchMeta ? "batch" : "nonbatch");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
if (batchMeta) {
tmq_conf_set(conf, "msg.enable.batchmeta", "1");
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
if (tmqmessage) {
msg_process(tmqmessage);
consumeIndex++;
taos_free_result(tmqmessage);
} else {
break;
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
if (init_env() < 0) {
return -1;
}
create_topic();
tmq_list_t* topic_list = build_topic_list();
tmq_t* tmq = build_consumer();
basic_consume_loop(tmq, topic_list);
batchMeta = true;
consumeIndex = 0;
tmq = build_consumer();
basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list);
}