From 3f2d8905607a63ded0599bbe08ccde03d6e49d56 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 8 Oct 2023 16:06:15 +0800 Subject: [PATCH 01/11] feat:[TD-26056] add replay logic --- include/common/tmsg.h | 90 ++------------------- include/libs/executor/storageapi.h | 1 + include/util/taoserror.h | 2 + source/client/src/clientSml.c | 2 +- source/client/src/clientSmlJson.c | 40 +++++----- source/client/src/clientSmlLine.c | 2 +- source/client/src/clientTmq.c | 57 ++++++++------ source/common/src/tmsg.c | 33 ++------ source/dnode/mnode/impl/inc/mndDef.h | 26 ------- source/dnode/mnode/impl/src/mndConsumer.c | 23 +++++- source/dnode/mnode/impl/src/mndDump.c | 20 +---- source/dnode/mnode/impl/src/mndTopic.c | 22 ------ source/dnode/vnode/inc/vnode.h | 3 +- source/dnode/vnode/src/inc/tq.h | 11 +-- source/dnode/vnode/src/tq/tq.c | 3 + source/dnode/vnode/src/tq/tqOffset.c | 13 ++-- source/dnode/vnode/src/tq/tqRead.c | 8 +- source/dnode/vnode/src/tq/tqScan.c | 95 +++++++++++++++++------ source/dnode/vnode/src/tq/tqUtil.c | 4 +- source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/libs/executor/src/scanoperator.c | 3 + source/util/src/terror.c | 2 + utils/test/c/tmqOffset.c | 9 +-- 23 files changed, 192 insertions(+), 278 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index af44184de6..90147a3c31 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2295,17 +2295,6 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); -typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - int64_t streamId; - char* sql; - char* executorMsg; -} SMVCreateStreamReq, SMSCreateStreamReq; - -typedef struct { - int64_t streamId; -} SMVCreateStreamRsp, SMSCreateStreamRsp; - enum { TOPIC_SUB_TYPE__DB = 1, TOPIC_SUB_TYPE__TABLE, @@ -2327,16 +2316,9 @@ int32_t 
tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTo int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq); void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq); -typedef struct { - int64_t topicId; -} SCMCreateTopicRsp; - -int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp); -int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp); - typedef struct { int64_t consumerId; -} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; +} SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { int64_t consumerId; @@ -2348,6 +2330,7 @@ typedef struct { int8_t autoCommit; int32_t autoCommitInterval; int8_t resetOffsetCfg; + int8_t enableReplay; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2367,6 +2350,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->autoCommit); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); + tlen += taosEncodeFixedI8(buf, pReq->enableReplay); return tlen; } @@ -2390,71 +2374,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI8(buf, &pReq->autoCommit); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); - return buf; -} - -typedef struct SMqSubTopic { - int32_t vgId; - int64_t topicId; - SEpSet epSet; -} SMqSubTopic; - -typedef struct { - int32_t topicNum; - SMqSubTopic topics[]; -} SCMSubscribeRsp; - -static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pRsp->topicNum); - for (int32_t i = 0; i < pRsp->topicNum; i++) { - tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); - tlen += 
taosEncodeFixedI64(buf, pRsp->topics[i].topicId); - tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); - } - return tlen; -} - -static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { - buf = taosDecodeFixedI32(buf, &pRsp->topicNum); - for (int32_t i = 0; i < pRsp->topicNum; i++) { - buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); - buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); - buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); - } - return buf; -} - -typedef struct { - int64_t topicId; - int64_t consumerId; - int64_t consumerGroupId; - int64_t offset; - char* sql; - char* logicalPlan; - char* physicalPlan; -} SMVSubscribeReq; - -static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->topicId); - tlen += taosEncodeFixedI64(buf, pReq->consumerId); - tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId); - tlen += taosEncodeFixedI64(buf, pReq->offset); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->logicalPlan); - tlen += taosEncodeString(buf, pReq->physicalPlan); - return tlen; -} - -static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->topicId); - buf = taosDecodeFixedI64(buf, &pReq->consumerId); - buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId); - buf = taosDecodeFixedI64(buf, &pReq->offset); - buf = taosDecodeString(buf, &pReq->sql); - buf = taosDecodeString(buf, &pReq->logicalPlan); - buf = taosDecodeString(buf, &pReq->physicalPlan); + buf = taosDecodeFixedI8(buf, &pReq->enableReplay); return buf; } @@ -3534,6 +3454,7 @@ typedef struct { int64_t consumerId; int64_t timeout; STqOffsetVal reqOffset; + int8_t enableReplay; } SMqPollReq; int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); @@ -3593,6 +3514,7 @@ typedef struct { SArray* blockData; SArray* blockTbName; SArray* 
blockSchema; + int64_t sleepTime; } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 0a240dd8f5..fa110d66e2 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -222,6 +222,7 @@ typedef struct SStoreTqReader { bool (*tqReaderNextBlockInWal)(); bool (*tqNextBlockImpl)(); // todo remove it SSDataBlock* (*tqGetResultBlock)(); + int64_t (*tqGetResultBlockTime)(); void (*tqReaderSetColIdList)(); int32_t (*tqReaderSetQueryTableList)(); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 39bf2b5681..eb991379c4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -792,6 +792,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) #define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011) #define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012) +#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) +#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 10f8b89f4d..91c21fe344 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -683,7 +683,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) { taosHashCleanup(hashTmp); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } } taosHashCleanup(hashTmp); diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index f9076112c4..5e656c71a7 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -256,7 +256,8 @@ int smlJsonParseObjFirst(char **start, 
SSmlLineInfo *element, int8_t *offset) { } if (unlikely(index >= OTD_JSON_FIELDS_NUM)) { - uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1; + uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start); + return TSDB_CODE_TSC_INVALID_JSON; } char *sTmp = *start; @@ -367,7 +368,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) { if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL || element->measure == NULL || element->timestamp == NULL) { - uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM) return -1; + uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM); + return TSDB_CODE_TSC_INVALID_JSON; } return 0; } @@ -381,7 +383,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) { } if (unlikely(index >= OTD_JSON_FIELDS_NUM)) { - uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1; + uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start); + return TSDB_CODE_TSC_INVALID_JSON; } if ((*start)[1] == 'm') { @@ -448,7 +451,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) { } if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) { - uError("elements != %d", OTD_JSON_FIELDS_NUM) return -1; + uError("elements != %d", OTD_JSON_FIELDS_NUM); + return TSDB_CODE_TSC_INVALID_JSON; } return 0; } @@ -477,7 +481,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) { } if (*marks[i] == NULL) { uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } } return TSDB_CODE_SUCCESS; @@ -816,25 +820,25 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr int32_t size = cJSON_GetArraySize(root); if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } cJSON *value = cJSON_GetObjectItem(root, 
"value"); if (unlikely(!cJSON_IsNumber(value))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } cJSON *type = cJSON_GetObjectItem(root, "type"); if (unlikely(!cJSON_IsString(type))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } double timeDouble = value->valuedouble; if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) { smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } if (timeDouble == 0) { @@ -849,32 +853,28 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr size_t typeLen = strlen(type->valuestring); if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) { // seconds - int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS; if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) { return tsInt64 * smlFactorS[toPrecision]; } - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) { switch (type->valuestring[0]) { case 'm': case 'M': // milliseconds return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision); - break; case 'u': case 'U': // microseconds return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision); - break; case 'n': case 'N': return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision); - break; default: - return -1; + return TSDB_CODE_TSC_INVALID_JSON_TYPE; } } else { - return -1; + return TSDB_CODE_TSC_INVALID_JSON_TYPE; } } @@ -895,7 +895,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { double timeDouble = timestamp->valuedouble; if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) { smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } if 
(unlikely(timeDouble < 0)) { @@ -911,14 +911,14 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { if (unlikely(fromPrecision == -1)) { smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } int64_t tsInt64 = timeDouble; if (fromPrecision == TSDB_TIME_PRECISION_SECONDS) { if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) { return tsInt64 * smlFactorS[toPrecision]; } - return -1; + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } else { return convertTimePrecision(timeDouble, fromPrecision, toPrecision); } @@ -926,7 +926,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { return smlParseTSFromJSONObj(info, timestamp, toPrecision); } else { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); - return -1; + return TSDB_CODE_TSC_INVALID_JSON; } } diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index a565fb1a21..006475654a 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -70,7 +70,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision); if (unlikely(ts == -1)) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } return ts; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6ee5508048..50b8eb1eca 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -63,7 +63,7 @@ struct tmq_conf_t { int8_t resetOffset; int8_t withTbName; int8_t snapEnable; - int32_t snapBatchSize; + int8_t replayEnable; bool hbBgEnable; uint16_t port; int32_t autoCommitInterval; @@ -83,6 +83,7 @@ struct tmq_t { int8_t autoCommit; int32_t autoCommitInterval; int8_t resetOffsetCfg; + int8_t replayEnable; uint64_t consumerId; bool 
hbBgEnable; tmq_commit_cb* commitCb; @@ -92,19 +93,13 @@ struct tmq_t { SRWLatch lock; int8_t status; int32_t epoch; -#if 0 - int8_t epStatus; - int32_t epSkipCnt; -#endif // poll info int64_t pollCnt; int64_t totalRows; -// bool needReportOffsetRows; // timer tmr_h hbLiveTimer; tmr_h epTimer; - tmr_h reportTimer; tmr_h commitTimer; STscObj* pTscObj; // connection SArray* clientTopics; // SArray @@ -152,6 +147,8 @@ typedef struct { int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data + int64_t blockReceiveTs; // replay only: time (ms) at which the last poll rsp for this vgroup was handled + int64_t blockSleepForReplay; // replay only: sleepTime (ms) from the last poll rsp; the vgroup is not polled again until this much time has passed since blockReceiveTs bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. SEpSet epSet; } SMqClientVg; @@ -360,24 +357,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { - conf->snapBatchSize = taosStr2int64(value); - return TMQ_CONF_OK; - } - -// if (strcasecmp(key, "enable.heartbeat.background") == 0) { - // if (strcasecmp(value, "true") == 0) { - // conf->hbBgEnable = true; - // return TMQ_CONF_OK; - // } else if (strcasecmp(value, "false") == 0) { - // conf->hbBgEnable = false; - // return TMQ_CONF_OK; - // } else { -// tscError("the default value of enable.heartbeat.background is true, can not be seted"); -// return TMQ_CONF_INVALID; - // } -// } - if (strcasecmp(key, "td.connect.ip") == 0) { conf->ip = taosStrdup(value); return TMQ_CONF_OK; @@ -398,6 +377,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } + if (strcasecmp(key, "enable.replay") == 0) { + if (strcasecmp(value, "true") == 0) { + conf->replayEnable = true; + return TMQ_CONF_OK; + } else if (strcasecmp(value, 
"false") == 0) { + conf->replayEnable = false; + return TMQ_CONF_OK; + } else { + return TMQ_CONF_INVALID; + } + } + if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; } @@ -1075,6 +1066,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->commitCb = conf->commitCb; pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; + pTmq->replayEnable = conf->replayEnable; + if(conf->replayEnable){ + pTmq->autoCommit = false; + } taosInitRWLatch(&pTmq->lock); pTmq->hbBgEnable = conf->hbBgEnable; @@ -1424,6 +1419,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, + .blockReceiveTs = 0, + .blockSleepForReplay = 0, .numOfRows = pInfo ? pInfo->numOfRows : 0, }; @@ -1695,6 +1692,12 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { continue; } + if (tmq->replayEnable && taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // replay: still inside the sleep interval taken from the last poll rsp, skip this vgroup + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, + tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); + continue; + } + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); @@ -1816,6 +1819,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; pVg->emptyBlockReceiveTs = 0; + if(tmq->replayEnable){ + pVg->blockReceiveTs = taosGetTimestampMs(); + pVg->blockSleepForReplay = pRsp->rsp.sleepTime; + } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" 
PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index fd39ae98d9..7d285434d9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4352,31 +4352,6 @@ void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) { } } -int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1; - tEndEncode(&encoder); - - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1; - tEndDecode(&decoder); - - tDecoderClear(&decoder); - return 0; -} - int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5983,6 +5958,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; + if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; tEndEncode(&encoder); @@ -6019,6 +5995,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -7795,6 +7775,7 @@ int32_t tEncodeMqDataRsp(SEncoder 
*pEncoder, const SMqDataRsp *pRsp) { } } } + if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1; return 0; } @@ -7840,6 +7821,11 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } + // sleepTime is a trailing field added together with replay; accept a response that ends before it + if (!tDecodeIsEnd(pDecoder)) { + if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1; + } + return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 844e69e659..1715f56091 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -490,32 +490,6 @@ typedef struct { char filterTb[TSDB_TABLE_NAME_LEN]; } SShowObj; -typedef struct { - int64_t id; - int8_t type; - int8_t replica; - int16_t numOfColumns; - int32_t rowSize; - int32_t numOfRows; - int32_t numOfReads; - int32_t payloadLen; - void* pIter; - SMnode* pMnode; - char db[TSDB_DB_FNAME_LEN]; - int16_t offset[TSDB_MAX_COLUMNS]; - int32_t bytes[TSDB_MAX_COLUMNS]; - char payload[]; -} SSysTableRetrieveObj; - -typedef struct { - char key[TSDB_PARTITION_KEY_LEN]; - int64_t dbUid; - int64_t offset; -} SMqOffsetObj; - -int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset); -void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset); - typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7273e13317..05156e1427 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -18,6 +18,7 @@ #include "mndPrivilege.h" #include "mndVgroup.h" #include "mndShow.h" +#include "mndDb.h" #include "mndSubscribe.h" #include "mndTopic.h" #include "mndTrans.h" @@ -124,7 +125,7 @@ void mndRebCntDec() { } } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { int32_t numOfTopics = taosArrayGetSize(pTopicList); 
for (int32_t i = 0; i < numOfTopics; i++) { @@ -139,6 +140,26 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * return -1; } + if(enableReplay){ + // replay is only allowed for TOPIC_SUB_TYPE__COLUMN topics; when ntbUid and ctbStbUid are both 0 the db must have exactly one vgroup. Every early return releases what was acquired. + if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){ + mndReleaseTopic(pMnode, pTopic); + return TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; + }else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) { + SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db); + if (pDb == NULL) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + if (pDb->cfg.numOfVgroups != 1) { + mndReleaseDb(pMnode, pDb); + mndReleaseTopic(pMnode, pTopic); + return TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP; + } + mndReleaseDb(pMnode, pDb); + } + } + mndTransSetDbName(pTrans, pOneTopic, NULL); if(mndTransCheckConflict(pMnode, pTrans) != 0){ mndReleaseTopic(pMnode, pTopic); @@ -177,7 +198,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { if (pTrans == NULL) { goto FAIL; } - if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){ + if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0){ goto FAIL; } @@ -697,7 +718,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user); + code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 62b5cb00e6..481495cbe5 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -330,24 +330,6 @@ void dumpSubscribe(SSdb *pSdb, SJson *json) { } } -void dumpOffset(SSdb *pSdb, SJson *json) { - void *pIter = NULL; - SJson *items = tjsonAddArrayToObject(json, "offsets"); - - while (1) { - SMqOffsetObj *pObj = NULL; - pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pObj); - if (pIter == NULL) break; - - SJson *item = tjsonCreateObject(); - tjsonAddItemToArray(items, item); - tjsonAddStringToObject(item, "key", pObj->key); - 
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid)); - tjsonAddStringToObject(item, "offset", i642str(pObj->offset)); - sdbRelease(pSdb, pObj); - } -} - void dumpStream(SSdb *pSdb, SJson *json) { void *pIter = NULL; SJson *items = tjsonAddArrayToObject(json, "streams"); @@ -608,7 +590,7 @@ void mndDumpSdb() { dumpTopic(pSdb, json); dumpConsumer(pSdb, json); dumpSubscribe(pSdb, json); - dumpOffset(pSdb, json); +// dumpOffset(pSdb, json); dumpStream(pSdb, json); dumpAcct(pSdb, json); dumpAuth(pSdb, json); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 94fd6027c0..4b5cfc830f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -298,11 +298,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); - /*taosWLockLatch(&pOldTopic->lock);*/ - - // TODO handle update - - /*taosWUnLockLatch(&pOldTopic->lock);*/ return 0; } @@ -320,23 +315,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { sdbRelease(pSdb, pTopic); } -static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { - int32_t contLen = sizeof(SDDropTopicReq); - - SDDropTopicReq *pDrop = taosMemoryCalloc(1, contLen); - if (pDrop == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pDrop->head.contLen = htonl(contLen); - pDrop->head.vgId = htonl(pVgroup->vgId); - memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN); - pDrop->tuid = htobe64(pTopic->uid); - - return pDrop; -} - static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { terrno = TSDB_CODE_MND_INVALID_TOPIC; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e15f5f911d..ee0f0e2eeb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -213,7 +213,7 @@ 
typedef struct STqReader { SPackedData msg; SSubmitReq2 submit; int32_t nextBlk; - int64_t lastBlkUid; + int64_t lastTs; SWalReader *pWalReader; SMeta *pVnodeMeta; SHashObj *tbIdHash; @@ -241,6 +241,7 @@ bool tqNextBlockInWal(STqReader *pReader, const char *idstr); bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader *tqGetWalReader(STqReader *pReader); SSDataBlock *tqGetResultBlock(STqReader *pReader); +int64_t tqGetResultBlockTime(STqReader *pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ee96e602d8..5d4ea8699b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -47,6 +47,7 @@ typedef struct STqOffsetStore STqOffsetStore; // tqPush #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) #define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) +#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) // tqExec typedef struct { @@ -91,6 +92,10 @@ typedef struct { STqExecHandle execHandle; // exec SRpcMsg* msg; tq_handle_status status; + + // for replay + SSDataBlock* block; + int64_t blockTime; } STqHandle; struct STQ { @@ -108,17 +113,13 @@ struct STQ { SStreamMeta* pStreamMeta; }; -typedef struct { - int32_t size; -} STqOffsetHead; - int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); -int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); +int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, 
uint64_t reqId); // tqExec diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index adf3abe4d9..2fda70bcc1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -82,6 +82,9 @@ void tqDestroyTqHandle(void* data) { taosMemoryFree(pData->msg); pData->msg = NULL; } + if (pData->block != NULL){ + blockDataDestroy(pData->block); + } } static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 11bb737225..5e67f3c3ac 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -37,11 +37,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { int32_t vgId = TD_VID(pStore->pTq->pVnode); int64_t code = 0; - - STqOffsetHead head = {0}; - + int32_t size = 0; while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { if (code == 0) { break; } else { @@ -49,7 +47,8 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { } } - int32_t size = htonl(head.size); + // the length prefix is stored with htonl (as the pre-patch writer did), so convert it back before use + size = htonl(size); void* pMemBuf = taosMemoryCalloc(1, size); if (pMemBuf == NULL) { tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size); @@ -175,11 +174,11 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { return -1; } - int32_t totLen = sizeof(STqOffsetHead) + bodyLen; + int32_t totLen = INT_BYTES + bodyLen; void* buf = taosMemoryCalloc(1, totLen); - void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead)); + void* abuf = POINTER_SHIFT(buf, INT_BYTES); - ((STqOffsetHead*)buf)->size = htonl(bodyLen); + *(int32_t*)buf = htonl(bodyLen); SEncoder encoder; tEncoderInit(&encoder, abuf, bodyLen); tEncodeSTqOffset(&encoder, pOffset); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 
4d470ee5b6..65914cc70e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -475,6 +475,10 @@ SSDataBlock* tqGetResultBlock (STqReader* pReader) { return pReader->pResBlock; } +int64_t tqGetResultBlockTime(STqReader *pReader){ + return pReader->lastTs; +} + bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { if (pReader->msg.msgStr == NULL) { return false; @@ -641,7 +645,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; - pReader->lastBlkUid = uid; + pReader->lastTs = pSubmitTbData->ctimeMs; pBlock->info.id.uid = uid; pBlock->info.version = pReader->msg.ver; @@ -783,9 +787,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas } int32_t sversion = pSubmitTbData->sver; - int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; - pReader->lastBlkUid = uid; tDeleteSchemaWrapper(pReader->pSchemaWrapper); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index cbe3ffee9e..705fb86fab 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -64,7 +64,23 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in return 0; } -int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { +int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res){ + uint64_t ts = 0; + qStreamSetOpen(task); + + tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); + int32_t code = qExecTask(task, res, &ts); + if (code != TSDB_CODE_SUCCESS) { + tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code)); + terrno = 
code; + return -1; + } + + tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res); + return 0; +} + +int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { const int32_t MAX_ROWS_TO_RETURN = 4096; int32_t vgId = TD_VID(pTq->pVnode); @@ -80,34 +96,63 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs while (1) { SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - qStreamSetOpen(task); - - tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); - code = qExecTask(task, &pDataBlock, &ts); - if (code != TSDB_CODE_SUCCESS) { - tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code)); - terrno = code; - return -1; - } - - tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, - pDataBlock); - // current scan should be stopped asap, since the rebalance occurs. 
- if (pDataBlock == NULL) { - break; - } - - code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d, failed to add block to rsp msg", vgId); + code = getDataBlock(task, pHandle, vgId, &pDataBlock); + if (code != 0){ return code; } - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - if (totalRows >= MAX_ROWS_TO_RETURN) { + if(pRequest->enableReplay){ + if(IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL){ + blockDataDestroy(pHandle->block); + pHandle->block = NULL; + } + if(pHandle->block == NULL){ + if (pDataBlock == NULL) { + break; + } + STqOffsetVal offset = {0}; + qStreamExtractOffset(task, &offset); + pHandle->block = createDataBlock(); + copyDataBlock(pHandle->block, pDataBlock); + pHandle->blockTime = offset.ts; + code = getDataBlock(task, pHandle, vgId, &pDataBlock); + if (code != 0){ + return code; + } + } + + code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, failed to add block to rsp msg", vgId); + return code; + } + + pRsp->blockNum++; + if (pDataBlock == NULL) { + break; + } + copyDataBlock(pHandle->block, pDataBlock); + + STqOffsetVal offset = {0}; + qStreamExtractOffset(task, &offset); + pRsp->sleepTime = offset.ts - pHandle->blockTime; + pHandle->blockTime = offset.ts; break; + }else{ + if (pDataBlock == NULL) { + break; + } + code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, failed to add block to rsp msg", vgId); + return code; + } + + pRsp->blockNum++; + totalRows += pDataBlock->info.rows; + if (totalRows >= MAX_ROWS_TO_RETURN) { + break; + } } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 04695c1f63..a4c3d395e3 100644 --- 
a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -15,8 +15,6 @@ #include "tq.h" -#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) - static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); @@ -142,7 +140,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInitDataRsp(&dataRsp, *pOffset); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + int code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest); if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index c72ecd4824..a6a2e128d8 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -131,6 +131,7 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqGetResultBlock = tqGetResultBlock; pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; + pTq->tqGetResultBlockTime = tqGetResultBlockTime; } void initStateStoreAPI(SStateStore* pStore) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 474128007a..913e246f63 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1862,6 +1862,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { // curVersion move to next tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); + // use ts to pass time when replay, because ts not used if type is log + pTaskInfo->streamInfo.currentOffset.ts = pAPI->tqReaderFn.tqGetResultBlockTime(pInfo->tqReader); + if (hasResult) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 
9832720994..dd51b99cd1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -654,6 +654,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/utils/test/c/tmqOffset.c b/utils/test/c/tmqOffset.c index 7225cb87bd..9699e71f24 100644 --- a/utils/test/c/tmqOffset.c +++ b/utils/test/c/tmqOffset.c @@ -7,18 +7,14 @@ #include "tlog.h" #include "tmsg.h" -typedef struct { - int32_t size; -} STqOffsetHead; - int32_t tqOffsetRestoreFromFile(const char* fname) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); if (pFile != NULL) { - STqOffsetHead head = {0}; int32_t code; while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + int32_t size = 0; + if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { if (code == 0) { break; } else { @@ -26,7 +22,6 @@ int32_t tqOffsetRestoreFromFile(const char* fname) { return -1; } } - int32_t size = htonl(head.size); void* memBuf = taosMemoryCalloc(1, size); if (memBuf == NULL) { printf("memBuf == NULL\n"); From 5f7b6f19baeac0a5d4d30895f80e11ece6246328 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 8 Oct 2023 19:05:59 +0800 Subject: [PATCH 02/11] feat:[TD-26056] add replay logic --- source/client/src/clientTmq.c | 1 + source/dnode/vnode/inc/vnode.h | 3 +- source/libs/executor/src/projectoperator.c | 3 +- tests/system-test/7-tmq/replay.py | 320 +++++++++++++++++++++ 4 files changed, 325 
insertions(+), 2 deletions(-) create mode 100644 tests/system-test/7-tmq/replay.py diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 50b8eb1eca..252959ecd8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1532,6 +1532,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->head.vgId = pVg->vgId; pReq->useSnapshot = tmq->useSnapshot; pReq->reqId = generateRequestId(); + pReq->enableReplay = tmq->replayEnable; } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 122f6733fb..787b717015 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -216,7 +216,7 @@ typedef struct STqReader { SPackedData msg; SSubmitReq2 submit; int32_t nextBlk; - int64_t lastTs; + int64_t lastBlkUid; SWalReader *pWalReader; SMeta *pVnodeMeta; SHashObj *tbIdHash; @@ -226,6 +226,7 @@ typedef struct STqReader { int64_t cachedSchemaUid; SSchemaWrapper *pSchemaWrapper; SSDataBlock *pResBlock; + int64_t lastTs; } STqReader; STqReader *tqReaderOpen(SVnode *pVnode); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 00b246afad..227960ab22 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -110,7 +110,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pInfo->mergeDataBlocks = false; } else { if (!pProjPhyNode->ignoreGroupId) { @@ -330,6 +330,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } + qDebug("project return %d", 
pProjectInfo->mergeDataBlocks); if (pProjectInfo->mergeDataBlocks) { if (pRes->info.rows > 0) { pFinalRes->info.id.groupId = 0; // clear groupId diff --git a/tests/system-test/7-tmq/replay.py b/tests/system-test/7-tmq/replay.py new file mode 100644 index 0000000000..7eee6743a7 --- /dev/null +++ b/tests/system-test/7-tmq/replay.py @@ -0,0 +1,320 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class actionType(Enum): + CREATE_DATABASE = 0 + CREATE_STABLE = 1 + CREATE_CTABLE = 2 + INSERT_DATA = 3 + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + 
cur=con.cursor() + print(cur) + return cur + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1 wal_retention_period 3600"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def 
startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName): + tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." 
%(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + rowsOfSql = 0 + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + rowsOfSql += 1 + if ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + time.sleep(1) + rowsOfSql = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ 
[OK]") + return + + def prepareEnv(self, **parameterDict): + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + + if parameterDict["actionType"] == actionType.CREATE_DATABASE: + self.create_database(tsql, parameterDict["dbName"]) + elif parameterDict["actionType"] == actionType.CREATE_STABLE: + self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"]) + elif parameterDict["actionType"] == actionType.CREATE_CTABLE: + self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + elif parameterDict["actionType"] == actionType.INSERT_DATA: + self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"], \ + parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + else: + tdLog.exit("not support's action: ", parameterDict["actionType"]) + + return + + def tmqCase8(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 8: ") + + self.initConsumerTable() + + # create and start thread + parameterDict = {'cfg': '', \ + 'actionType': 0, \ + 'dbName': 'db8', \ + 'dropFlag': 1, \ + 'vgroups': 4, \ + 'replica': 1, \ + 'stbName': 'stb1', \ + 'ctbNum': 1, \ + 'rowsPerTbl': 10, \ + 'batchNum': 1, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.create_database(tdSql, parameterDict["dbName"]) + self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) + self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + self.insert_data(tdSql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"]) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + consumerId = 0 + 
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest,\ + enable.replay:true' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume 0 processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + tdLog.info("start to check consume 0 result") + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != 0: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) + tdLog.exit("tmq consume rows error!") + + # tdLog.info("start consume 1 processor") + # self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + # tdLog.sleep(2) + # + # tdLog.info("start one new thread to insert data") + # parameterDict['actionType'] = actionType.INSERT_DATA + # prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + # prepareEnvThread.start() + # prepareEnvThread.join() + # + # tdLog.info("start to check consume 0 and 1 result") + # expectRows = 2 + # resultList = self.selectConsumeResult(expectRows) + # totalConsumeRows = 0 + # for i in range(expectRows): + # totalConsumeRows += resultList[i] + # + # if totalConsumeRows != expectrowcnt: + # tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + # tdLog.exit("tmq consume rows error!") + # + # tdLog.info("start consume 2 processor") + # self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + # tdLog.sleep(2) + # + # tdLog.info("start one new thread to insert data") + # 
parameterDict['actionType'] = actionType.INSERT_DATA + # prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + # prepareEnvThread.start() + # prepareEnvThread.join() + # + # tdLog.info("start to check consume 0 and 1 and 2 result") + # expectRows = 3 + # resultList = self.selectConsumeResult(expectRows) + # totalConsumeRows = 0 + # for i in range(expectRows): + # totalConsumeRows += resultList[i] + # + # if totalConsumeRows != expectrowcnt*2: + # tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) + # tdLog.exit("tmq consume rows error!") + # + # tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 8 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase8(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 33045e63ae88bd4335ec203f6e6e22a619684498 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Oct 2023 17:35:40 +0800 Subject: [PATCH 03/11] feat:[TD-26056] add replay logic --- source/client/src/clientTmq.c | 17 ++ source/dnode/vnode/src/tq/tqScan.c | 21 +- source/dnode/vnode/src/tq/tqUtil.c | 6 +- source/libs/executor/src/projectoperator.c | 1 - tests/parallel_test/cases.task | 1 + tests/system-test/7-tmq/replay.py | 33 +-- tests/system-test/7-tmq/tmq_replay.py | 39 +++ utils/test/c/CMakeLists.txt | 9 + utils/test/c/replay_test.c | 323 +++++++++++++++++++++ utils/test/c/tmqSim.c | 3 +- 10 files changed, 418 insertions(+), 35 deletions(-) create mode 100644 tests/system-test/7-tmq/tmq_replay.py create mode 100644 utils/test/c/replay_test.c 
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 252959ecd8..e398441979 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -724,6 +724,17 @@ void tmqAssignAskEpTask(void* param, void* tmrId) { taosMemoryFree(param); } +void tmqReplayTask(void* param, void* tmrId) { + int64_t refId = *(int64_t*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + if(tmq == NULL) goto END; + + tsem_post(&tmq->rspSem); + taosReleaseRef(tmqMgmt.rsetId, refId); +END: + taosMemoryFree(param); +} + void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT); @@ -1144,6 +1155,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; req.resetOffsetCfg = tmq->resetOffsetCfg; + req.enableReplay = tmq->replayEnable; for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1823,6 +1835,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if(tmq->replayEnable){ pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRsp->rsp.sleepTime; + if(pVg->blockSleepForReplay > 0){ + int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); + *pRefId1 = tmq->refId; + taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer); + } } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 705fb86fab..01866ef893 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -112,8 +112,9 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } STqOffsetVal offset = {0}; qStreamExtractOffset(task, &offset); - pHandle->block = 
createDataBlock(); - copyDataBlock(pHandle->block, pDataBlock); + pHandle->block = createOneDataBlock(pDataBlock, true); +// pHandle->block = createDataBlock(); +// copyDataBlock(pHandle->block, pDataBlock); pHandle->blockTime = offset.ts; code = getDataBlock(task, pHandle, vgId, &pDataBlock); if (code != 0){ @@ -129,14 +130,16 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pRsp->blockNum++; if (pDataBlock == NULL) { - break; - } - copyDataBlock(pHandle->block, pDataBlock); + blockDataDestroy(pHandle->block); + pHandle->block = NULL; + }else{ + copyDataBlock(pHandle->block, pDataBlock); - STqOffsetVal offset = {0}; - qStreamExtractOffset(task, &offset); - pRsp->sleepTime = offset.ts - pHandle->blockTime; - pHandle->blockTime = offset.ts; + STqOffsetVal offset = {0}; + qStreamExtractOffset(task, &offset); + pRsp->sleepTime = offset.ts - pHandle->blockTime; + pHandle->blockTime = offset.ts; + } break; }else{ if (pDataBlock == NULL) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a4c3d395e3..215f8d3cb2 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -40,11 +40,11 @@ void tqUpdateNodeStage(STQ* pTq) { tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); } -static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { +static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset, bool withTbName) { pRsp->reqOffset = pOffset; pRsp->rspOffset = pOffset; - pRsp->withTbName = 1; + pRsp->withTbName = withTbName; pRsp->withSchema = 1; pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -177,7 +177,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int32_t vgId = TD_VID(pTq->pVnode); SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, *offset); + 
tqInitTaosxRsp(&taosxRsp, *offset, pRequest->withTbName); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 227960ab22..f60890ecca 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -330,7 +330,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } - qDebug("project return %d", pProjectInfo->mergeDataBlocks); if (pProjectInfo->mergeDataBlocks) { if (pRes->info.rows > 0) { pFinalRes->info.id.groupId = 0; // clear groupId diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 36b8fded81..b5856fea63 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -159,6 +159,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py diff --git a/tests/system-test/7-tmq/replay.py b/tests/system-test/7-tmq/replay.py index 7eee6743a7..bbda8600fb 100644 --- a/tests/system-test/7-tmq/replay.py +++ b/tests/system-test/7-tmq/replay.py @@ -110,7 +110,7 @@ class TDTestCase: tdLog.info(shellCmd) os.system(shellCmd) - def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + def create_database(self,tsql, dbName,dropFlag=1,vgroups=1,replica=1): if dropFlag == 1: tsql.execute("drop database if exists %s"%(dbName)) @@ -149,21 +149,12 @@ class TDTestCase: t = time.time() startTs = int(round(t * 1000)) - #tdLog.debug("doing insert data into stable:%s 
rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 - for i in range(ctbNum): - sql += " %s_%d values "%(stbName,i) - for j in range(rowsPerTbl): - sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) - rowsOfSql += 1 - if ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): - tsql.execute(sql) - time.sleep(1) - rowsOfSql = 0 - if j < rowsPerTbl - 1: - sql = "insert into %s_%d values " %(stbName,i) - else: - sql = "insert into " + for j in range(rowsPerTbl): + for i in range(ctbNum): + sql += " %s_%d values (%d, %d, 'tmqrow_%d') "%(stbName, i, startTs + j + i, j+i, j+i) + tsql.execute(sql) + time.sleep(1) + sql = "insert into " #end sql if sql != pre_insert: #print("insert sql:%s"%sql) @@ -199,10 +190,10 @@ class TDTestCase: 'actionType': 0, \ 'dbName': 'db8', \ 'dropFlag': 1, \ - 'vgroups': 4, \ + 'vgroups': 1, \ 'replica': 1, \ 'stbName': 'stb1', \ - 'ctbNum': 1, \ + 'ctbNum': 2, \ 'rowsPerTbl': 10, \ 'batchNum': 1, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 @@ -223,7 +214,7 @@ class TDTestCase: tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2 topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 1 @@ -247,8 +238,8 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != 0: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") # tdLog.info("start consume 1 processor") diff --git a/tests/system-test/7-tmq/tmq_replay.py b/tests/system-test/7-tmq/tmq_replay.py new file mode 100644 index 0000000000..1e19d58516 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_replay.py 
@@ -0,0 +1,39 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def run(self): + tdSql.prepare() + buildPath = tdCom.getBuildPath() + + cmdStr1 = '%s/build/bin/replay_test'%(buildPath) + tdLog.info(cmdStr1) + result = os.system(cmdStr1) + + if result != 0: + tdLog.exit("tmq_replay error!") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 343e3d8454..db5eb21ad8 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) +add_executable(replay_test replay_test.c) if(${TD_LINUX}) add_executable(tsz_test tsz_test.c) @@ -57,6 +58,14 @@ target_link_libraries( PUBLIC os ) +target_link_libraries( + replay_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( write_raw_block_test PUBLIC taos diff --git a/utils/test/c/replay_test.c b/utils/test/c/replay_test.c new file mode 100644 index 0000000000..1fbaac0796 --- /dev/null +++ b/utils/test/c/replay_test.c @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. 
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "taos.h" +#include "types.h" + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "g1"); + tmq_conf_set(conf, "client.id", "c1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "enable.replay", "true"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +void test_vgroup_error(TAOS* pConn){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + ASSERT(tmq_subscribe(tmq, topic_list) 
!= 0); + tmq_list_destroy(topic_list); + tmq_consumer_close(tmq); +} + +void test_stable_db_error(TAOS* pConn){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t1 as stable d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + ASSERT(tmq_subscribe(tmq, topic_list) != 0); + tmq_list_destroy(topic_list); + tmq_consumer_close(tmq); + + pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t1 as database d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + topic_list = tmq_list_new(); + tmq_list_append(topic_list, "t1"); + tmq = build_consumer(); + ASSERT(tmq_subscribe(tmq, topic_list) != 0); + tmq_list_destroy(topic_list); + tmq_consumer_close(tmq); +} + +void insert_with_sleep(TAOS* pConn, int32_t* interval, int32_t len){ + for(int i = 0; i < len; i++){ + TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + taosMsleep(interval[i]); + } +} + +void insert_with_sleep_multi(TAOS* pConn, int32_t* interval, int32_t len){ + for(int i = 0; i < len; i++){ + TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1) (now+1s, 2) d1.table2 (ts, c1) values (now, 1) (now+1s, 2)"); 
+ ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + taosMsleep(interval[i]); + } +} + +void test_case1(TAOS* pConn, int32_t* interval, int32_t len){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + insert_with_sleep(pConn, interval, len); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.table1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + tmq_list_destroy(topic_list); + + int32_t timeout = 5000; + + int64_t t = 0; + int32_t totalRows = 0; + char buf[1024] = {0}; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout); + if (tmqmessage) { + if(t != 0){ + ASSERT(taosGetTimestampMs() - t >= interval[totalRows - 1]); + } + t = taosGetTimestampMs(); + + TAOS_ROW row = taos_fetch_row(tmqmessage); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); + int32_t numOfFields = taos_field_count(tmqmessage); + const char* tbName = tmq_get_table_name(tmqmessage); + taos_print_row(buf, row, fields, numOfFields); + + printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? 
tbName : "null table"), totalRows, buf); + totalRows++; + taos_free_result(tmqmessage); + } else { + break; + } + } + + ASSERT(totalRows == len); + tmq_consumer_close(tmq); +} + +void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + insert_with_sleep_multi(pConn, interval, len); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + tmq_list_destroy(topic_list); + + int32_t timeout = 5000; + + int64_t t = 0; + int32_t totalRows = 0; + char buf[1024] = {0}; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout); + if (tmqmessage) { + if(t != 0 && totalRows % 4 == 0){ + ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]); + } + t = taosGetTimestampMs(); + + while(1){ + TAOS_ROW row = taos_fetch_row(tmqmessage); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); + int32_t numOfFields = taos_field_count(tmqmessage); + const char* tbName = 
tmq_get_table_name(tmqmessage); + taos_print_row(buf, row, fields, numOfFields); + + printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + totalRows++; + } + + taos_free_result(tmqmessage); + + if(totalRows == len * 4){ + taosSsleep(1); + tsem_post(sem); + } + } else { + break; + } + } + + ASSERT(totalRows == len * 4 + 1); + tmq_consumer_close(tmq); +} + +void* insertThreadFunc(void* param) { + tsem_t* sem = (tsem_t*)param; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + + tsem_wait(sem); + + TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 11)"); + ASSERT(taos_errno(pRes) == 0); + printf("insert data again\n"); + taos_free_result(pRes); + taos_close(pConn); + return NULL; +} + +int main(int argc, char* argv[]) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + test_vgroup_error(pConn); + test_stable_db_error(pConn); + + tsem_t sem; + tsem_init(&sem, 0, 0); + TdThread thread; + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + taosThreadCreate(&thread, &thattr, insertThreadFunc, (void*)(&sem)); + + int32_t interval[5] = {1000, 200, 3000, 40, 500}; + test_case1(pConn, interval, sizeof(interval)/sizeof(int32_t)); + printf("test_case1 success\n"); + test_case2(pConn, interval, sizeof(interval)/sizeof(int32_t), &sem); + taos_close(pConn); + + taosThreadJoin(thread, NULL); + taosThreadClear(&thread); + tsem_destroy(&sem); + return 0; +} diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 6b774b3eff..34f4a9d094 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -621,10 +621,11 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn taos_print_row(buf, row, fields, numOfFields); if (0 != g_stConfInfo.showRowFlag) { - taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName 
!= NULL ? tbName : "null table"), totalRows, buf); + taosFprintfFile(g_fp, "%lld tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? tbName : "null table"), totalRows, buf); // if (0 != g_stConfInfo.saveRowFlag) { // saveConsumeContentToTbl(pInfo, buf); // } +// taosFsyncFile(g_fp); } totalRows++; From 3e2e924e9867b41691a6b547485f626d9b590278 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Oct 2023 18:36:39 +0800 Subject: [PATCH 04/11] feat:[TD-26056] add replay logic --- source/dnode/mnode/impl/src/mndConsumer.c | 31 +++++++++++++++-------- source/dnode/vnode/src/tq/tqUtil.c | 6 ++--- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 05156e1427..c9ee66d3a0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -126,31 +126,37 @@ void mndRebCntDec() { } static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { - int32_t numOfTopics = taosArrayGetSize(pTopicList); + SMqTopicObj *pTopic = NULL; + int32_t code = 0; + int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { char *pOneTopic = taosArrayGetP(pTopicList, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + pTopic = mndAcquireTopic(pMnode, pOneTopic); if (pTopic == NULL) { // terrno has been set by callee function - return -1; + code = -1; + goto FAILED; } if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; + code = -1; + goto FAILED; } if(enableReplay){ if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){ - return TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; + code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; + goto FAILED; }else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) { SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db); if (pDb == NULL) { - 
mndReleaseTopic(pMnode, pTopic); - return -1; + code = -1; + goto FAILED; } if (pDb->cfg.numOfVgroups != 1) { - return TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP; + mndReleaseDb(pMnode, pDb); + code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP; + goto FAILED; } mndReleaseDb(pMnode, pDb); } @@ -158,13 +164,16 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * mndTransSetDbName(pTrans, pOneTopic, NULL); if(mndTransCheckConflict(pMnode, pTrans) != 0){ - mndReleaseTopic(pMnode, pTopic); - return -1; + code = -1; + goto FAILED; } mndReleaseTopic(pMnode, pTopic); } return 0; +FAILED: + mndReleaseTopic(pMnode, pTopic); + return code; } static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 215f8d3cb2..a4c3d395e3 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -40,11 +40,11 @@ void tqUpdateNodeStage(STQ* pTq) { tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); } -static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset, bool withTbName) { +static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { pRsp->reqOffset = pOffset; pRsp->rspOffset = pOffset; - pRsp->withTbName = withTbName; + pRsp->withTbName = 1; pRsp->withSchema = 1; pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -177,7 +177,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int32_t vgId = TD_VID(pTq->pVnode); SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, *offset, pRequest->withTbName); + tqInitTaosxRsp(&taosxRsp, *offset); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { From d10915dce6a3cced5024c307834650f7ee19191e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Oct 2023 19:03:13 
+0800 Subject: [PATCH 05/11] feat:[TD-26056] add replay logic --- utils/test/c/replay_test.c | 89 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/utils/test/c/replay_test.c b/utils/test/c/replay_test.c index 1fbaac0796..df493ce2ac 100644 --- a/utils/test/c/replay_test.c +++ b/utils/test/c/replay_test.c @@ -281,6 +281,93 @@ void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){ tmq_consumer_close(tmq); } +void test_case3(TAOS* pConn, int32_t* interval, int32_t len){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + insert_with_sleep_multi(pConn, interval, len); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + + int32_t timeout = 5000; + + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout); + taos_free_result(tmqmessage); + + tmq_consumer_close(tmq); + + tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + + int64_t t = 0; + int32_t totalRows = 0; + char buf[1024] = {0}; + while (1) { + 
tmqmessage = tmq_consumer_poll(tmq, timeout); + if (tmqmessage) { + if(t != 0 && totalRows % 4 == 0){ + ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]); + } + t = taosGetTimestampMs(); + + while(1){ + TAOS_ROW row = taos_fetch_row(tmqmessage); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); + int32_t numOfFields = taos_field_count(tmqmessage); + const char* tbName = tmq_get_table_name(tmqmessage); + taos_print_row(buf, row, fields, numOfFields); + + printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + totalRows++; + } + + taos_free_result(tmqmessage); + } else { + break; + } + } + + ASSERT(totalRows == len * 4); + + tmq_consumer_close(tmq); + tmq_list_destroy(topic_list); +} + void* insertThreadFunc(void* param) { tsem_t* sem = (tsem_t*)param; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -314,6 +401,8 @@ int main(int argc, char* argv[]) { test_case1(pConn, interval, sizeof(interval)/sizeof(int32_t)); printf("test_case1 success\n"); test_case2(pConn, interval, sizeof(interval)/sizeof(int32_t), &sem); + printf("test_case2 success\n"); + test_case3(pConn, interval, sizeof(interval)/sizeof(int32_t)); taos_close(pConn); taosThreadJoin(thread, NULL); From b12c734f096b348143a475969c138ee841b9c2a9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Oct 2023 21:14:32 +0800 Subject: [PATCH 06/11] feat:[TD-26056] add replay logic --- utils/test/c/tmqSim.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 34f4a9d094..14e30008fe 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -621,7 +621,7 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn taos_print_row(buf, row, fields, numOfFields); if (0 != g_stConfInfo.showRowFlag) { - taosFprintfFile(g_fp, "%lld tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? 
tbName : "null table"), totalRows, buf); + taosFprintfFile(g_fp, "time:%" PRId64 " tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? tbName : "null table"), totalRows, buf); // if (0 != g_stConfInfo.saveRowFlag) { // saveConsumeContentToTbl(pInfo, buf); // } From d42e819d2d89d00a43e10c32ba0a92f64821bffd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 10 Oct 2023 09:02:37 +0800 Subject: [PATCH 07/11] feat:[TD-26056] add replay logic --- utils/test/c/replay_test.c | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/utils/test/c/replay_test.c b/utils/test/c/replay_test.c index df493ce2ac..c105670733 100644 --- a/utils/test/c/replay_test.c +++ b/utils/test/c/replay_test.c @@ -184,10 +184,9 @@ void test_case1(TAOS* pConn, int32_t* interval, int32_t len){ TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); int32_t numOfFields = taos_field_count(tmqmessage); - const char* tbName = tmq_get_table_name(tmqmessage); taos_print_row(buf, row, fields, numOfFields); - printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + printf("time:%" PRId64 " rows[%d]: %s\n", t, totalRows, buf); totalRows++; taos_free_result(tmqmessage); } else { @@ -259,10 +258,9 @@ void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){ TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); int32_t numOfFields = taos_field_count(tmqmessage); - const char* tbName = tmq_get_table_name(tmqmessage); taos_print_row(buf, row, fields, numOfFields); - printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? 
tbName : "null table"), totalRows, buf); + printf("time:%" PRId64 " rows[%d]: %s\n", t, totalRows, buf); totalRows++; } @@ -349,10 +347,9 @@ void test_case3(TAOS* pConn, int32_t* interval, int32_t len){ TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); int32_t numOfFields = taos_field_count(tmqmessage); - const char* tbName = tmq_get_table_name(tmqmessage); taos_print_row(buf, row, fields, numOfFields); - printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + printf("time:%" PRId64 " rows[%d]: %s\n", t, totalRows, buf); totalRows++; } From 53c84a7245c952514c485e227f2827c96230df65 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 10 Oct 2023 11:02:01 +0800 Subject: [PATCH 08/11] doc:add doc for replay --- docs/en/07-develop/07-tmq.mdx | 18 ++++++++++++++++++ docs/zh/07-develop/07-tmq.mdx | 19 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index f833dbf439..a511eaa4c3 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -514,6 +514,24 @@ var consumer = new ConsumerBuilder(cfg).Build(); A consumer group is automatically created when multiple consumers are configured with the same consumer group ID. +Data replay function description: +- Subscriptions support a replay function, which replays data according to the time at which it was written. + For example, suppose three rows are written at the following times: + ```sql + 2023/09/22 00:00:00.000 + 2023/09/22 00:00:05.000 + 2023/09/22 00:00:08.000 + ``` + The second row is returned 5 seconds after the first row is consumed, and the third row is returned 3 seconds after the second row is consumed. +- Only column subscriptions support data replay. + - Replay requires a single, independent timeline. + - For a subtable or normal table subscription, the data resides on only one vnode, which guarantees a single timeline.
+ - For a super table subscription, the database must have only one vnode; otherwise an error is reported (because data subscribed from multiple vnodes is not on the same timeline). +- Super table and database subscriptions do not support replay. +- The `enable.replay` parameter controls this feature: `true` enables subscription replay and `false` disables it. The default is `false`. +- Replay does not support saving consumption progress, so when `enable.replay` is `true`, auto commit is automatically disabled. +- Because replaying data itself takes processing time, replay timing is accurate only to within tens of milliseconds. + ## Subscribe to a Topic A single consumer can subscribe to multiple topics. diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 927d762829..fee0ec86c2 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -355,6 +355,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name; | `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true | | `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) |默认关闭 | +| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 | 对于不同编程语言,其设置方式如下: @@ -515,6 +516,24 @@ var consumer = new ConsumerBuilder(cfg).Build(); 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 +数据回放功能说明: +- 订阅增加 replay 功能,按照数据写入的时间回放。 + 比如,如下时间写入三条数据 + ```sql + 2023/09/22 00:00:00.000 + 2023/09/22 00:00:05.000 + 2023/09/22 00:00:08.000 + ``` + 则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。 +- 仅列订阅支持数据回放 + - 回放需要保证独立时间线 + - 如果是子表订阅或者普通表订阅,只有一个vnode上有数据,保证是一个时间线 + - 如果超级表订阅,则需保证该 DB 只有一个vnode,否则报错(因为多个vnode上订阅出的数据不在一个时间线上) +- 超级表和库订阅不支持回放 +- 增加 enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。 +- 回放不支持进度保存,所以回放参数
enable.replay = true 时,auto commit 自动关闭 +- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差 + ## 订阅 *topics* 一个 consumer 支持同时订阅多个 topic。 From 9113c3c3b6032e9d2d0c2d4c7e4592f80da2b672 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 10 Oct 2023 11:53:54 +0800 Subject: [PATCH 09/11] fix:rollback removed code --- docs/en/07-develop/07-tmq.mdx | 1 + source/dnode/vnode/src/tq/tqRead.c | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index a511eaa4c3..86b2498345 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -356,6 +356,7 @@ You configure the following parameters when creating a consumer: | `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true | | `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | | `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false +| `enable.replay` | boolean | Specify whether the data replay function is enabled | default value: false | The method of specifying these parameters depends on the language used: diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 65914cc70e..3b052a3edd 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -788,6 +788,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas int32_t sversion = pSubmitTbData->sver; int64_t uid = pSubmitTbData->uid; + pReader->lastBlkUid = uid; tDeleteSchemaWrapper(pReader->pSchemaWrapper); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); From 440fa772828229b99d21851e067d827a30f91381 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 10 Oct 2023 16:55:36 +0800 Subject: [PATCH 10/11] fix:merge datablock if data in same wal version ---
source/dnode/vnode/src/tq/tqRead.c | 82 ++++++++++-------------------- 1 file changed, 28 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 3b052a3edd..1c2561b1a8 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -366,82 +366,56 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con // todo ignore the error in wal? bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; + SSDataBlock* pDataBlock = NULL; uint64_t st = taosGetTimestampMs(); while (1) { - SArray* pBlockList = pReader->submit.aSubmitTbData; - if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { - // try next message in wal file - // todo always retry to avoid read failure caused by wal file deletion - if (walNextValidMsg(pWalReader) < 0) { - return false; - } - - void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pWalReader->pHead->head.version; - - SDecoder decoder = {0}; - tDecoderInit(&decoder, pBody, bodyLen); - - { - int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData); - for (int32_t i = 0; i < nSubmitTbData; i++) { - SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i); - if (pData->pCreateTbReq != NULL) { - taosArrayDestroy(pData->pCreateTbReq->ctb.tagName); - taosMemoryFreeClear(pData->pCreateTbReq); - } - pData->aRowP = taosArrayDestroy(pData->aRowP); - } - pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData); - } - - if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) { - tDecoderClear(&decoder); - tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver); - return false; - } - - tDecoderClear(&decoder); - pReader->nextBlk = 0; + // try next message in wal file + if 
(walNextValidMsg(pWalReader) < 0) { + return false; } + void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pWalReader->pHead->head.version; + + tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); + pReader->nextBlk = 0; int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk, - numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk); + tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, + numOfBlocks, pReader->msg.msgLen, pReader->msg.ver); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pReader->tbIdHash == NULL) { - SSDataBlock* pRes = NULL; - int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); - if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { - return true; - } - } - - void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); - if (ret != NULL) { - tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver); - + if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) { + tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid); SSDataBlock* pRes = NULL; int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { - return true; + if(pDataBlock == NULL){ + pDataBlock = createOneDataBlock(pRes, true); + }else{ + blockDataMerge(pDataBlock, pRes); + } } } else { pReader->nextBlk += 1; tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); } } - - qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); tDestroySubmitReq(&pReader->submit, 
TSDB_MSG_FLG_DECODE); - pReader->msg.msgStr = NULL; + if(pDataBlock != NULL){ + blockDataCleanup(pReader->pResBlock); + copyDataBlock(pReader->pResBlock, pDataBlock); + blockDataDestroy(pDataBlock); + return true; + }else{ + qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); + } + if(taosGetTimestampMs() - st > 1000){ return false; } From 7a23615b3fda7553c0ad17f6cee90f6b4d56920d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 16 Oct 2023 17:53:48 +0800 Subject: [PATCH 11/11] fix:offset set to earliest clearly --- utils/test/c/replay_test.c | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/test/c/replay_test.c b/utils/test/c/replay_test.c index c105670733..92f3fe5102 100644 --- a/utils/test/c/replay_test.c +++ b/utils/test/c/replay_test.c @@ -27,6 +27,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.replay", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq);