Merge pull request #23187 from taosdata/feat/TD-26056
feat:add replay logic
Commit: c6dd73b68b
@ -356,6 +356,7 @@ You configure the following parameters when creating a consumer:
| `enable.auto.commit` | boolean | Commit automatically; true: the user application does not need to commit explicitly; false: the user application must handle commits itself | Default value: true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | Default value: 5000 |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages. Not applicable to column subscriptions (for column subscriptions, `tbname` can be written as a column in the subquery statement). (This parameter is deprecated since version 3.2.0.0 and is always true.) | Default value: false |
| `enable.replay` | boolean | Specify whether the data replay function is enabled | Default value: false |

The method of specifying these parameters depends on the language used:

@ -526,6 +527,24 @@ var consumer = new ConsumerBuilder(cfg).Build();

A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.

Data replay function description (a minimal C sketch follows this list):

- Subscription adds a replay function, which replays data according to the time at which it was written.
  For example, suppose three rows are written at the following times:
  ```sql
  2023/09/22 00:00:00.000
  2023/09/22 00:00:05.000
  2023/09/22 00:00:08.000
  ```
  After the subscription returns the first row, the second row is returned 5 seconds later, and the third row is returned 3 seconds after the second row is obtained.
- Only column subscriptions support data replay.
- Replay requires an independent timeline:
  - For subtable or normal table subscriptions, only one vnode holds the data, which guarantees a single timeline.
  - For supertable subscriptions, the database must contain only one vnode; otherwise an error is reported, because data subscribed from multiple vnodes is not on the same timeline.
- Supertable and database subscriptions do not support replay.
- The `enable.replay` parameter is added: true enables subscription replay; false, the default, disables it.
- Replay does not support progress saving, so when `enable.replay` is set to true, auto commit is automatically disabled.
- Because data replay itself requires processing time, replay accuracy has an error of a few tens of milliseconds.
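For reference, the short C sketch below shows one way to turn replay on when building a consumer. It is a minimal illustration under stated assumptions, not the canonical documentation example: the group ID, client ID, and credentials are placeholders, and only the options relevant to replay are set.

```c
#include <stdio.h>
#include "taos.h"

// Minimal sketch: build a TMQ consumer with data replay enabled.
// group.id / client.id / credentials below are placeholders.
static tmq_t *build_replay_consumer(void) {
  tmq_conf_t *conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "replay_group");
  tmq_conf_set(conf, "client.id", "replay_client");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "auto.offset.reset", "earliest");
  // Replay does not save progress, so enabling it implicitly disables auto commit.
  tmq_conf_set(conf, "enable.replay", "true");

  char errstr[512] = {0};
  tmq_t *tmq = tmq_consumer_new(conf, errstr, sizeof(errstr));
  if (tmq == NULL) {
    fprintf(stderr, "failed to create consumer: %s\n", errstr);
  }
  tmq_conf_destroy(conf);
  return tmq;
}
```

There is no need to set `enable.auto.commit` explicitly here: as noted above, the client turns auto commit off as soon as replay is enabled.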
## Subscribe to a Topic

A single consumer can subscribe to multiple topics.
@ -355,6 +355,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
| `enable.auto.commit` | boolean | Whether to commit consumption offsets automatically; true: commit automatically, the client application does not need to commit; false: the client application must commit by itself | Default value: true |
| `auto.commit.interval.ms` | integer | Interval at which consumed offsets are committed automatically, in milliseconds | Default value: 5000 |
| `msg.with.table.name` | boolean | Whether table names may be parsed from messages. Not applicable to column subscriptions (for column subscriptions, `tbname` can be written as a column in the subquery statement). (Deprecated since version 3.2.0.0; always true.) | Disabled by default |
| `enable.replay` | boolean | Whether to enable the data replay function | Disabled by default |

For different programming languages, these parameters are set as follows:

@ -527,6 +528,24 @@ var consumer = new ConsumerBuilder(cfg).Build();

The configuration above includes a consumer group ID; if multiple consumers specify the same consumer group ID, they automatically form a consumer group and share consumption progress.

Data replay function description (a timing sketch follows this list):

- Subscription adds a replay function, which replays data according to the time at which it was written.
  For example, suppose three rows are written at the following times:
  ```sql
  2023/09/22 00:00:00.000
  2023/09/22 00:00:05.000
  2023/09/22 00:00:08.000
  ```
  Then, after the subscription returns the first row, the second row is returned 5 s later, and the third row is returned 3 s after the second row is obtained.
- Only column subscriptions support data replay.
- Replay requires an independent timeline:
  - For subtable or normal table subscriptions, only one vnode holds the data, which guarantees a single timeline.
  - For supertable subscriptions, the database must have only one vnode; otherwise an error is reported, because data subscribed from multiple vnodes is not on the same timeline.
- Supertable and database subscriptions do not support replay.
- The `enable.replay` parameter is added: true enables subscription replay, false disables it; it is disabled by default.
- Replay does not support progress saving, so when `enable.replay = true`, auto commit is automatically disabled.
- Because data replay itself needs processing time, replay accuracy has an error of a few tens of milliseconds.
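To make the pacing described above concrete, the hedged sketch below polls a hypothetical column-subscription topic named `topic_meters` with replay enabled (see the consumer sketch earlier) and prints the gap between consecutive messages. With rows written at 00:00:00, 00:00:05, and 00:00:08, the printed gaps should be roughly 5000 ms and 3000 ms, plus the few tens of milliseconds of replay error noted above.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include "taos.h"

// Hedged sketch: observe replay pacing by timing consecutive polls.
// Assumes a consumer created with enable.replay=true and a column-subscription
// topic named "topic_meters" (both are placeholders for this illustration).
static void observe_replay_pacing(tmq_t *tmq) {
  tmq_list_t *topics = tmq_list_new();
  tmq_list_append(topics, "topic_meters");
  if (tmq_subscribe(tmq, topics) != 0) {
    fprintf(stderr, "subscribe failed\n");
    tmq_list_destroy(topics);
    return;
  }
  tmq_list_destroy(topics);

  int64_t lastMs = 0;
  for (int i = 0; i < 3; i++) {
    TAOS_RES *msg = tmq_consumer_poll(tmq, 10000);  // wait up to 10 s per poll
    if (msg == NULL) break;

    struct timespec ts;  // POSIX clock; adjust for other platforms
    clock_gettime(CLOCK_REALTIME, &ts);
    int64_t nowMs = (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
    if (lastMs != 0) {
      printf("gap since previous message: %" PRId64 " ms\n", nowMs - lastMs);
    }
    lastMs = nowMs;
    taos_free_result(msg);
  }
  tmq_unsubscribe(tmq);
}
```

Internally, each poll response carries a `sleepTime` derived from the gap between write timestamps, and the client idles that vgroup for that long before the next poll; the `blockSleepForReplay` field and `tmqReplayTask` timer in the client changes below implement this.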
## Subscribe to *topics*

A single consumer can subscribe to multiple topics at the same time.
@ -2365,17 +2365,6 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS
|
|||
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
|
||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
int64_t streamId;
|
||||
char* sql;
|
||||
char* executorMsg;
|
||||
} SMVCreateStreamReq, SMSCreateStreamReq;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
} SMVCreateStreamRsp, SMSCreateStreamRsp;
|
||||
|
||||
enum {
|
||||
TOPIC_SUB_TYPE__DB = 1,
|
||||
TOPIC_SUB_TYPE__TABLE,
|
||||
|
@ -2397,16 +2386,9 @@ int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTo
|
|||
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
|
||||
void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t topicId;
|
||||
} SCMCreateTopicRsp;
|
||||
|
||||
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
|
||||
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
|
||||
} SMqConsumerRecoverMsg, SMqConsumerClearMsg;
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
|
@ -2418,6 +2400,7 @@ typedef struct {
|
|||
int8_t autoCommit;
|
||||
int32_t autoCommitInterval;
|
||||
int8_t resetOffsetCfg;
|
||||
int8_t enableReplay;
|
||||
} SCMSubscribeReq;
|
||||
|
||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||
|
@ -2437,6 +2420,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
|||
tlen += taosEncodeFixedI8(buf, pReq->autoCommit);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
@ -2460,71 +2444,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
|||
buf = taosDecodeFixedI8(buf, &pReq->autoCommit);
|
||||
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqSubTopic {
|
||||
int32_t vgId;
|
||||
int64_t topicId;
|
||||
SEpSet epSet;
|
||||
} SMqSubTopic;
|
||||
|
||||
typedef struct {
|
||||
int32_t topicNum;
|
||||
SMqSubTopic topics[];
|
||||
} SCMSubscribeRsp;
|
||||
|
||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pRsp->topicNum);
|
||||
for (int32_t i = 0; i < pRsp->topicNum; i++) {
|
||||
tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId);
|
||||
tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->topicNum);
|
||||
for (int32_t i = 0; i < pRsp->topicNum; i++) {
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId);
|
||||
buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t topicId;
|
||||
int64_t consumerId;
|
||||
int64_t consumerGroupId;
|
||||
int64_t offset;
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
} SMVSubscribeReq;
|
||||
|
||||
static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pReq->topicId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->offset);
|
||||
tlen += taosEncodeString(buf, pReq->sql);
|
||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
|
||||
buf = taosDecodeFixedI64(buf, &pReq->topicId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->offset);
|
||||
buf = taosDecodeString(buf, &pReq->sql);
|
||||
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
@ -3620,6 +3540,7 @@ typedef struct {
|
|||
int64_t consumerId;
|
||||
int64_t timeout;
|
||||
STqOffsetVal reqOffset;
|
||||
int8_t enableReplay;
|
||||
} SMqPollReq;
|
||||
|
||||
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
|
||||
|
@ -3679,6 +3600,7 @@ typedef struct {
|
|||
SArray* blockData;
|
||||
SArray* blockTbName;
|
||||
SArray* blockSchema;
|
||||
int64_t sleepTime;
|
||||
} SMqDataRsp;
|
||||
|
||||
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
@ -227,6 +227,7 @@ typedef struct SStoreTqReader {
|
|||
bool (*tqReaderNextBlockInWal)();
|
||||
bool (*tqNextBlockImpl)(); // todo remove it
|
||||
SSDataBlock* (*tqGetResultBlock)();
|
||||
int64_t (*tqGetResultBlockTime)();
|
||||
|
||||
void (*tqReaderSetColIdList)();
|
||||
int32_t (*tqReaderSetQueryTableList)();
@ -799,6 +799,8 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
|
||||
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
|
||||
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
|
||||
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
|
||||
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
||||
|
||||
// stream
|
||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
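The two replay error codes added above correspond to the documented constraints (column subscription only, and a single vgroup for supertable topics). A hedged client-side sketch of reporting such a rejection follows; the topic name is a placeholder, and the only assumption is that the failing subscribe call surfaces a code that `tmq_err2str` can translate.

```c
#include <stdio.h>
#include "taos.h"

// Hedged sketch: report why a replay subscription was rejected
// (e.g. a supertable topic on a multi-vgroup database, or a db/stable topic).
static int subscribe_with_replay(tmq_t *tmq, const char *topic_name) {
  tmq_list_t *topics = tmq_list_new();
  tmq_list_append(topics, topic_name);

  int32_t code = tmq_subscribe(tmq, topics);
  if (code != 0) {
    // tmq_err2str turns error codes such as the replay errors above into readable text.
    fprintf(stderr, "subscribe failed: %s\n", tmq_err2str(code));
  }

  tmq_list_destroy(topics);
  return code;
}
```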
@ -853,7 +853,6 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr
|
|||
size_t typeLen = strlen(type->valuestring);
|
||||
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
|
||||
// seconds
|
||||
// int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
|
||||
if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
|
||||
return tsInt64 * smlFactorS[toPrecision];
|
||||
}
@ -62,7 +62,7 @@ struct tmq_conf_t {
|
|||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t snapEnable;
|
||||
// int32_t snapBatchSize;
|
||||
int8_t replayEnable;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
|
@ -81,6 +81,7 @@ struct tmq_t {
|
|||
int8_t autoCommit;
|
||||
int32_t autoCommitInterval;
|
||||
int8_t resetOffsetCfg;
|
||||
int8_t replayEnable;
|
||||
uint64_t consumerId;
|
||||
tmq_commit_cb* commitCb;
|
||||
void* commitCbUserParam;
|
||||
|
@ -89,19 +90,13 @@ struct tmq_t {
|
|||
SRWLatch lock;
|
||||
int8_t status;
|
||||
int32_t epoch;
|
||||
#if 0
|
||||
int8_t epStatus;
|
||||
int32_t epSkipCnt;
|
||||
#endif
|
||||
// poll info
|
||||
int64_t pollCnt;
|
||||
int64_t totalRows;
|
||||
// bool needReportOffsetRows;
|
||||
|
||||
// timer
|
||||
tmr_h hbLiveTimer;
|
||||
tmr_h epTimer;
|
||||
tmr_h reportTimer;
|
||||
tmr_h commitTimer;
|
||||
STscObj* pTscObj; // connection
|
||||
SArray* clientTopics; // SArray<SMqClientTopic>
|
||||
|
@ -149,6 +144,8 @@ typedef struct {
|
|||
int32_t vgStatus;
|
||||
int32_t vgSkipCnt; // here used to mark the slow vgroups
|
||||
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
int64_t blockReceiveTs;      // timestamp of the last received data block, used to pace replay
int64_t blockSleepForReplay; // how long (ms) to idle this vgroup before the next poll when replay is enabled
|
||||
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
|
||||
SEpSet epSet;
|
||||
} SMqClientVg;
|
||||
|
@ -356,24 +353,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
}
|
||||
}
|
||||
|
||||
// if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
|
||||
// conf->snapBatchSize = taosStr2int64(value);
|
||||
// return TMQ_CONF_OK;
|
||||
// }
|
||||
|
||||
// if (strcasecmp(key, "enable.heartbeat.background") == 0) {
|
||||
// if (strcasecmp(value, "true") == 0) {
|
||||
// conf->hbBgEnable = true;
|
||||
// return TMQ_CONF_OK;
|
||||
// } else if (strcasecmp(value, "false") == 0) {
|
||||
// conf->hbBgEnable = false;
|
||||
// return TMQ_CONF_OK;
|
||||
// } else {
|
||||
// tscError("the default value of enable.heartbeat.background is true, can not be seted");
|
||||
// return TMQ_CONF_INVALID;
|
||||
// }
|
||||
// }
|
||||
|
||||
if (strcasecmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = taosStrdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
|
@ -394,6 +373,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "enable.replay") == 0) {
|
||||
if (strcasecmp(value, "true") == 0) {
|
||||
conf->replayEnable = true;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcasecmp(value, "false") == 0) {
|
||||
conf->replayEnable = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "td.connect.db") == 0) {
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
@ -729,6 +720,17 @@ void tmqAssignAskEpTask(void* param, void* tmrId) {
|
|||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
void tmqReplayTask(void* param, void* tmrId) {
|
||||
int64_t refId = *(int64_t*)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if(tmq == NULL) goto END;
|
||||
|
||||
tsem_post(&tmq->rspSem);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
END:
|
||||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||
int64_t refId = *(int64_t*)param;
|
||||
generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
|
||||
|
@ -1071,6 +1073,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
pTmq->commitCb = conf->commitCb;
|
||||
pTmq->commitCbUserParam = conf->commitCbUserParam;
|
||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||
pTmq->replayEnable = conf->replayEnable;
|
||||
if(conf->replayEnable){
|
||||
pTmq->autoCommit = false;
|
||||
}
|
||||
taosInitRWLatch(&pTmq->lock);
|
||||
|
||||
// assign consumerId
|
||||
|
@ -1140,6 +1146,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
req.autoCommit = tmq->autoCommit;
|
||||
req.autoCommitInterval = tmq->autoCommitInterval;
|
||||
req.resetOffsetCfg = tmq->resetOffsetCfg;
|
||||
req.enableReplay = tmq->replayEnable;
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char* topic = taosArrayGetP(container, i);
|
||||
|
@ -1415,6 +1422,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
.vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
|
||||
.vgSkipCnt = 0,
|
||||
.emptyBlockReceiveTs = 0,
|
||||
.blockReceiveTs = 0,
|
||||
.blockSleepForReplay = 0,
|
||||
.numOfRows = pInfo ? pInfo->numOfRows : 0,
|
||||
};
|
||||
|
||||
|
@ -1526,6 +1535,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
|
|||
pReq->head.vgId = pVg->vgId;
|
||||
pReq->useSnapshot = tmq->useSnapshot;
|
||||
pReq->reqId = generateRequestId();
|
||||
pReq->enableReplay = tmq->replayEnable;
|
||||
}
|
||||
|
||||
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
|
@ -1686,6 +1696,12 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tmq->replayEnable && taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // still within the replay wait window
|
||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId,
|
||||
tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
||||
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
||||
|
@ -1807,6 +1823,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||
tmq->totalRows += numOfRows;
|
||||
pVg->emptyBlockReceiveTs = 0;
|
||||
if(tmq->replayEnable){
|
||||
pVg->blockReceiveTs = taosGetTimestampMs();
|
||||
pVg->blockSleepForReplay = pRsp->rsp.sleepTime;
|
||||
if(pVg->blockSleepForReplay > 0){
|
||||
int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId1 = tmq->refId;
|
||||
taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer);
|
||||
}
|
||||
}
|
||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
@ -4552,31 +4552,6 @@ void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
@ -6219,6 +6194,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
|
||||
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -6255,6 +6231,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
|
||||
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
|
||||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -8057,6 +8037,7 @@ int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -8102,6 +8083,8 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
@ -491,32 +491,6 @@ typedef struct {
|
|||
char filterTb[TSDB_TABLE_NAME_LEN];
|
||||
} SShowObj;
|
||||
|
||||
typedef struct {
|
||||
int64_t id;
|
||||
int8_t type;
|
||||
int8_t replica;
|
||||
int16_t numOfColumns;
|
||||
int32_t rowSize;
|
||||
int32_t numOfRows;
|
||||
int32_t numOfReads;
|
||||
int32_t payloadLen;
|
||||
void* pIter;
|
||||
SMnode* pMnode;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int16_t offset[TSDB_MAX_COLUMNS];
|
||||
int32_t bytes[TSDB_MAX_COLUMNS];
|
||||
char payload[];
|
||||
} SSysTableRetrieveObj;
|
||||
|
||||
typedef struct {
|
||||
char key[TSDB_PARTITION_KEY_LEN];
|
||||
int64_t dbUid;
|
||||
int64_t offset;
|
||||
} SMqOffsetObj;
|
||||
|
||||
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
|
||||
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
@ -18,6 +18,7 @@
|
|||
#include "mndPrivilege.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
|
@ -124,30 +125,55 @@ void mndRebCntDec() {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
|
||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
||||
pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
||||
if (pTopic == NULL) { // terrno has been set by callee function
|
||||
return -1;
|
||||
code = -1;
|
||||
goto FAILED;
|
||||
}
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto FAILED;
|
||||
}
|
||||
|
||||
if(enableReplay){
|
||||
if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){
|
||||
code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
|
||||
goto FAILED;
|
||||
}else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
|
||||
if (pDb == NULL) {
|
||||
code = -1;
|
||||
goto FAILED;
|
||||
}
|
||||
if (pDb->cfg.numOfVgroups != 1) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
|
||||
goto FAILED;
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
}
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, pOneTopic, NULL);
|
||||
if(mndTransCheckConflict(pMnode, pTrans) != 0){
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto FAILED;
|
||||
}
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
||||
return 0;
|
||||
FAILED:
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||
|
@ -177,7 +203,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
|||
if (pTrans == NULL) {
|
||||
goto FAIL;
|
||||
}
|
||||
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){
|
||||
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0){
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -697,7 +723,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
goto _over;
|
||||
}
|
||||
|
||||
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user);
|
||||
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
@ -330,24 +330,6 @@ void dumpSubscribe(SSdb *pSdb, SJson *json) {
|
|||
}
|
||||
}
|
||||
|
||||
void dumpOffset(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "offsets");
|
||||
|
||||
while (1) {
|
||||
SMqOffsetObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "key", pObj->key);
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "offset", i642str(pObj->offset));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpStream(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "streams");
|
||||
|
@ -608,7 +590,7 @@ void mndDumpSdb() {
|
|||
dumpTopic(pSdb, json);
|
||||
dumpConsumer(pSdb, json);
|
||||
dumpSubscribe(pSdb, json);
|
||||
dumpOffset(pSdb, json);
|
||||
// dumpOffset(pSdb, json);
|
||||
dumpStream(pSdb, json);
|
||||
dumpAcct(pSdb, json);
|
||||
dumpAuth(pSdb, json);
|
||||
|
|
|
@ -298,11 +298,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
|
|||
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
|
||||
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
|
||||
|
||||
/*taosWLockLatch(&pOldTopic->lock);*/
|
||||
|
||||
// TODO handle update
|
||||
|
||||
/*taosWUnLockLatch(&pOldTopic->lock);*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -320,23 +315,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
|
|||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
|
||||
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
|
||||
int32_t contLen = sizeof(SDDropTopicReq);
|
||||
|
||||
SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
|
||||
if (pDrop == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pDrop->head.contLen = htonl(contLen);
|
||||
pDrop->head.vgId = htonl(pVgroup->vgId);
|
||||
memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
pDrop->tuid = htobe64(pTopic->uid);
|
||||
|
||||
return pDrop;
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC;
|
||||
|
||||
|
|
|
@ -226,6 +226,7 @@ typedef struct STqReader {
|
|||
int64_t cachedSchemaUid;
|
||||
SSchemaWrapper *pSchemaWrapper;
|
||||
SSDataBlock *pResBlock;
|
||||
int64_t lastTs;
|
||||
} STqReader;
|
||||
|
||||
STqReader *tqReaderOpen(SVnode *pVnode);
|
||||
|
@ -244,6 +245,7 @@ bool tqNextBlockInWal(STqReader *pReader, const char *idstr);
|
|||
bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
|
||||
SWalReader *tqGetWalReader(STqReader *pReader);
|
||||
SSDataBlock *tqGetResultBlock(STqReader *pReader);
|
||||
int64_t tqGetResultBlockTime(STqReader *pReader);
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
|
||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||
|
|
|
@ -46,6 +46,7 @@ typedef struct STqOffsetStore STqOffsetStore;
|
|||
// tqPush
|
||||
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
|
||||
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2)
|
||||
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
||||
|
||||
// tqExec
|
||||
typedef struct {
|
||||
|
@ -90,6 +91,10 @@ typedef struct {
|
|||
STqExecHandle execHandle; // exec
|
||||
SRpcMsg* msg;
|
||||
tq_handle_status status;
|
||||
|
||||
// for replay
|
||||
SSDataBlock* block;
|
||||
int64_t blockTime;
|
||||
} STqHandle;
|
||||
|
||||
struct STQ {
|
||||
|
@ -107,17 +112,13 @@ struct STQ {
|
|||
SStreamMeta* pStreamMeta;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int32_t size;
|
||||
} STqOffsetHead;
|
||||
|
||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||
void tqDestroyTqHandle(void* data);
|
||||
|
||||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
|
||||
|
||||
// tqExec
|
||||
|
|
|
@ -82,6 +82,9 @@ void tqDestroyTqHandle(void* data) {
|
|||
taosMemoryFree(pData->msg);
|
||||
pData->msg = NULL;
|
||||
}
|
||||
if (pData->block != NULL){
|
||||
blockDataDestroy(pData->block);
|
||||
}
|
||||
}
|
||||
|
||||
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||
|
|
|
@ -37,11 +37,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
|
|||
|
||||
int32_t vgId = TD_VID(pStore->pTq->pVnode);
|
||||
int64_t code = 0;
|
||||
|
||||
STqOffsetHead head = {0};
|
||||
|
||||
int32_t size = 0;
|
||||
while (1) {
|
||||
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
|
||||
if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
|
||||
if (code == 0) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -49,7 +47,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t size = htonl(head.size);
|
||||
void* pMemBuf = taosMemoryCalloc(1, size);
|
||||
if (pMemBuf == NULL) {
|
||||
tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
|
||||
|
@ -175,11 +172,11 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
|
||||
int32_t totLen = INT_BYTES + bodyLen;
|
||||
void* buf = taosMemoryCalloc(1, totLen);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));
|
||||
void* abuf = POINTER_SHIFT(buf, INT_BYTES);
|
||||
|
||||
((STqOffsetHead*)buf)->size = htonl(bodyLen);
|
||||
*(int32_t*)buf = bodyLen;
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, bodyLen);
|
||||
tEncodeSTqOffset(&encoder, pOffset);
|
||||
|
|
|
@ -369,82 +369,56 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
// todo ignore the error in wal?
|
||||
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||
SWalReader* pWalReader = pReader->pWalReader;
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
|
||||
uint64_t st = taosGetTimestampMs();
|
||||
while (1) {
|
||||
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
||||
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
||||
// try next message in wal file
|
||||
// todo always retry to avoid read failure caused by wal file deletion
|
||||
if (walNextValidMsg(pWalReader) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
int64_t ver = pWalReader->pHead->head.version;
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, pBody, bodyLen);
|
||||
|
||||
{
|
||||
int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
||||
SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i);
|
||||
if (pData->pCreateTbReq != NULL) {
|
||||
taosArrayDestroy(pData->pCreateTbReq->ctb.tagName);
|
||||
taosMemoryFreeClear(pData->pCreateTbReq);
|
||||
}
|
||||
pData->aRowP = taosArrayDestroy(pData->aRowP);
|
||||
}
|
||||
pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData);
|
||||
}
|
||||
|
||||
if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
|
||||
return false;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
pReader->nextBlk = 0;
|
||||
// try next message in wal file
|
||||
if (walNextValidMsg(pWalReader) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
int64_t ver = pWalReader->pHead->head.version;
|
||||
|
||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
||||
pReader->nextBlk = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
while (pReader->nextBlk < numOfBlocks) {
|
||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
|
||||
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
|
||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk,
|
||||
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||
|
||||
if (pReader->tbIdHash == NULL) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
|
||||
if (ret != NULL) {
|
||||
tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
|
||||
|
||||
if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
|
||||
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
||||
return true;
|
||||
if(pDataBlock == NULL){
|
||||
pDataBlock = createOneDataBlock(pRes, true);
|
||||
}else{
|
||||
blockDataMerge(pDataBlock, pRes);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pReader->nextBlk += 1;
|
||||
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
|
||||
}
|
||||
}
|
||||
|
||||
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
|
||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||
|
||||
pReader->msg.msgStr = NULL;
|
||||
|
||||
if(pDataBlock != NULL){
|
||||
blockDataCleanup(pReader->pResBlock);
|
||||
copyDataBlock(pReader->pResBlock, pDataBlock);
|
||||
blockDataDestroy(pDataBlock);
|
||||
return true;
|
||||
}else{
|
||||
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
|
||||
}
|
||||
|
||||
if(taosGetTimestampMs() - st > 1000){
|
||||
return false;
|
||||
}
|
||||
|
@ -478,6 +452,10 @@ SSDataBlock* tqGetResultBlock (STqReader* pReader) {
|
|||
return pReader->pResBlock;
|
||||
}
|
||||
|
||||
int64_t tqGetResultBlockTime(STqReader *pReader){
|
||||
return pReader->lastTs;
|
||||
}
|
||||
|
||||
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||
if (pReader->msg.msgStr == NULL) {
|
||||
return false;
|
||||
|
@ -644,7 +622,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
|||
int32_t sversion = pSubmitTbData->sver;
|
||||
int64_t suid = pSubmitTbData->suid;
|
||||
int64_t uid = pSubmitTbData->uid;
|
||||
pReader->lastBlkUid = uid;
|
||||
pReader->lastTs = pSubmitTbData->ctimeMs;
|
||||
|
||||
pBlock->info.id.uid = uid;
|
||||
pBlock->info.version = pReader->msg.ver;
|
||||
|
@ -786,7 +764,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
|||
}
|
||||
|
||||
int32_t sversion = pSubmitTbData->sver;
|
||||
int64_t suid = pSubmitTbData->suid;
|
||||
int64_t uid = pSubmitTbData->uid;
|
||||
pReader->lastBlkUid = uid;
|
||||
|
||||
|
|
|
@ -64,7 +64,23 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res){
|
||||
uint64_t ts = 0;
|
||||
qStreamSetOpen(task);
|
||||
|
||||
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
|
||||
int32_t code = qExecTask(task, res, &ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
|
||||
const int32_t MAX_ROWS_TO_RETURN = 4096;
|
||||
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
@ -80,34 +96,66 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
qStreamSetOpen(task);
|
||||
|
||||
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
|
||||
code = qExecTask(task, &pDataBlock, &ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId,
|
||||
pDataBlock);
|
||||
// current scan should be stopped asap, since the rebalance occurs.
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d, failed to add block to rsp msg", vgId);
|
||||
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
|
||||
if (code != 0){
|
||||
return code;
|
||||
}
|
||||
|
||||
pRsp->blockNum++;
|
||||
totalRows += pDataBlock->info.rows;
|
||||
if (totalRows >= MAX_ROWS_TO_RETURN) {
|
||||
if(pRequest->enableReplay){
|
||||
if(IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL){
|
||||
blockDataDestroy(pHandle->block);
|
||||
pHandle->block = NULL;
|
||||
}
|
||||
if(pHandle->block == NULL){
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
STqOffsetVal offset = {0};
|
||||
qStreamExtractOffset(task, &offset);
|
||||
pHandle->block = createOneDataBlock(pDataBlock, true);
|
||||
// pHandle->block = createDataBlock();
|
||||
// copyDataBlock(pHandle->block, pDataBlock);
|
||||
pHandle->blockTime = offset.ts;
|
||||
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
|
||||
if (code != 0){
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d, failed to add block to rsp msg", vgId);
|
||||
return code;
|
||||
}
|
||||
|
||||
pRsp->blockNum++;
|
||||
if (pDataBlock == NULL) {
|
||||
blockDataDestroy(pHandle->block);
|
||||
pHandle->block = NULL;
|
||||
}else{
|
||||
copyDataBlock(pHandle->block, pDataBlock);
|
||||
|
||||
STqOffsetVal offset = {0};
|
||||
qStreamExtractOffset(task, &offset);
|
||||
pRsp->sleepTime = offset.ts - pHandle->blockTime;
|
||||
pHandle->blockTime = offset.ts;
|
||||
}
|
||||
break;
|
||||
}else{
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d, failed to add block to rsp msg", vgId);
|
||||
return code;
|
||||
}
|
||||
|
||||
pRsp->blockNum++;
|
||||
totalRows += pDataBlock->info.rows;
|
||||
if (totalRows >= MAX_ROWS_TO_RETURN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
||||
|
||||
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||
const SMqMetaRsp* pRsp, int32_t vgId);
|
||||
|
||||
|
@ -152,7 +150,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
tqInitDataRsp(&dataRsp, *pOffset);
|
||||
|
||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
|
||||
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||
goto end;
|
||||
}
|
||||
|
|
|
@ -131,6 +131,7 @@ void initTqAPI(SStoreTqReader* pTq) {
|
|||
pTq->tqGetResultBlock = tqGetResultBlock;
|
||||
|
||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
||||
}
|
||||
|
||||
void initStateStoreAPI(SStateStore* pStore) {
|
||||
|
|
|
@ -110,7 +110,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
|||
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
|
||||
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
|
||||
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
||||
pInfo->mergeDataBlocks = false;
|
||||
} else {
|
||||
if (!pProjPhyNode->ignoreGroupId) {
|
||||
|
|
|
@ -1863,6 +1863,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
// curVersion move to next
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
|
||||
|
||||
// use ts to pass time when replay, because ts not used if type is log
|
||||
pTaskInfo->streamInfo.currentOffset.ts = pAPI->tqReaderFn.tqGetResultBlockTime(pInfo->tqReader);
|
||||
|
||||
if (hasResult) {
|
||||
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
|
||||
pTaskInfo->streamInfo.currentOffset.version);
|
||||
|
|
|
@ -661,6 +661,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
||||
|
||||
// stream
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||
|
|
|
@ -163,6 +163,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
|
||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
|
||||
|
|
|
@ -0,0 +1,311 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
|
||||
class actionType(Enum):
|
||||
CREATE_DATABASE = 0
|
||||
CREATE_STABLE = 1
|
||||
CREATE_CTABLE = 2
|
||||
INSERT_DATA = 3
|
||||
|
||||
class TDTestCase:
|
||||
hostname = socket.gethostname()
|
||||
#rpcDebugFlagVal = '143'
|
||||
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#print ("===================: ", updatecfgDict)
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to execute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files or "taosd.exe" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def newcur(self,cfg,host,port):
|
||||
user = "root"
|
||||
password = "taosdata"
|
||||
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
|
||||
cur=con.cursor()
|
||||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1 wal_retention_period 3600"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
tdSql.query(sql)
|
||||
|
||||
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||
resultList=[]
|
||||
while 1:
|
||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||
if tdSql.getRows() == expectRows:
|
||||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
if valgrind == 1:
|
||||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_database(self,tsql, dbName,dropFlag=1,vgroups=1,replica=1):
|
||||
if dropFlag == 1:
|
||||
tsql.execute("drop database if exists %s"%(dbName))
|
||||
|
||||
tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica))
|
||||
tdLog.debug("complete to create database %s"%(dbName))
|
||||
return
|
||||
|
||||
def create_stable(self,tsql, dbName,stbName):
|
||||
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName))
|
||||
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
||||
return
|
||||
|
||||
def create_ctables(self,tsql, dbName,stbName,ctbNum):
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
|
||||
if (i > 0) and (i%100 == 0):
|
||||
tsql.execute(sql)
|
||||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
||||
tdLog.debug("start to insert data ............")
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
if startTs == 0:
|
||||
t = time.time()
|
||||
startTs = int(round(t * 1000))
|
||||
|
||||
for j in range(rowsPerTbl):
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values (%d, %d, 'tmqrow_%d') "%(stbName, i, startTs + j + i, j+i, j+i)
|
||||
tsql.execute(sql)
|
||||
time.sleep(1)
|
||||
sql = "insert into "
|
||||
#end sql
|
||||
if sql != pre_insert:
|
||||
#print("insert sql:%s"%sql)
|
||||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
if parameterDict["actionType"] == actionType.CREATE_DATABASE:
|
||||
self.create_database(tsql, parameterDict["dbName"])
|
||||
elif parameterDict["actionType"] == actionType.CREATE_STABLE:
|
||||
self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
elif parameterDict["actionType"] == actionType.CREATE_CTABLE:
|
||||
self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
elif parameterDict["actionType"] == actionType.INSERT_DATA:
|
||||
self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"], \
|
||||
parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
else:
|
||||
tdLog.exit("unsupported actionType: ", parameterDict["actionType"])
|
||||
|
||||
return
|
||||
|
||||
def tmqCase8(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 8: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db8', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 1, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 2, \
|
||||
'rowsPerTbl': 10, \
|
||||
'batchNum': 1, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,\
|
||||
parameterDict["dbName"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest,\
|
||||
enable.replay:true'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume 0 processor")
|
||||
pollDelay = 100
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("start to check consume 0 result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
# tdLog.info("start consume 1 processor")
|
||||
# self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
# tdLog.sleep(2)
|
||||
#
|
||||
# tdLog.info("start one new thread to insert data")
|
||||
# parameterDict['actionType'] = actionType.INSERT_DATA
|
||||
# prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
# prepareEnvThread.start()
|
||||
# prepareEnvThread.join()
|
||||
#
|
||||
# tdLog.info("start to check consume 0 and 1 result")
|
||||
# expectRows = 2
|
||||
# resultList = self.selectConsumeResult(expectRows)
|
||||
# totalConsumeRows = 0
|
||||
# for i in range(expectRows):
|
||||
# totalConsumeRows += resultList[i]
|
||||
#
|
||||
# if totalConsumeRows != expectrowcnt:
|
||||
# tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
# tdLog.exit("tmq consume rows error!")
|
||||
#
|
||||
# tdLog.info("start consume 2 processor")
|
||||
# self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
# tdLog.sleep(2)
|
||||
#
|
||||
# tdLog.info("start one new thread to insert data")
|
||||
# parameterDict['actionType'] = actionType.INSERT_DATA
|
||||
# prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
# prepareEnvThread.start()
|
||||
# prepareEnvThread.join()
|
||||
#
|
||||
# tdLog.info("start to check consume 0 and 1 and 2 result")
|
||||
# expectRows = 3
|
||||
# resultList = self.selectConsumeResult(expectRows)
|
||||
# totalConsumeRows = 0
|
||||
# for i in range(expectRows):
|
||||
# totalConsumeRows += resultList[i]
|
||||
#
|
||||
# if totalConsumeRows != expectrowcnt*2:
|
||||
# tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||
# tdLog.exit("tmq consume rows error!")
|
||||
#
|
||||
# tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 8 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
buildPath = self.getBuildPath()
|
||||
if (buildPath == ""):
|
||||
tdLog.exit("taosd not found!")
|
||||
else:
|
||||
tdLog.info("taosd found in %s" % buildPath)
|
||||
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase8(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|

@@ -0,0 +1,39 @@
import taos
import sys
import time
import socket
import os
import threading

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor())

    def run(self):
        tdSql.prepare()
        buildPath = tdCom.getBuildPath()

        cmdStr1 = '%s/build/bin/replay_test'%(buildPath)
        tdLog.info(cmdStr1)
        result = os.system(cmdStr1)

        if result != 0:
            tdLog.exit("tmq_replay error!")

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

@@ -9,6 +9,7 @@ add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c)
add_executable(tmq_offset_test tmq_offset_test.c)
add_executable(varbinary_test varbinary_test.c)
add_executable(replay_test replay_test.c)

if(${TD_LINUX})
    add_executable(tsz_test tsz_test.c)

@@ -57,6 +58,14 @@ target_link_libraries(
    PUBLIC os
)

target_link_libraries(
    replay_test
    PUBLIC taos
    PUBLIC util
    PUBLIC common
    PUBLIC os
)

target_link_libraries(
    write_raw_block_test
    PUBLIC taos

@@ -0,0 +1,410 @@
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <time.h>
#include "taos.h"
#include "types.h"

// build a tmq consumer with the data replay function enabled (enable.replay = true)
tmq_t* build_consumer() {
  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "g1");
  tmq_conf_set(conf, "client.id", "c1");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "msg.with.table.name", "true");
  tmq_conf_set(conf, "enable.auto.commit", "true");
  tmq_conf_set(conf, "enable.replay", "true");
  tmq_conf_set(conf, "auto.offset.reset", "earliest");

  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
  assert(tmq);
  tmq_conf_destroy(conf);
  return tmq;
}

// replay needs a single timeline: subscribing to a topic over a super table on a
// database with two vgroups is expected to fail when enable.replay is true
void test_vgroup_error(TAOS* pConn){
  TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop database if exists d1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create topic t1 as select * from d1.s1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  tmq_list_t* topic_list = tmq_list_new();

  tmq_list_append(topic_list, "t1");
  tmq_t* tmq = build_consumer();
  ASSERT(tmq_subscribe(tmq, topic_list) != 0);
  tmq_list_destroy(topic_list);
  tmq_consumer_close(tmq);
}

// topics created with "as stable" or "as database" do not support replay,
// so both subscriptions are expected to fail
void test_stable_db_error(TAOS* pConn){
  TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop database if exists d1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create topic t1 as stable d1.s1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  tmq_list_t* topic_list = tmq_list_new();

  tmq_list_append(topic_list, "t1");
  tmq_t* tmq = build_consumer();
  ASSERT(tmq_subscribe(tmq, topic_list) != 0);
  tmq_list_destroy(topic_list);
  tmq_consumer_close(tmq);

  pRes = taos_query(pConn, "drop topic if exists t1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create topic t1 as database d1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  topic_list = tmq_list_new();
  tmq_list_append(topic_list, "t1");
  tmq = build_consumer();
  ASSERT(tmq_subscribe(tmq, topic_list) != 0);
  tmq_list_destroy(topic_list);
  tmq_consumer_close(tmq);
}

// insert one row into d1.table1 per round, then sleep interval[i] milliseconds
void insert_with_sleep(TAOS* pConn, int32_t* interval, int32_t len){
  for(int i = 0; i < len; i++){
    TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1)");
    ASSERT(taos_errno(pRes) == 0);
    taos_free_result(pRes);

    taosMsleep(interval[i]);
  }
}

// insert two rows into d1.table1 and two rows into d1.table2 per round, then sleep interval[i] milliseconds
void insert_with_sleep_multi(TAOS* pConn, int32_t* interval, int32_t len){
  for(int i = 0; i < len; i++){
    TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1) (now+1s, 2) d1.table2 (ts, c1) values (now, 1) (now+1s, 2)");
    ASSERT(taos_errno(pRes) == 0);
    taos_free_result(pRes);

    taosMsleep(interval[i]);
  }
}

// replay a column subscription on a single sub table and check that each message
// arrives no sooner than the interval at which the original row was written
void test_case1(TAOS* pConn, int32_t* interval, int32_t len){
  TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop database if exists d1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  insert_with_sleep(pConn, interval, len);

  pRes = taos_query(pConn, "create topic t1 as select * from d1.table1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  tmq_list_t* topic_list = tmq_list_new();

  tmq_list_append(topic_list, "t1");
  tmq_t* tmq = build_consumer();
  // start the subscription
  tmq_subscribe(tmq, topic_list);
  tmq_list_destroy(topic_list);

  int32_t timeout = 5000;

  int64_t t = 0;
  int32_t totalRows = 0;
  char buf[1024] = {0};
  while (1) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout);
    if (tmqmessage) {
      if(t != 0){
        ASSERT(taosGetTimestampMs() - t >= interval[totalRows - 1]);
      }
      t = taosGetTimestampMs();

      TAOS_ROW row = taos_fetch_row(tmqmessage);
      if (row == NULL) {
        break;
      }

      TAOS_FIELD* fields = taos_fetch_fields(tmqmessage);
      int32_t numOfFields = taos_field_count(tmqmessage);
      taos_print_row(buf, row, fields, numOfFields);

      printf("time:%" PRId64 " rows[%d]: %s\n", t, totalRows, buf);
      totalRows++;
      taos_free_result(tmqmessage);
    } else {
      break;
    }
  }

  ASSERT(totalRows == len);
  tmq_consumer_close(tmq);
}

// replay a super-table subscription on a single-vgroup database; after all existing
// rows have been replayed, signal the insert thread and expect one more freshly written row
void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){
  TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop database if exists d1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  insert_with_sleep_multi(pConn, interval, len);

  pRes = taos_query(pConn, "create topic t1 as select * from d1.s1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  tmq_list_t* topic_list = tmq_list_new();

  tmq_list_append(topic_list, "t1");
  tmq_t* tmq = build_consumer();
  // start the subscription
  tmq_subscribe(tmq, topic_list);
  tmq_list_destroy(topic_list);

  int32_t timeout = 5000;

  int64_t t = 0;
  int32_t totalRows = 0;
  char buf[1024] = {0};
  while (1) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout);
    if (tmqmessage) {
      if(t != 0 && totalRows % 4 == 0){
        ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]);
      }
      t = taosGetTimestampMs();

      while(1){
        TAOS_ROW row = taos_fetch_row(tmqmessage);
        if (row == NULL) {
          break;
        }

        TAOS_FIELD* fields = taos_fetch_fields(tmqmessage);
        int32_t numOfFields = taos_field_count(tmqmessage);
        taos_print_row(buf, row, fields, numOfFields);

        printf("time:%" PRId64 " rows[%d]: %s\n", t, totalRows, buf);
        totalRows++;
      }

      taos_free_result(tmqmessage);

      if(totalRows == len * 4){
        taosSsleep(1);
        tsem_post(sem);
      }
    } else {
      break;
    }
  }

  ASSERT(totalRows == len * 4 + 1);
  tmq_consumer_close(tmq);
}

// poll once with replay enabled, close the consumer, then subscribe again:
// replay does not save progress, so the second consumer replays all rows from the start
void test_case3(TAOS* pConn, int32_t* interval, int32_t len){
  TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop database if exists d1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  insert_with_sleep_multi(pConn, interval, len);

  pRes = taos_query(pConn, "create topic t1 as select * from d1.s1");
  ASSERT(taos_errno(pRes) == 0);
  taos_free_result(pRes);

  tmq_list_t* topic_list = tmq_list_new();

  tmq_list_append(topic_list, "t1");
  tmq_t* tmq = build_consumer();
  // start the subscription
  tmq_subscribe(tmq, topic_list);

  int32_t timeout = 5000;

  TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout);
  taos_free_result(tmqmessage);

  tmq_consumer_close(tmq);

  tmq = build_consumer();
  // start the subscription again with a fresh consumer
  tmq_subscribe(tmq, topic_list);

  int64_t t = 0;
  int32_t totalRows = 0;
  char buf[1024] = {0};
  while (1) {
    tmqmessage = tmq_consumer_poll(tmq, timeout);
    if (tmqmessage) {
      if(t != 0 && totalRows % 4 == 0){
        ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]);
      }
      t = taosGetTimestampMs();

      while(1){
        TAOS_ROW row = taos_fetch_row(tmqmessage);
        if (row == NULL) {
          break;
        }

        TAOS_FIELD* fields = taos_fetch_fields(tmqmessage);
        int32_t numOfFields = taos_field_count(tmqmessage);
        taos_print_row(buf, row, fields, numOfFields);

        printf("time:%" PRId64 " rows[%d]: %s\n", t, totalRows, buf);
        totalRows++;
      }

      taos_free_result(tmqmessage);
    } else {
      break;
    }
  }

  ASSERT(totalRows == len * 4);

  tmq_consumer_close(tmq);
  tmq_list_destroy(topic_list);
}

// helper thread used by test_case2: wait until the consumer signals that all existing
// rows have been replayed, then write one more row into d1.table1
void* insertThreadFunc(void* param) {
  tsem_t* sem = (tsem_t*)param;
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);

  tsem_wait(sem);

  TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 11)");
  ASSERT(taos_errno(pRes) == 0);
  printf("insert data again\n");
  taos_free_result(pRes);
  taos_close(pConn);
  return NULL;
}

int main(int argc, char* argv[]) {
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  test_vgroup_error(pConn);
  test_stable_db_error(pConn);

  tsem_t sem;
  tsem_init(&sem, 0, 0);
  TdThread thread;
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // create one thread that inserts an extra row once test_case2 posts the semaphore
  taosThreadCreate(&thread, &thattr, insertThreadFunc, (void*)(&sem));

  int32_t interval[5] = {1000, 200, 3000, 40, 500};
  test_case1(pConn, interval, sizeof(interval)/sizeof(int32_t));
  printf("test_case1 success\n");
  test_case2(pConn, interval, sizeof(interval)/sizeof(int32_t), &sem);
  printf("test_case2 success\n");
  test_case3(pConn, interval, sizeof(interval)/sizeof(int32_t));
  taos_close(pConn);

  taosThreadJoin(thread, NULL);
  taosThreadClear(&thread);
  tsem_destroy(&sem);
  return 0;
}

@@ -7,18 +7,14 @@
#include "tlog.h"
#include "tmsg.h"

typedef struct {
  int32_t size;
} STqOffsetHead;

int32_t tqOffsetRestoreFromFile(const char* fname) {
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile != NULL) {
    STqOffsetHead head = {0};
    int32_t code;

    while (1) {
      if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
      int32_t size = 0;
      if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
        if (code == 0) {
          break;
        } else {

@@ -26,7 +22,6 @@ int32_t tqOffsetRestoreFromFile(const char* fname) {
          return -1;
        }
      }
      int32_t size = htonl(head.size);
      void* memBuf = taosMemoryCalloc(1, size);
      if (memBuf == NULL) {
        printf("memBuf == NULL\n");

@@ -621,10 +621,11 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
    taos_print_row(buf, row, fields, numOfFields);

    if (0 != g_stConfInfo.showRowFlag) {
      taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
      taosFprintfFile(g_fp, "time:%" PRId64 " tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? tbName : "null table"), totalRows, buf);
      // if (0 != g_stConfInfo.saveRowFlag) {
      //   saveConsumeContentToTbl(pInfo, buf);
      // }
      // taosFsyncFile(g_fp);
    }

    totalRows++;