feat:[TD-26056] add replay logic

This commit is contained in:
wangmm0220 2023-10-08 16:06:15 +08:00
parent f0c1585508
commit 3f2d890560
23 changed files with 192 additions and 278 deletions

View File

@ -2295,17 +2295,6 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId;
char* sql;
char* executorMsg;
} SMVCreateStreamReq, SMSCreateStreamReq;
typedef struct {
int64_t streamId;
} SMVCreateStreamRsp, SMSCreateStreamRsp;
enum {
TOPIC_SUB_TYPE__DB = 1,
TOPIC_SUB_TYPE__TABLE,
@ -2327,16 +2316,9 @@ int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTo
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
typedef struct {
int64_t topicId;
} SCMCreateTopicRsp;
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
typedef struct {
int64_t consumerId;
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
} SMqConsumerRecoverMsg, SMqConsumerClearMsg;
typedef struct {
int64_t consumerId;
@ -2348,6 +2330,7 @@ typedef struct {
int8_t autoCommit;
int32_t autoCommitInterval;
int8_t resetOffsetCfg;
int8_t enableReplay;
} SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
@ -2367,6 +2350,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI8(buf, pReq->autoCommit);
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
return tlen;
}
@ -2390,71 +2374,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeFixedI8(buf, &pReq->autoCommit);
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
return buf;
}
typedef struct SMqSubTopic {
int32_t vgId;
int64_t topicId;
SEpSet epSet;
} SMqSubTopic;
typedef struct {
int32_t topicNum;
SMqSubTopic topics[];
} SCMSubscribeRsp;
static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pRsp->topicNum);
for (int32_t i = 0; i < pRsp->topicNum; i++) {
tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId);
tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId);
tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet);
}
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
buf = taosDecodeFixedI32(buf, &pRsp->topicNum);
for (int32_t i = 0; i < pRsp->topicNum; i++) {
buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId);
buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId);
buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet);
}
return buf;
}
typedef struct {
int64_t topicId;
int64_t consumerId;
int64_t consumerGroupId;
int64_t offset;
char* sql;
char* logicalPlan;
char* physicalPlan;
} SMVSubscribeReq;
static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->topicId);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId);
tlen += taosEncodeFixedI64(buf, pReq->offset);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
return tlen;
}
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->topicId);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId);
buf = taosDecodeFixedI64(buf, &pReq->offset);
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
return buf;
}
@ -3534,6 +3454,7 @@ typedef struct {
int64_t consumerId;
int64_t timeout;
STqOffsetVal reqOffset;
int8_t enableReplay;
} SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
@ -3593,6 +3514,7 @@ typedef struct {
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
int64_t sleepTime;
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);

View File

@ -222,6 +222,7 @@ typedef struct SStoreTqReader {
bool (*tqReaderNextBlockInWal)();
bool (*tqNextBlockImpl)(); // todo remove it
SSDataBlock* (*tqGetResultBlock)();
int64_t (*tqGetResultBlockTime)();
void (*tqReaderSetColIdList)();
int32_t (*tqReaderSetQueryTableList)();

View File

@ -792,6 +792,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -683,7 +683,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
taosHashCleanup(hashTmp);
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
}
taosHashCleanup(hashTmp);

View File

@ -256,7 +256,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1;
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
char *sTmp = *start;
@ -367,7 +368,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL ||
element->measure == NULL || element->timestamp == NULL) {
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM) return -1;
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
@ -381,7 +383,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) {
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1;
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
if ((*start)[1] == 'm') {
@ -448,7 +451,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) {
}
if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) {
uError("elements != %d", OTD_JSON_FIELDS_NUM) return -1;
uError("elements != %d", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
@ -477,7 +481,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
}
if (*marks[i] == NULL) {
uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
}
return TSDB_CODE_SUCCESS;
@ -816,25 +820,25 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr
int32_t size = cJSON_GetArraySize(root);
if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (unlikely(!cJSON_IsNumber(value))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (unlikely(!cJSON_IsString(type))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
double timeDouble = value->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
if (timeDouble == 0) {
@ -849,32 +853,28 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds
int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) {
case 'm':
case 'M':
// milliseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
break;
case 'u':
case 'U':
// microseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
break;
case 'n':
case 'N':
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
break;
default:
return -1;
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
} else {
return -1;
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
}
@ -895,7 +895,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
double timeDouble = timestamp->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
if (unlikely(timeDouble < 0)) {
@ -911,14 +911,14 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
int64_t tsInt64 = timeDouble;
if (fromPrecision == TSDB_TIME_PRECISION_SECONDS) {
if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} else {
return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
}
@ -926,7 +926,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
return smlParseTSFromJSONObj(info, timestamp, toPrecision);
} else {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
}

View File

@ -70,7 +70,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
return ts;
}

View File

@ -63,7 +63,7 @@ struct tmq_conf_t {
int8_t resetOffset;
int8_t withTbName;
int8_t snapEnable;
int32_t snapBatchSize;
int8_t replayEnable;
bool hbBgEnable;
uint16_t port;
int32_t autoCommitInterval;
@ -83,6 +83,7 @@ struct tmq_t {
int8_t autoCommit;
int32_t autoCommitInterval;
int8_t resetOffsetCfg;
int8_t replayEnable;
uint64_t consumerId;
bool hbBgEnable;
tmq_commit_cb* commitCb;
@ -92,19 +93,13 @@ struct tmq_t {
SRWLatch lock;
int8_t status;
int32_t epoch;
#if 0
int8_t epStatus;
int32_t epSkipCnt;
#endif
// poll info
int64_t pollCnt;
int64_t totalRows;
// bool needReportOffsetRows;
// timer
tmr_h hbLiveTimer;
tmr_h epTimer;
tmr_h reportTimer;
tmr_h commitTimer;
STscObj* pTscObj; // connection
SArray* clientTopics; // SArray<SMqClientTopic>
@ -152,6 +147,8 @@ typedef struct {
int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
SEpSet epSet;
} SMqClientVg;
@ -360,24 +357,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
conf->snapBatchSize = taosStr2int64(value);
return TMQ_CONF_OK;
}
// if (strcasecmp(key, "enable.heartbeat.background") == 0) {
// if (strcasecmp(value, "true") == 0) {
// conf->hbBgEnable = true;
// return TMQ_CONF_OK;
// } else if (strcasecmp(value, "false") == 0) {
// conf->hbBgEnable = false;
// return TMQ_CONF_OK;
// } else {
// tscError("the default value of enable.heartbeat.background is true, can not be seted");
// return TMQ_CONF_INVALID;
// }
// }
if (strcasecmp(key, "td.connect.ip") == 0) {
conf->ip = taosStrdup(value);
return TMQ_CONF_OK;
@ -398,6 +377,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK;
}
if (strcasecmp(key, "enable.replay") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->replayEnable = true;
return TMQ_CONF_OK;
} else if (strcasecmp(value, "false") == 0) {
conf->replayEnable = false;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK;
}
@ -1075,6 +1066,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commitCb = conf->commitCb;
pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable;
if(conf->replayEnable){
pTmq->autoCommit = false;
}
taosInitRWLatch(&pTmq->lock);
pTmq->hbBgEnable = conf->hbBgEnable;
@ -1424,6 +1419,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
.emptyBlockReceiveTs = 0,
.blockReceiveTs = 0,
.blockSleepForReplay = 0,
.numOfRows = pInfo ? pInfo->numOfRows : 0,
};
@ -1695,6 +1692,12 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
continue;
}
if (tmq->replayEnable && taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId,
tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
continue;
}
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
@ -1816,6 +1819,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
tmq->totalRows += numOfRows;
pVg->emptyBlockReceiveTs = 0;
if(tmq->replayEnable){
pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pRsp->rsp.sleepTime;
}
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,

View File

@ -4352,31 +4352,6 @@ void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
}
}
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -5983,6 +5958,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
tEndEncode(&encoder);
@ -6019,6 +5995,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -7795,6 +7775,7 @@ int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
}
}
}
if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1;
return 0;
}
@ -7840,6 +7821,8 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
}
}
if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1;
return 0;
}

View File

@ -490,32 +490,6 @@ typedef struct {
char filterTb[TSDB_TABLE_NAME_LEN];
} SShowObj;
typedef struct {
int64_t id;
int8_t type;
int8_t replica;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t numOfReads;
int32_t payloadLen;
void* pIter;
SMnode* pMnode;
char db[TSDB_DB_FNAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
char payload[];
} SSysTableRetrieveObj;
typedef struct {
char key[TSDB_PARTITION_KEY_LEN];
int64_t dbUid;
int64_t offset;
} SMqOffsetObj;
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];

View File

@ -18,6 +18,7 @@
#include "mndPrivilege.h"
#include "mndVgroup.h"
#include "mndShow.h"
#include "mndDb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
@ -124,7 +125,7 @@ void mndRebCntDec() {
}
}
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
@ -139,6 +140,22 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
return -1;
}
if(enableReplay){
if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){
return TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
}else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
if (pDb == NULL) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
if (pDb->cfg.numOfVgroups != 1) {
return TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
}
mndReleaseDb(pMnode, pDb);
}
}
mndTransSetDbName(pTrans, pOneTopic, NULL);
if(mndTransCheckConflict(pMnode, pTrans) != 0){
mndReleaseTopic(pMnode, pTopic);
@ -177,7 +194,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
if (pTrans == NULL) {
goto FAIL;
}
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0){
goto FAIL;
}
@ -697,7 +714,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto _over;
}
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user);
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}

View File

@ -330,24 +330,6 @@ void dumpSubscribe(SSdb *pSdb, SJson *json) {
}
}
void dumpOffset(SSdb *pSdb, SJson *json) {
void *pIter = NULL;
SJson *items = tjsonAddArrayToObject(json, "offsets");
while (1) {
SMqOffsetObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pObj);
if (pIter == NULL) break;
SJson *item = tjsonCreateObject();
tjsonAddItemToArray(items, item);
tjsonAddStringToObject(item, "key", pObj->key);
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
tjsonAddStringToObject(item, "offset", i642str(pObj->offset));
sdbRelease(pSdb, pObj);
}
}
void dumpStream(SSdb *pSdb, SJson *json) {
void *pIter = NULL;
SJson *items = tjsonAddArrayToObject(json, "streams");
@ -608,7 +590,7 @@ void mndDumpSdb() {
dumpTopic(pSdb, json);
dumpConsumer(pSdb, json);
dumpSubscribe(pSdb, json);
dumpOffset(pSdb, json);
// dumpOffset(pSdb, json);
dumpStream(pSdb, json);
dumpAcct(pSdb, json);
dumpAuth(pSdb, json);

View File

@ -298,11 +298,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
/*taosWLockLatch(&pOldTopic->lock);*/
// TODO handle update
/*taosWUnLockLatch(&pOldTopic->lock);*/
return 0;
}
@ -320,23 +315,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
sdbRelease(pSdb, pTopic);
}
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicReq);
SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
pDrop->tuid = htobe64(pTopic->uid);
return pDrop;
}
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
terrno = TSDB_CODE_MND_INVALID_TOPIC;

View File

@ -213,7 +213,7 @@ typedef struct STqReader {
SPackedData msg;
SSubmitReq2 submit;
int32_t nextBlk;
int64_t lastBlkUid;
int64_t lastTs;
SWalReader *pWalReader;
SMeta *pVnodeMeta;
SHashObj *tbIdHash;
@ -241,6 +241,7 @@ bool tqNextBlockInWal(STqReader *pReader, const char *idstr);
bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
SWalReader *tqGetWalReader(STqReader *pReader);
SSDataBlock *tqGetResultBlock(STqReader *pReader);
int64_t tqGetResultBlockTime(STqReader *pReader);
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);

View File

@ -47,6 +47,7 @@ typedef struct STqOffsetStore STqOffsetStore;
// tqPush
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2)
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
// tqExec
typedef struct {
@ -91,6 +92,10 @@ typedef struct {
STqExecHandle execHandle; // exec
SRpcMsg* msg;
tq_handle_status status;
// for replay
SSDataBlock* block;
int64_t blockTime;
} STqHandle;
struct STQ {
@ -108,17 +113,13 @@ struct STQ {
SStreamMeta* pStreamMeta;
};
typedef struct {
int32_t size;
} STqOffsetHead;
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
void tqDestroyTqHandle(void* data);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec

View File

@ -82,6 +82,9 @@ void tqDestroyTqHandle(void* data) {
taosMemoryFree(pData->msg);
pData->msg = NULL;
}
if (pData->block != NULL){
blockDataDestroy(pData->block);
}
}
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {

View File

@ -37,11 +37,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
int32_t vgId = TD_VID(pStore->pTq->pVnode);
int64_t code = 0;
STqOffsetHead head = {0};
int32_t size = 0;
while (1) {
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
if (code == 0) {
break;
} else {
@ -49,7 +47,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
}
}
int32_t size = htonl(head.size);
void* pMemBuf = taosMemoryCalloc(1, size);
if (pMemBuf == NULL) {
tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
@ -175,11 +172,11 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
return -1;
}
int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
int32_t totLen = INT_BYTES + bodyLen;
void* buf = taosMemoryCalloc(1, totLen);
void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));
void* abuf = POINTER_SHIFT(buf, INT_BYTES);
((STqOffsetHead*)buf)->size = htonl(bodyLen);
*(int32_t*)buf = bodyLen;
SEncoder encoder;
tEncoderInit(&encoder, abuf, bodyLen);
tEncodeSTqOffset(&encoder, pOffset);

View File

@ -475,6 +475,10 @@ SSDataBlock* tqGetResultBlock (STqReader* pReader) {
return pReader->pResBlock;
}
int64_t tqGetResultBlockTime(STqReader *pReader){
return pReader->lastTs;
}
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
if (pReader->msg.msgStr == NULL) {
return false;
@ -641,7 +645,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
pReader->lastTs = pSubmitTbData->ctimeMs;
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg.ver;
@ -783,9 +787,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
}
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);

View File

@ -64,7 +64,23 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
return 0;
}
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res){
uint64_t ts = 0;
qStreamSetOpen(task);
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
int32_t code = qExecTask(task, res, &ts);
if (code != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
terrno = code;
return -1;
}
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
return 0;
}
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
const int32_t MAX_ROWS_TO_RETURN = 4096;
int32_t vgId = TD_VID(pTq->pVnode);
@ -80,34 +96,63 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
qStreamSetOpen(task);
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
code = qExecTask(task, &pDataBlock, &ts);
if (code != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
terrno = code;
return -1;
}
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId,
pDataBlock);
// current scan should be stopped asap, since the rebalance occurs.
if (pDataBlock == NULL) {
break;
}
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, failed to add block to rsp msg", vgId);
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
if (code != 0){
return code;
}
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= MAX_ROWS_TO_RETURN) {
if(pRequest->enableReplay){
if(IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL){
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
}
if(pHandle->block == NULL){
if (pDataBlock == NULL) {
break;
}
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pHandle->block = createDataBlock();
copyDataBlock(pHandle->block, pDataBlock);
pHandle->blockTime = offset.ts;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
if (code != 0){
return code;
}
}
code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, failed to add block to rsp msg", vgId);
return code;
}
pRsp->blockNum++;
if (pDataBlock == NULL) {
break;
}
copyDataBlock(pHandle->block, pDataBlock);
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
break;
}else{
if (pDataBlock == NULL) {
break;
}
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, failed to add block to rsp msg", vgId);
return code;
}
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= MAX_ROWS_TO_RETURN) {
break;
}
}
}

View File

@ -15,8 +15,6 @@
#include "tq.h"
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
@ -142,7 +140,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tqInitDataRsp(&dataRsp, *pOffset);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end;
}

View File

@ -131,6 +131,7 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqGetResultBlock = tqGetResultBlock;
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
}
void initStateStoreAPI(SStateStore* pStore) {

View File

@ -1862,6 +1862,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
// curVersion move to next
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
// use ts to pass time when replay, because ts not used if type is log
pTaskInfo->streamInfo.currentOffset.ts = pAPI->tqReaderFn.tqGetResultBlockTime(pInfo->tqReader);
if (hasResult) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);

View File

@ -654,6 +654,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")

View File

@ -7,18 +7,14 @@
#include "tlog.h"
#include "tmsg.h"
typedef struct {
int32_t size;
} STqOffsetHead;
int32_t tqOffsetRestoreFromFile(const char* fname) {
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile != NULL) {
STqOffsetHead head = {0};
int32_t code;
while (1) {
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
int32_t size = 0;
if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
if (code == 0) {
break;
} else {
@ -26,7 +22,6 @@ int32_t tqOffsetRestoreFromFile(const char* fname) {
return -1;
}
}
int32_t size = htonl(head.size);
void* memBuf = taosMemoryCalloc(1, size);
if (memBuf == NULL) {
printf("memBuf == NULL\n");